summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/xact.c20
-rw-r--r--src/backend/executor/execTuples.c10
-rw-r--r--src/backend/nodes/copyfuncs.c3
-rw-r--r--src/backend/pgxc/plan/planner.c596
-rw-r--r--src/backend/pgxc/pool/execRemote.c963
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c361
-rw-r--r--src/backend/utils/cache/lsyscache.c2
-rw-r--r--src/backend/utils/sort/tuplesort.c100
-rw-r--r--src/include/executor/tuptable.h2
-rw-r--r--src/include/pgxc/execRemote.h42
-rw-r--r--src/include/pgxc/pgxcnode.h23
-rw-r--r--src/include/pgxc/planner.h1
-rw-r--r--src/include/utils/lsyscache.h3
13 files changed, 1642 insertions, 484 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 262fd3b3d4..2030a0079a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2280,6 +2280,21 @@ AbortTransaction(void)
*/
SetUserIdAndContext(s->prevUser, s->prevSecDefCxt);
+#ifdef PGXC
+ /*
+ * We should rollback on the data nodes before cleaning up portals
+ * to be sure data structures used by connections are not freed yet
+ */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ /*
+ * Make sure this is rolled back on the DataNodes
+ * if so it will just return
+ */
+ PGXCNodeRollback();
+ }
+#endif
+
/*
* do abort processing
*/
@@ -2300,11 +2315,6 @@ AbortTransaction(void)
/* This is done by remote Coordinator */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- /*
- * Make sure this is rolled back on the DataNodes
- * if so it will just return
- */
- PGXCNodeRollback();
RollbackTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
}
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index a6c496eee9..0c43e9479c 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -153,6 +153,7 @@ ExecCreateTupleTable(int tableSize)
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
slot->tts_attinmeta = NULL;
#endif
slot->tts_mcxt = CurrentMemoryContext;
@@ -238,6 +239,7 @@ MakeSingleTupleTableSlot(TupleDesc tupdesc)
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
slot->tts_attinmeta = NULL;
#endif
slot->tts_mcxt = CurrentMemoryContext;
@@ -440,6 +442,7 @@ ExecStoreTuple(HeapTuple tuple,
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
#endif
/*
@@ -509,6 +512,7 @@ ExecStoreMinimalTuple(MinimalTuple mtup,
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
#endif
/*
@@ -547,7 +551,8 @@ ExecStoreMinimalTuple(MinimalTuple mtup,
* --------------------------------
*/
TupleTableSlot *
-ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFree)
+ExecStoreDataRowTuple(char *msg, size_t len, int node, TupleTableSlot *slot,
+ bool shouldFree)
{
/*
* sanity checks
@@ -586,6 +591,7 @@ ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFr
slot->tts_mintuple = NULL;
slot->tts_dataRow = msg;
slot->tts_dataLen = len;
+ slot->tts_dataNode = node;
/* Mark extracted state invalid */
slot->tts_nvalid = 0;
@@ -624,6 +630,7 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
#endif
slot->tts_tuple = NULL;
@@ -976,6 +983,7 @@ ExecMaterializeSlot(TupleTableSlot *slot)
{
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
}
#endif
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 78c5543dab..aa0587f9c7 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -839,6 +839,7 @@ _copyRemoteQuery(RemoteQuery *from)
COPY_NODE_FIELD(distinct);
COPY_SCALAR_FIELD(read_only);
COPY_SCALAR_FIELD(force_autocommit);
+ COPY_STRING_FIELD(cursor);
COPY_STRING_FIELD(relname);
COPY_SCALAR_FIELD(remotejoin);
@@ -888,7 +889,7 @@ _copySimpleAgg(SimpleAgg *from)
COPY_SCALAR_FIELD(transfn);
COPY_SCALAR_FIELD(finalfn);
if (!from->initValueIsNull)
- newnode->initValue = datumCopy(from->initValue, from->transtypeByVal,
+ newnode->initValue = datumCopy(from->initValue, from->transtypeByVal,
from->transtypeLen);
COPY_SCALAR_FIELD(initValueIsNull);
COPY_SCALAR_FIELD(inputtypeLen);
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 5ea6d8a0a5..af368e424a 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -31,6 +31,7 @@
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
+#include "pgxc/execRemote.h"
#include "pgxc/locator.h"
#include "pgxc/planner.h"
#include "tcop/pquery.h"
@@ -38,6 +39,7 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/portal.h"
#include "utils/syscache.h"
@@ -123,7 +125,7 @@ typedef struct XCWalkerContext
{
Query *query;
bool isRead;
- ExecNodes *exec_nodes; /* resulting execution nodes */
+ RemoteQuery *query_step; /* remote query step being analized */
Special_Conditions *conditions;
bool multilevel_join;
List *rtables; /* a pointer to a list of rtables */
@@ -141,13 +143,44 @@ bool StrictStatementChecking = true;
/* Forbid multi-node SELECT statements with an ORDER BY clause */
bool StrictSelectChecking = false;
-static ExecNodes *get_plan_nodes(Query *query, bool isRead);
+static void get_plan_nodes(Query *query, RemoteQuery *step, bool isRead);
static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context);
static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context);
static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt);
static void InitXCWalkerContext(XCWalkerContext *context);
/*
+ * Find position of specified substring in the string
+ * All non-printable symbols of str treated as spaces, all letters as uppercase
+ * Returns pointer to the beginning of the substring or NULL
+ */
+static char *
+strpos(char *str, char *substr)
+{
+ char copy[strlen(str) + 1];
+ char *src = str;
+ char *dst = copy;
+
+ /*
+ * Initialize mutable copy, converting letters to uppercase and
+ * various witespace characters to spaces
+ */
+ while (*src)
+ {
+ if (isspace(*src))
+ {
+ src++;
+ *dst++ = ' ';
+ }
+ else
+ *dst++ = toupper(*src++);
+ }
+ *dst = '\0';
+ dst = strstr(copy, substr);
+ return dst ? str + (dst - copy) : NULL;
+}
+
+/*
* True if both lists contain only one node and are the same
*/
static bool
@@ -509,7 +542,7 @@ get_plan_nodes_insert(Query *query)
* Get list of parent-child joins (partitioned together)
* Get list of joins with replicated tables
*
- * If we encounter an expression such as a cross-node join that cannot
+ * If we encounter an expression such as a cross-node join that cannot
* be easily handled in a single step, we stop processing and return true,
* otherwise false.
*
@@ -536,6 +569,242 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
if (!context->conditions)
context->conditions = new_special_conditions();
+ /* Handle UPDATE/DELETE ... WHERE CURRENT OF ... */
+ if (IsA(expr_node, CurrentOfExpr))
+ {
+ /* Find referenced portal and figure out what was the last fetch node */
+ Portal portal;
+ QueryDesc *queryDesc;
+ PlanState *state;
+ CurrentOfExpr *cexpr = (CurrentOfExpr *) expr_node;
+ char *cursor_name = cexpr->cursor_name;
+ char *node_cursor;
+
+ /* Find the cursor's portal */
+ portal = GetPortalByName(cursor_name);
+ if (!PortalIsValid(portal))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_CURSOR),
+ errmsg("cursor \"%s\" does not exist", cursor_name)));
+
+ queryDesc = PortalGetQueryDesc(portal);
+ if (queryDesc == NULL || queryDesc->estate == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is held from a previous transaction",
+ cursor_name)));
+
+ /*
+ * The cursor must have a current result row: per the SQL spec, it's
+ * an error if not.
+ */
+ if (portal->atStart || portal->atEnd)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is not positioned on a row",
+ cursor_name)));
+
+ state = ExecGetActivePlanTree(queryDesc);
+ if (IsA(state, RemoteQueryState))
+ {
+ RemoteQueryState *node = (RemoteQueryState *) state;
+ RemoteQuery *step = (RemoteQuery *) state->plan;
+
+ /*
+ * 1. step query: SELECT * FROM <table> WHERE ctid = <cur_ctid>,
+ * <cur_ctid> is taken from the scantuple ot the target step
+ * step node list: current node of the target step.
+ * 2. step query: DECLARE <xxx> CURSOR FOR SELECT * FROM <table>
+ * WHERE <col1> = <val1> AND <col2> = <val2> ... FOR UPDATE
+ * <xxx> is generated from cursor name of the target step,
+ * <col> and <val> pairs are taken from the step 1.
+ * step node list: all nodes of <table>
+ * 3. step query: MOVE <xxx>
+ * step node list: all nodes of <table>
+ */
+ RangeTblEntry *table = (RangeTblEntry *) linitial(context->query->rtable);
+ node_cursor = step->cursor;
+ rel_loc_info1 = GetRelationLocInfo(table->relid);
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
+ context->query_step->exec_nodes->baselocatortype = rel_loc_info1->locatorType;
+ if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED)
+ {
+ RemoteQuery *step1, *step2, *step3;
+ /*
+ * We do not need first three steps if cursor already exists and
+ * positioned.
+ */
+ if (node->update_cursor)
+ {
+ step3 = NULL;
+ node_cursor = node->update_cursor;
+ }
+ else
+ {
+ char *tableName = get_rel_name(table->relid);
+ int natts = get_relnatts(table->relid);
+ char *attnames[natts];
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ /*
+ * ctid is the last attribute, but more correct to iterate over
+ * attributes and find by name, or store index for table
+ */
+ Datum ctid = slot->tts_values[slot->tts_tupleDescriptor->natts - 1];
+ char *ctid_str = (char *) DirectFunctionCall1(tidout, ctid);
+ int nodenum = slot->tts_dataNode;
+ AttrNumber att;
+ StringInfoData buf;
+ HeapTuple tp;
+ int i;
+ MemoryContext context_save;
+
+ initStringInfo(&buf);
+
+ /* Step 1: select tuple values by ctid */
+ step1 = makeNode(RemoteQuery);
+ appendStringInfoString(&buf, "SELECT ");
+ for (att = 1; att <= natts; att++)
+ {
+ TargetEntry *tle;
+ Var *expr;
+
+ tp = SearchSysCache(ATTNUM,
+ ObjectIdGetDatum(table->relid),
+ Int16GetDatum(att),
+ 0, 0);
+ if (HeapTupleIsValid(tp))
+ {
+ Form_pg_attribute att_tup = (Form_pg_attribute) GETSTRUCT(tp);
+
+ /* add comma before all except first attributes */
+ if (att > 1)
+ appendStringInfoString(&buf, ", ");
+ attnames[att-1] = pstrdup(NameStr(att_tup->attname));
+ appendStringInfoString(&buf, attnames[att - 1]);
+ expr = makeVar(att, att, att_tup->atttypid,
+ att_tup->atttypmod, 0);
+ tle = makeTargetEntry((Expr *) expr, att,
+ attnames[att - 1], false);
+ step1->scan.plan.targetlist = lappend(step1->scan.plan.targetlist, tle);
+ ReleaseSysCache(tp);
+ }
+ else
+ elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+ att, table->relid);
+ }
+ appendStringInfo(&buf, " FROM %s WHERE ctid = '%s'",
+ tableName, ctid_str);
+ step1->sql_statement = pstrdup(buf.data);
+ step1->is_single_step = true;
+ step1->exec_nodes = makeNode(ExecNodes);
+ step1->read_only = true;
+ step1->exec_nodes->nodelist = list_make1_int(nodenum);
+
+ /* Step 2: declare cursor for update target table */
+ step2 = makeNode(RemoteQuery);
+ resetStringInfo(&buf);
+
+ appendStringInfoString(&buf, step->cursor);
+ appendStringInfoString(&buf, "upd");
+ /* This need to survive while the target Portal is alive */
+ context_save = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
+ node_cursor = pstrdup(buf.data);
+ node->update_cursor = node_cursor;
+ MemoryContextSwitchTo(context_save);
+ resetStringInfo(&buf);
+
+ appendStringInfo(&buf,
+ "DECLARE %s CURSOR FOR SELECT * FROM %s WHERE ",
+ node_cursor, tableName);
+ for (i = 0; i < natts; i++)
+ {
+ /* add comma before all except first attributes */
+ if (i)
+ appendStringInfoString(&buf, "AND ");
+ appendStringInfo(&buf, "%s = $%d ", attnames[i], i+1);
+ }
+ appendStringInfoString(&buf, "FOR UPDATE");
+ step2->sql_statement = pstrdup(buf.data);
+ step2->is_single_step = true;
+ step2->read_only = true;
+ step2->exec_nodes = makeNode(ExecNodes);
+ step2->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList);
+ innerPlan(step2) = (Plan *) step1;
+ /* Step 3: move cursor to first position */
+ step3 = makeNode(RemoteQuery);
+ resetStringInfo(&buf);
+ appendStringInfo(&buf, "MOVE %s", node_cursor);
+ step3->sql_statement = pstrdup(buf.data);
+ step3->is_single_step = true;
+ step3->read_only = true;
+ step3->exec_nodes = makeNode(ExecNodes);
+ step3->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList);
+ innerPlan(step3) = (Plan *) step2;
+
+ innerPlan(context->query_step) = (Plan *) step3;
+
+ pfree(buf.data);
+ }
+ context->query_step->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList);
+ }
+ else
+ {
+ /* Take target node from last scan tuple of referenced step */
+ int curr_node = node->ss.ss_ScanTupleSlot->tts_dataNode;
+ context->query_step->exec_nodes->nodelist = lappend_int(context->query_step->exec_nodes->nodelist, curr_node);
+ }
+ FreeRelationLocInfo(rel_loc_info1);
+
+ context->query_step->is_single_step = true;
+ /*
+ * replace cursor name in the query if differs
+ */
+ if (strcmp(cursor_name, node_cursor))
+ {
+ StringInfoData buf;
+ char *str = context->query->sql_statement;
+ /*
+ * Find last occurence of cursor_name
+ */
+ for (;;)
+ {
+ char *next = strstr(str + 1, cursor_name);
+ if (next)
+ str = next;
+ else
+ break;
+ }
+
+ /*
+ * now str points to cursor name truncate string here
+ * do not care the string is modified - we will pfree it
+ * soon anyway
+ */
+ *str = '\0';
+
+ /* and move str at the beginning of the reminder */
+ str += strlen(cursor_name);
+
+ /* build up new statement */
+ initStringInfo(&buf);
+ appendStringInfoString(&buf, context->query->sql_statement);
+ appendStringInfoString(&buf, node_cursor);
+ appendStringInfoString(&buf, str);
+
+ /* take the result */
+ pfree(context->query->sql_statement);
+ context->query->sql_statement = buf.data;
+ }
+ return false;
+ }
+ // ??? current plan node is not a remote query
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->exec_on_coord = true;
+ return false;
+ }
+
if (IsA(expr_node, Var))
{
/* If we get here, that meant the previous call before recursing down did not
@@ -800,13 +1069,13 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
bool is_multilevel;
int save_parent_child_count = 0;
SubLink *sublink = (SubLink *) expr_node;
- ExecNodes *save_exec_nodes = context->exec_nodes; /* Save old exec_nodes */
+ ExecNodes *save_exec_nodes = context->query_step->exec_nodes; /* Save old exec_nodes */
/* save parent-child count */
- if (context->exec_nodes)
+ if (context->query_step->exec_nodes)
save_parent_child_count = list_length(context->conditions->partitioned_parent_child);
- context->exec_nodes = NULL;
+ context->query_step->exec_nodes = NULL;
context->multilevel_join = false;
current_rtable = ((Query *) sublink->subselect)->rtable;
@@ -823,31 +1092,31 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
context->multilevel_join = false;
/* Allow for replicated tables */
- if (!context->exec_nodes)
- context->exec_nodes = save_exec_nodes;
+ if (!context->query_step->exec_nodes)
+ context->query_step->exec_nodes = save_exec_nodes;
else
{
if (save_exec_nodes)
{
- if (context->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
+ if (context->query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
{
- context->exec_nodes = save_exec_nodes;
+ context->query_step->exec_nodes = save_exec_nodes;
}
else
{
if (save_exec_nodes->tableusagetype != TABLE_USAGE_TYPE_USER_REPLICATED)
{
/* See if they run on the same node */
- if (same_single_node (context->exec_nodes->nodelist, save_exec_nodes->nodelist))
+ if (same_single_node (context->query_step->exec_nodes->nodelist, save_exec_nodes->nodelist))
return false;
}
else
/* use old value */
- context->exec_nodes = save_exec_nodes;
+ context->query_step->exec_nodes = save_exec_nodes;
}
} else
{
- if (context->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
+ if (context->query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
return false;
/* See if subquery safely joins with parent */
if (!is_multilevel)
@@ -986,8 +1255,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
if (contains_only_pg_catalog (query->rtable))
{
/* just pg_catalog tables */
- context->exec_nodes = makeNode(ExecNodes);
- context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
context->exec_on_coord = true;
return false;
}
@@ -1005,7 +1274,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
if (rte->rtekind == RTE_SUBQUERY)
{
- ExecNodes *save_exec_nodes = context->exec_nodes;
+ ExecNodes *save_exec_nodes = context->query_step->exec_nodes;
Special_Conditions *save_conditions = context->conditions; /* Save old conditions */
List *current_rtable = rte->subquery->rtable;
@@ -1025,8 +1294,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
context->rtables = list_delete_ptr(context->rtables, current_rtable);
context->conditions = save_conditions;
- current_nodes = context->exec_nodes;
- context->exec_nodes = save_exec_nodes;
+ current_nodes = context->query_step->exec_nodes;
+ context->query_step->exec_nodes = save_exec_nodes;
if (current_nodes)
current_usage_type = current_nodes->tableusagetype;
@@ -1103,8 +1372,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
/* If we are just dealing with pg_catalog, just return */
if (table_usage_type == TABLE_USAGE_TYPE_PGCATALOG)
{
- context->exec_nodes = makeNode(ExecNodes);
- context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
context->exec_on_coord = true;
return false;
}
@@ -1113,6 +1382,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
if (examine_conditions_walker(query->jointree->quals, context))
return true;
+ if (context->query_step->exec_nodes)
+ return false;
+
/* Examine join conditions, see if each join is single-node safe */
if (context->join_list != NULL)
{
@@ -1174,27 +1446,27 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
if (!rel_loc_info)
return true;
- context->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
+ context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
}
}
else
{
- context->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
+ context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
}
/* Note replicated table usage for determining safe queries */
- if (context->exec_nodes)
+ if (context->query_step->exec_nodes)
{
if (table_usage_type == TABLE_USAGE_TYPE_USER && IsReplicated(rel_loc_info))
table_usage_type = TABLE_USAGE_TYPE_USER_REPLICATED;
- context->exec_nodes->tableusagetype = table_usage_type;
+ context->query_step->exec_nodes->tableusagetype = table_usage_type;
}
}
/* check for partitioned col comparison against a literal */
else if (list_length(context->conditions->partitioned_literal_comps) > 0)
{
- context->exec_nodes = NULL;
+ context->query_step->exec_nodes = NULL;
/*
* Make sure that if there are multiple such comparisons, that they
@@ -1208,11 +1480,11 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
lit_comp->rel_loc_info, &(lit_comp->constant), true);
test_exec_nodes->tableusagetype = table_usage_type;
- if (context->exec_nodes == NULL)
- context->exec_nodes = test_exec_nodes;
+ if (context->query_step->exec_nodes == NULL)
+ context->query_step->exec_nodes = test_exec_nodes;
else
{
- if (!same_single_node(context->exec_nodes->nodelist, test_exec_nodes->nodelist))
+ if (!same_single_node(context->query_step->exec_nodes->nodelist, test_exec_nodes->nodelist))
{
return true;
}
@@ -1231,22 +1503,22 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
parent_child = (Parent_Child_Join *)
linitial(context->conditions->partitioned_parent_child);
- context->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead);
- context->exec_nodes->tableusagetype = table_usage_type;
+ context->query_step->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead);
+ context->query_step->exec_nodes->tableusagetype = table_usage_type;
}
if (from_query_nodes)
{
- if (!context->exec_nodes)
+ if (!context->query_step->exec_nodes)
{
- context->exec_nodes = from_query_nodes;
+ context->query_step->exec_nodes = from_query_nodes;
return false;
}
/* Just use exec_nodes if the from subqueries are all replicated or using the exact
* same node
*/
else if (from_query_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED
- || (same_single_node(from_query_nodes->nodelist, context->exec_nodes->nodelist)))
+ || (same_single_node(from_query_nodes->nodelist, context->query_step->exec_nodes->nodelist)))
return false;
else
{
@@ -1254,7 +1526,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
* but the parent query applies a condition on the from subquery.
*/
if (list_length(query->jointree->fromlist) == from_subquery_count
- && list_length(context->exec_nodes->nodelist) == 1)
+ && list_length(context->query_step->exec_nodes->nodelist) == 1)
return false;
}
/* Too complicated, give up */
@@ -1270,8 +1542,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
static void
InitXCWalkerContext(XCWalkerContext *context)
{
+ context->query = NULL;
context->isRead = true;
- context->exec_nodes = NULL;
+ context->query_step = NULL;
context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions));
context->rtables = NIL;
context->multilevel_join = false;
@@ -1286,27 +1559,26 @@ InitXCWalkerContext(XCWalkerContext *context)
* Top level entry point before walking query to determine plan nodes
*
*/
-static ExecNodes *
-get_plan_nodes(Query *query, bool isRead)
+static void
+get_plan_nodes(Query *query, RemoteQuery *step, bool isRead)
{
- ExecNodes *result_nodes = NULL;
XCWalkerContext context;
InitXCWalkerContext(&context);
+ context.query = query;
context.isRead = isRead;
+ context.query_step = step;
context.rtables = lappend(context.rtables, query->rtable);
- if (!get_plan_nodes_walker((Node *) query, &context))
- result_nodes = context.exec_nodes;
- if (context.exec_on_coord && result_nodes)
+ if ((get_plan_nodes_walker((Node *) query, &context)
+ || context.exec_on_coord) && context.query_step->exec_nodes)
{
- pfree(result_nodes);
- result_nodes = NULL;
+ pfree(context.query_step->exec_nodes);
+ context.query_step->exec_nodes = NULL;
}
free_special_relations(context.conditions);
free_join_list(context.join_list);
- return result_nodes;
}
@@ -1315,32 +1587,25 @@ get_plan_nodes(Query *query, bool isRead)
*
* return NULL if it is not safe to be done in a single step.
*/
-static ExecNodes *
-get_plan_nodes_command(Query *query)
+static void
+get_plan_nodes_command(Query *query, RemoteQuery *step)
{
- ExecNodes *exec_nodes = NULL;
-
switch (query->commandType)
{
case CMD_SELECT:
- exec_nodes = get_plan_nodes(query, true);
+ get_plan_nodes(query, step, true);
break;
case CMD_INSERT:
- exec_nodes = get_plan_nodes_insert(query);
+ step->exec_nodes = get_plan_nodes_insert(query);
break;
case CMD_UPDATE:
case CMD_DELETE:
/* treat as a select */
- exec_nodes = get_plan_nodes(query, false);
+ get_plan_nodes(query, step, false);
break;
-
- default:
- return NULL;
}
-
- return exec_nodes;
}
@@ -1645,8 +1910,6 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
List *sub_tlist = step->scan.plan.targetlist;
ListCell *l;
StringInfo buf = makeStringInfo();
- char *sql;
- char *cur;
char *sql_from;
context = deparse_context_for_plan((Node *) step, NULL, rtable, NIL);
@@ -1677,23 +1940,9 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
* Do not handle the case if " FROM " we found is not a "FROM" keyword, but,
* for example, a part of string constant.
*/
- sql = pstrdup(step->sql_statement); /* mutable copy */
- /* string to upper case, for comparing */
- cur = sql;
- while (*cur)
- {
- /* replace whitespace with a space */
- if (isspace((unsigned char) *cur))
- *cur = ' ';
- *cur++ = toupper(*cur);
- }
-
- /* find the keyword */
- sql_from = strstr(sql, " FROM ");
+ sql_from = strpos(step->sql_statement, " FROM ");
if (sql_from)
{
- /* the same offset in the original string */
- int offset = sql_from - sql;
/*
* Truncate query at the position of terminating semicolon to be able
* to append extra order by entries. If query is submitted from client
@@ -1705,7 +1954,7 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
if (*end == ';')
*end = '\0';
- appendStringInfoString(buf, step->sql_statement + offset);
+ appendStringInfoString(buf, sql_from);
}
if (extra_sort)
@@ -1728,9 +1977,6 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
}
}
- /* do not need the copy */
- pfree(sql);
-
/* free previous query */
pfree(step->sql_statement);
/* get a copy of new query */
@@ -1742,6 +1988,81 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
/*
+ * Traverse the plan subtree and set cursor name for RemoteQuery nodes
+ * Cursor names must be unique, so append step_no parameter to the initial
+ * cursor name. Returns next step_no to be assigned
+ */
+static int
+set_cursor_name(Plan *subtree, char *cursor, int step_no)
+{
+ if (innerPlan(subtree))
+ step_no = set_cursor_name(innerPlan(subtree), cursor, step_no);
+ if (outerPlan(subtree))
+ step_no = set_cursor_name(outerPlan(subtree), cursor, step_no);
+ if (IsA(subtree, RemoteQuery))
+ {
+ RemoteQuery *step = (RemoteQuery *) subtree;
+ /*
+ * Keep the name for the very first step, hoping it is the only step and
+ * we do not have to modify WHERE CURRENT OF
+ */
+ if (step_no)
+ {
+ StringInfoData buf;
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "%s%d", cursor, step_no++);
+ /* make a copy before overwriting */
+ step->cursor = buf.data;
+ }
+ else
+ {
+ step_no++;
+ step->cursor = pstrdup(cursor);
+ }
+ }
+ return step_no;
+}
+
+/*
+ * Append ctid to the field list of step queries to support update
+ * WHERE CURRENT OF. The ctid is not sent down to client but used as a key
+ * to find target tuple
+ */
+static void
+fetch_ctid_of(Plan *subtree, RowMarkClause *rmc)
+{
+ /* recursively process subnodes */
+ if (innerPlan(subtree))
+ fetch_ctid_of(innerPlan(subtree), rmc);
+ if (outerPlan(subtree))
+ fetch_ctid_of(outerPlan(subtree), rmc);
+
+ /* we are only interested in RemoteQueries */
+ if (IsA(subtree, RemoteQuery))
+ {
+ RemoteQuery *step = (RemoteQuery *) subtree;
+ /*
+ * TODO Find if the table is referenced by the step query
+ */
+
+ char *from_sql = strpos(step->sql_statement, " FROM ");
+ if (from_sql)
+ {
+ StringInfoData buf;
+
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, step->sql_statement,
+ (int) (from_sql - step->sql_statement));
+ /* TODO qualify with the table name */
+ appendStringInfoString(&buf, ", ctid");
+ appendStringInfoString(&buf, from_sql);
+ pfree(step->sql_statement);
+ step->sql_statement = buf.data;
+ }
+ }
+}
+
+/*
* Plan to sort step tuples
* PGXC: copied and adopted from optimizer/plan/planner.c
*/
@@ -2114,44 +2435,9 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
query_step = makeNode(RemoteQuery);
query_step->is_single_step = false;
- /*
- * Declare Cursor case:
- * We should leave as a step query only SELECT statement
- * Further if we need refer source statement for planning we should take
- * the truncated string
- */
- if (query->utilityStmt &&
+ if (query->utilityStmt &&
IsA(query->utilityStmt, DeclareCursorStmt))
- {
-
- char *src = query->sql_statement;
- char str[strlen(src) + 1]; /* mutable copy */
- char *dst = str;
-
- cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options;
-
- /*
- * Initialize mutable copy, converting letters to uppercase and
- * various witespace characters to spaces
- */
- while (*src)
- {
- if (isspace(*src))
- {
- src++;
- *dst++ = ' ';
- }
- else
- *dst++ = toupper(*src++);
- }
- *dst = '\0';
- /* search for SELECT keyword in the normalized string */
- dst = strstr(str, " SELECT ");
- /* Take substring of the original string using found offset */
- query_step->sql_statement = pstrdup(query->sql_statement + (dst - str + 1));
- }
- else
- query_step->sql_statement = pstrdup(query->sql_statement);
+ cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options;
query_step->exec_nodes = NULL;
query_step->combine_type = COMBINE_TYPE_NONE;
@@ -2187,7 +2473,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (query->commandType != CMD_SELECT)
result->resultRelations = list_make1_int(query->resultRelation);
- query_step->exec_nodes = get_plan_nodes_command(query);
+ get_plan_nodes_command(query, query_step);
if (query_step->exec_nodes == NULL)
{
@@ -2217,26 +2503,29 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
}
/*
- * Use standard plan if we have more than one data node with either
- * group by, hasWindowFuncs, or hasRecursive
+ * get_plan_nodes_command may alter original statement, so do not
+ * process it before the call
+ *
+ * Declare Cursor case:
+ * We should leave as a step query only SELECT statement
+ * Further if we need refer source statement for planning we should take
+ * the truncated string
*/
- /*
- * PGXCTODO - this could be improved to check if the first
- * group by expression is the partitioning column, in which
- * case it is ok to treat as a single step.
- */
- if (query->commandType == CMD_SELECT
- && query_step->exec_nodes
- && list_length(query_step->exec_nodes->nodelist) > 1
- && (query->groupClause || query->hasWindowFuncs || query->hasRecursive))
+ if (query->utilityStmt &&
+ IsA(query->utilityStmt, DeclareCursorStmt))
{
- result = standard_planner(query, cursorOptions, boundParams);
- return result;
+
+ /* search for SELECT keyword in the normalized string */
+ char *select = strpos(query->sql_statement, " SELECT ");
+ /* Take substring of the original string using found offset */
+ query_step->sql_statement = pstrdup(select + 1);
}
+ else
+ query_step->sql_statement = pstrdup(query->sql_statement);
/*
- * If there already is an active portal, we may be doing planning
- * within a function. Just use the standard plan, but check if
+ * If there already is an active portal, we may be doing planning
+ * within a function. Just use the standard plan, but check if
* it is part of an EXPLAIN statement so that we do not show that
* we plan multiple steps when it is a single-step operation.
*/
@@ -2285,6 +2574,24 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
result = standard_planner(query, cursorOptions, boundParams);
return result;
}
+
+ /*
+ * Use standard plan if we have more than one data node with either
+ * group by, hasWindowFuncs, or hasRecursive
+ */
+ /*
+ * PGXCTODO - this could be improved to check if the first
+ * group by expression is the partitioning column, in which
+ * case it is ok to treat as a single step.
+ */
+ if (query->commandType == CMD_SELECT
+ && query_step->exec_nodes
+ && list_length(query_step->exec_nodes->nodelist) > 1
+ && (query->groupClause || query->hasWindowFuncs || query->hasRecursive))
+ {
+ result->planTree = standardPlan;
+ return result;
+ }
break;
default:
@@ -2307,6 +2614,33 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
result->planTree = materialize_finished_plan(result->planTree);
}
+ /*
+ * Support for multi-step cursor.
+ * Ensure uniqueness of remote cursor name
+ */
+ if (query->utilityStmt &&
+ IsA(query->utilityStmt, DeclareCursorStmt))
+ {
+ DeclareCursorStmt *stmt = (DeclareCursorStmt *) query->utilityStmt;
+ set_cursor_name(result->planTree, stmt->portalname, 0);
+ }
+
+ /*
+ * If query is FOR UPDATE fetch CTIDs from the remote node
+ * Use CTID as a key to update tuples on remote nodes when handling
+ * WHERE CURRENT OF
+ */
+ if (query->rowMarks)
+ {
+ ListCell *lc;
+ foreach(lc, query->rowMarks)
+ {
+ RowMarkClause *rmc = (RowMarkClause *) lfirst(lc);
+
+ fetch_ctid_of(result->planTree, rmc);
+ }
+ }
+
return result;
}
@@ -2321,6 +2655,8 @@ free_query_step(RemoteQuery *query_step)
return;
pfree(query_step->sql_statement);
+ if (query_step->cursor)
+ pfree(query_step->cursor);
if (query_step->exec_nodes)
{
if (query_step->exec_nodes->nodelist)
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 7fe08beacb..a524c13f6d 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -35,6 +35,8 @@
#define END_QUERY_TIMEOUT 20
#define CLEAR_TIMEOUT 5
+#define DATA_NODE_FETCH_SIZE 1
+
extern char *deparseSql(RemoteQueryState *scanstate);
@@ -70,6 +72,8 @@ static void pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles);
static int handle_response_clear(PGXCNodeHandle * conn);
+static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor);
+
static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(void);
#define MAX_STATEMENTS_PER_TRAN 10
@@ -200,8 +204,11 @@ CreateResponseCombiner(int node_count, CombineType combine_type)
combiner->copy_out_count = 0;
combiner->errorMessage = NULL;
combiner->query_Done = false;
- combiner->msg = NULL;
- combiner->msglen = 0;
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
+ combiner->rowBuffer = NIL;
+ combiner->tapenodes = NULL;
combiner->initAggregates = true;
combiner->simple_aggregates = NULL;
combiner->copy_file = NULL;
@@ -671,10 +678,10 @@ HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
* Caller must stop reading if function returns false
*/
static void
-HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
+HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len, int node)
{
/* We expect previous message is consumed */
- Assert(combiner->msg == NULL);
+ Assert(combiner->currentRow.msg == NULL);
if (combiner->request_type != REQUEST_TYPE_QUERY)
{
@@ -695,9 +702,10 @@ HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
* We are copying message because it points into connection buffer, and
* will be overwritten on next socket read
*/
- combiner->msg = (char *) palloc(len);
- memcpy(combiner->msg, msg_body, len);
- combiner->msglen = len;
+ combiner->currentRow.msg = (char *) palloc(len);
+ memcpy(combiner->currentRow.msg, msg_body, len);
+ combiner->currentRow.msglen = len;
+ combiner->currentRow.msgnode = node;
}
/*
@@ -850,6 +858,10 @@ CloseCombiner(RemoteQueryState *combiner)
FreeTupleDesc(combiner->tuple_desc);
if (combiner->errorMessage)
pfree(combiner->errorMessage);
+ if (combiner->cursor_connections)
+ pfree(combiner->cursor_connections);
+ if (combiner->tapenodes)
+ pfree(combiner->tapenodes);
pfree(combiner);
}
}
@@ -874,15 +886,24 @@ static bool
ValidateAndResetCombiner(RemoteQueryState *combiner)
{
bool valid = validate_combiner(combiner);
+ ListCell *lc;
if (combiner->connections)
pfree(combiner->connections);
if (combiner->tuple_desc)
FreeTupleDesc(combiner->tuple_desc);
- if (combiner->msg)
- pfree(combiner->msg);
+ if (combiner->currentRow.msg)
+ pfree(combiner->currentRow.msg);
+ foreach(lc, combiner->rowBuffer)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc);
+ pfree(dataRow->msg);
+ }
+ list_free_deep(combiner->rowBuffer);
if (combiner->errorMessage)
pfree(combiner->errorMessage);
+ if (combiner->tapenodes)
+ pfree(combiner->tapenodes);
combiner->command_complete_count = 0;
combiner->connections = NULL;
@@ -894,8 +915,11 @@ ValidateAndResetCombiner(RemoteQueryState *combiner)
combiner->copy_out_count = 0;
combiner->errorMessage = NULL;
combiner->query_Done = false;
- combiner->msg = NULL;
- combiner->msglen = 0;
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
+ combiner->rowBuffer = NIL;
+ combiner->tapenodes = NULL;
combiner->simple_aggregates = NULL;
combiner->copy_file = NULL;
@@ -903,22 +927,247 @@ ValidateAndResetCombiner(RemoteQueryState *combiner)
}
/*
+ * It is possible if multiple steps share the same data node connection, when
+ * executor is running multi-step query or client is running multiple queries
+ * using Extended Query Protocol. After returning next tuple ExecRemoteQuery
+ * function passes execution control to the executor and then it can be given
+ * to the same RemoteQuery or to different one. It is possible that before
+ * returning a tuple the function does not read all data node responses. In this
+ * case pending responses should be read in context of original RemoteQueryState
+ * till ReadyForQuery message and data rows should be stored (buffered) to be
+ * available when a fetch from that RemoteQueryState is requested again.
+ * BufferConnection function does the job.
+ * If a RemoteQuery is going to use connection it should check connection state.
+ * DN_CONNECTION_STATE_QUERY indicates query has data to read and combiner
+ * points to the original RemoteQueryState. If combiner differs from "this" the
+ * connection should be buffered.
+ */
+void
+BufferConnection(PGXCNodeHandle *conn)
+{
+ RemoteQueryState *combiner = conn->combiner;
+ /*
+ * When BufferConnection is invoked CurrentContext is related to other
+ * portal, which is trying to control the connection.
+ * TODO See if we can find better context to switch to
+ */
+ MemoryContext oldcontext = MemoryContextSwitchTo(combiner->ss.ss_ScanTupleSlot->tts_mcxt);
+
+ Assert(conn->state == DN_CONNECTION_STATE_QUERY && combiner);
+
+ /* Verify the connection is in use by the combiner */
+ combiner->current_conn = 0;
+ while (combiner->current_conn < combiner->conn_count)
+ {
+ if (combiner->connections[combiner->current_conn] == conn)
+ break;
+ combiner->current_conn++;
+ }
+ Assert(combiner->current_conn < combiner->conn_count);
+
+ /*
+ * Buffer data rows until the data node returns the number of rows specified by the
+ * fetch_size parameter of last Execute message (PortalSuspended message)
+ * or end of result set is reached (CommandComplete message)
+ */
+ while (conn->state == DN_CONNECTION_STATE_QUERY)
+ {
+ int res;
+
+ /* Move to buffer currentRow (received from the data node) */
+ if (combiner->currentRow.msg)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) palloc(sizeof(RemoteDataRowData));
+ *dataRow = combiner->currentRow;
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
+ combiner->rowBuffer = lappend(combiner->rowBuffer, dataRow);
+ }
+
+ res = handle_response(conn, combiner);
+ /*
+ * If response message is a DataRow it will be handled on the next
+ * iteration.
+ * PortalSuspended will cause connection state change and break the loop
+ * The same is for CommandComplete, but we need additional handling -
+ * remove connection from the list of active connections.
+ * We may need to add handling error response
+ */
+ if (res == RESPONSE_EOF)
+ {
+ /* incomplete message, read more */
+ if (pgxc_node_receive(1, &conn, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ continue;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ /*
+ * End of result set is reached, so either set the pointer to the
+ * connection to NULL (step with sort) or remove it from the list
+ * (step without sort)
+ */
+ if (combiner->tuplesortstate)
+ {
+ combiner->connections[combiner->current_conn] = NULL;
+ if (combiner->tapenodes == NULL)
+ combiner->tapenodes = (int*) palloc0(NumDataNodes * sizeof(int));
+ combiner->tapenodes[combiner->current_conn] = conn->nodenum;
+ }
+ else
+ /* Remove current connection, move last in-place, adjust current_conn */
+ if (combiner->current_conn < --combiner->conn_count)
+ combiner->connections[combiner->current_conn] = combiner->connections[combiner->conn_count];
+ else
+ combiner->current_conn = 0;
+ }
+ /*
+ * Before returning RESPONSE_COMPLETE or RESPONSE_SUSPENDED handle_response()
+ * changes connection state to DN_CONNECTION_STATE_IDLE, breaking the
+ * loop. We do not need to do anything specific in case of
+ * PORTAL_SUSPENDED, so skipping the "else if" block for that case
+ */
+ }
+ MemoryContextSwitchTo(oldcontext);
+ conn->combiner = NULL;
+}
+
+/*
* Get next data row from the combiner's buffer into provided slot
- * Just clear slot and return false if buffer is empty, that means more data
- * should be read
+ * Just clear slot and return false if buffer is empty, which means the end of the result
+ * set is reached
*/
bool
FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot)
{
- /* have messages in the buffer, consume them */
- if (combiner->msg)
+ bool have_tuple = false;
+
+ while (combiner->conn_count > 0)
{
- ExecStoreDataRowTuple(combiner->msg, combiner->msglen, slot, true);
- combiner->msg = NULL;
- combiner->msglen = 0;
+ PGXCNodeHandle *conn;
+ int res;
+
+ /* If we have message in the buffer, consume it */
+ if (combiner->currentRow.msg)
+ {
+ ExecStoreDataRowTuple(combiner->currentRow.msg,
+ combiner->currentRow.msglen,
+ combiner->currentRow.msgnode, slot, true);
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
+ have_tuple = true;
+ }
+ /*
+ * If this is an ordered fetch we cannot know which node
+ * to handle next, so sorter will choose next itself and set it as
+ * currentRow to have it consumed on the next call to FetchTuple
+ */
+ if (((RemoteQuery *)combiner->ss.ps.plan)->sort)
+ return have_tuple;
+
+ /*
+ * Note: If we are fetching unsorted results we cannot have both
+ * currentRow and buffered rows. When connection is buffered currentRow
+ * is moved to buffer, and then it is cleaned after buffering is
+ * completed. Afterwards rows will be taken from the buffer bypassing
+ * currentRow until buffer is empty, and only after that data are read
+ * from a connection.
+ */
+ if (list_length(combiner->rowBuffer) > 0)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) linitial(combiner->rowBuffer);
+ combiner->rowBuffer = list_delete_first(combiner->rowBuffer);
+ ExecStoreDataRowTuple(dataRow->msg, dataRow->msglen,
+ dataRow->msgnode, slot, true);
+ pfree(dataRow);
+ return true;
+ }
+
+ conn = combiner->connections[combiner->current_conn];
+
+ /* Going to use a connection, buffer it if needed */
+ if (conn->state == DN_CONNECTION_STATE_QUERY && conn->combiner != NULL
+ && conn->combiner != combiner)
+ BufferConnection(conn);
+
+ /*
+ * If current connection is idle it means portal on the data node is
+ * suspended. If we have a tuple do not hurry to request more rows,
+ * leave connection clean for other RemoteQueries.
+ * If we do not have, request more and try to get it
+ */
+ if (conn->state == DN_CONNECTION_STATE_IDLE)
+ {
+ /*
+ * If we have tuple to return do not hurry to request more, keep
+ * connection clean
+ */
+ if (have_tuple)
+ return have_tuple;
+ else
+ {
+ if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ if (pgxc_node_send_sync(conn) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ if (pgxc_node_receive(1, &conn, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ conn->combiner = combiner;
+ }
+ }
+
+ /* read messages */
+ res = handle_response(conn, combiner);
+ if (res == RESPONSE_EOF)
+ {
+ /* incomplete message, read more */
+ if (pgxc_node_receive(1, &conn, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ continue;
+ }
+ else if (res == RESPONSE_SUSPENDED)
+ {
+ /* Make next connection current */
+ if (++combiner->current_conn >= combiner->conn_count)
+ combiner->current_conn = 0;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ /* Remove current connection, move last in-place, adjust current_conn */
+ if (combiner->current_conn < --combiner->conn_count)
+ combiner->connections[combiner->current_conn] = combiner->connections[combiner->conn_count];
+ else
+ combiner->current_conn = 0;
+ }
+
+ /* If we have a tuple we can leave now. */
+ if (have_tuple)
+ return true;
+ }
+ /* Wrap up last message if exists */
+ if (combiner->currentRow.msg)
+ {
+ ExecStoreDataRowTuple(combiner->currentRow.msg,
+ combiner->currentRow.msglen,
+ combiner->currentRow.msgnode, slot, true);
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
return true;
}
- /* inform caller that buffer is empty */
+ /* otherwise report end of data to the caller */
ExecClearTuple(slot);
return false;
}
@@ -986,10 +1235,11 @@ pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections,
* Long term, we should look into cancelling executing statements
* and closing the connections.
* Return values:
- * EOF - need to receive more data for the connection
- * 0 - done with the connection
- * 1 - got data row
- * 2 - got copy response
+ * RESPONSE_EOF - need to receive more data for the connection
+ * RESPONSE_COMPLETE - done with the connection
+ * RESPONSE_TUPLEDESC - got tuple description
+ * RESPONSE_DATAROW - got data row
+ * RESPONSE_COPY - got copy response
*/
int
handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
@@ -997,38 +1247,40 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
char *msg;
int msg_len;
char msg_type;
+ bool suspended = false;
for (;;)
{
- /* No data available, exit */
- if (conn->state == DN_CONNECTION_STATE_QUERY)
- return RESPONSE_EOF;
+ Assert(conn->state != DN_CONNECTION_STATE_IDLE);
+ Assert(conn->combiner == combiner || conn->combiner == NULL);
/*
* If we are in the process of shutting down, we
- * may be rolling back, and the buffer may contain other messages.
- * We want to avoid a procarray exception
- * as well as an error stack overflow.
- */
+ * may be rolling back, and the buffer may contain other messages.
+ * We want to avoid a procarray exception
+ * as well as an error stack overflow.
+ */
if (proc_exit_inprogress)
- {
conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
+
+ /* don't read from from the connection if there is a fatal error */
+ if (conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
+ return RESPONSE_COMPLETE;
+
+ /* No data available, exit */
+ if (!HAS_MESSAGE_BUFFERED(conn))
return RESPONSE_EOF;
- }
/* TODO handle other possible responses */
msg_type = get_message(conn, &msg_len, &msg);
switch (msg_type)
{
case '\0': /* Not enough data in the buffer */
- conn->state = DN_CONNECTION_STATE_QUERY;
return RESPONSE_EOF;
case 'c': /* CopyToCommandComplete */
- conn->state = DN_CONNECTION_STATE_COMPLETED;
HandleCopyOutComplete(combiner);
break;
case 'C': /* CommandComplete */
- conn->state = DN_CONNECTION_STATE_COMPLETED;
HandleCommandComplete(combiner, msg, msg_len);
break;
case 'T': /* RowDescription */
@@ -1043,8 +1295,17 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
#ifdef DN_CONNECTION_DEBUG
Assert(conn->have_row_desc);
#endif
- HandleDataRow(combiner, msg, msg_len);
+ HandleDataRow(combiner, msg, msg_len, conn->nodenum);
return RESPONSE_DATAROW;
+ case 's': /* PortalSuspended */
+ suspended = true;
+ break;
+ case '1': /* ParseComplete */
+ case '2': /* BindComplete */
+ case '3': /* CloseComplete */
+ case 'n': /* NoData */
+ /* simple notifications, continue reading */
+ break;
case 'G': /* CopyInResponse */
conn->state = DN_CONNECTION_STATE_COPY_IN;
HandleCopyIn(combiner);
@@ -1060,7 +1321,6 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
break;
case 'E': /* ErrorResponse */
HandleError(combiner, msg, msg_len);
- conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
/*
* Do not return with an error, we still need to consume Z,
* ready-for-query
@@ -1074,12 +1334,22 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
*/
break;
case 'Z': /* ReadyForQuery */
+ {
+ /*
+ * Return result depends on previous connection state.
+ * If it was PORTAL_SUSPENDED coordinator want to send down
+ * another EXECUTE to fetch more rows, otherwise it is done
+ * with the connection
+ */
+ int result = suspended ? RESPONSE_SUSPENDED : RESPONSE_COMPLETE;
conn->transaction_status = msg[0];
conn->state = DN_CONNECTION_STATE_IDLE;
+ conn->combiner = NULL;
#ifdef DN_CONNECTION_DEBUG
conn->have_row_desc = false;
#endif
- return RESPONSE_COMPLETE;
+ return result;
+ }
case 'I': /* EmptyQuery */
default:
/* sync lost? */
@@ -1092,6 +1362,7 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
return RESPONSE_EOF;
}
+
/*
* Like handle_response, but for consuming the messages,
* in case we of an error to clean the data node connection.
@@ -1138,8 +1409,8 @@ handle_response_clear(PGXCNodeHandle * conn)
case 'N': /* NoticeResponse */
break;
case 'E': /* ErrorResponse */
- conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
/*
+ * conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
* Do not return with an error, we still need to consume Z,
* ready-for-query
*/
@@ -1176,6 +1447,8 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections,
/* Send BEGIN */
for (i = 0; i < conn_count; i++)
{
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid))
return EOF;
@@ -1372,7 +1645,7 @@ finish:
rollback_xid = BeginTranGTM(NULL);
- /*
+ /*
* Send xid and rollback prepared down to Datanodes and Coordinators
* Even if we get an error on one, we try and send to the others
*/
@@ -1398,7 +1671,7 @@ finish:
/*
- * Commit prepared transaction on Datanodes and Coordinators (as necessary)
+ * Commit prepared transaction on Datanodes and Coordinators (as necessary)
* where it has been prepared.
* Connection to backends has been cut when transaction has been prepared,
* So it is necessary to send the COMMIT PREPARE message to all the nodes.
@@ -1831,18 +2104,6 @@ pgxc_node_rollback(PGXCNodeAllHandles *pgxc_handles)
int co_conn_count = pgxc_handles->co_conn_count;
int dn_conn_count = pgxc_handles->dn_conn_count;
- /*
- * Rollback is a special case, being issued because of an error.
- * We try to read and throw away any extra data on the connection before
- * issuing our rollbacks so that we did not read the results of the
- * previous command.
- */
- for (i = 0; i < dn_conn_count; i++)
- clear_socket_data(pgxc_handles->datanode_handles[i]);
-
- for (i = 0; i < co_conn_count; i++)
- clear_socket_data(pgxc_handles->coord_handles[i]);
-
/* Send ROLLBACK to all handles */
if (pgxc_all_handles_send_query(pgxc_handles, "ROLLBACK", false))
result = EOF;
@@ -1963,6 +2224,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
/* Send query to nodes */
for (i = 0; i < conn_count; i++)
{
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
/* If explicit transaction is needed gxid is already sent */
if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
{
@@ -2369,7 +2632,6 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
RemoteQueryState *remotestate;
Relation currentRelation;
-
remotestate = CreateResponseCombiner(0, node->combine_type);
remotestate->ss.ps.plan = (Plan *) node;
remotestate->ss.ps.state = estate;
@@ -2409,6 +2671,10 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
}
+
+ if (innerPlan(node))
+ innerPlanState(remotestate) = ExecInitNode(innerPlan(node), estate, eflags);
+
if (outerPlan(node))
outerPlanState(remotestate) = ExecInitNode(outerPlan(node), estate, eflags);
@@ -2425,7 +2691,9 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
if (src->tts_mcxt == dst->tts_mcxt)
{
/* now dst slot controls the backing message */
- ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, dst, src->tts_shouldFreeRow);
+ ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen,
+ src->tts_dataNode, dst,
+ src->tts_shouldFreeRow);
src->tts_shouldFreeRow = false;
}
else
@@ -2433,10 +2701,11 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
/* have to make a copy */
MemoryContext oldcontext = MemoryContextSwitchTo(dst->tts_mcxt);
int len = src->tts_dataLen;
+ int node = src->tts_dataNode;
char *msg = (char *) palloc(len);
memcpy(msg, src->tts_dataRow, len);
- ExecStoreDataRowTuple(msg, len, dst, true);
+ ExecStoreDataRowTuple(msg, len, node, dst, true);
MemoryContextSwitchTo(oldcontext);
}
}
@@ -2610,20 +2879,21 @@ ExecRemoteQuery(RemoteQueryState *node)
int total_conn_count;
bool need_tran;
PGXCNodeAllHandles *pgxc_connections;
+ TupleTableSlot *innerSlot = NULL;
/*
- * If coordinator plan is specified execute it first.
- * If the plan is returning we are returning these tuples immediately.
- * If it is not returning or returned them all by current invocation
- * we will go ahead and execute remote query. Then we will never execute
- * the outer plan again because node->query_Done flag will be set and
- * execution won't get to that place.
+ * Inner plan for RemoteQuery supplies parameters.
+ * We execute inner plan to get a tuple and use values of the tuple as
+ * parameter values when executing this remote query.
+ * If returned slot contains NULL tuple break execution.
+ * TODO there is a problem how to handle the case if both inner and
+ * outer plans exist. We can decide later, since it is never used now.
*/
- if (outerPlanState(node))
+ if (innerPlanState(node))
{
- TupleTableSlot *slot = ExecProcNode(outerPlanState(node));
- if (!TupIsNull(slot))
- return slot;
+ innerSlot = ExecProcNode(innerPlanState(node));
+// if (TupIsNull(innerSlot))
+// return innerSlot;
}
/*
@@ -2712,6 +2982,9 @@ ExecRemoteQuery(RemoteQueryState *node)
/* See if we have a primary node, execute on it first before the others */
if (primaryconnection)
{
+ if (primaryconnection->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(primaryconnection);
+
/* If explicit transaction is needed gxid is already sent */
if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
{
@@ -2751,7 +3024,6 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
-
Assert(node->combine_type == COMBINE_TYPE_SAME);
while (node->command_complete_count < 1)
@@ -2760,11 +3032,7 @@ ExecRemoteQuery(RemoteQueryState *node)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to read response from data nodes")));
- while (handle_response(primaryconnection, node) == RESPONSE_EOF)
- if (pgxc_node_receive(1, &primaryconnection, NULL))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes")));
+ handle_response(primaryconnection, node);
if (node->errorMessage)
{
char *code = node->errorCode;
@@ -2777,6 +3045,8 @@ ExecRemoteQuery(RemoteQueryState *node)
for (i = 0; i < regular_conn_count; i++)
{
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
/* If explicit transaction is needed gxid is already sent */
if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
{
@@ -2811,106 +3081,174 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
+ if (step->cursor || innerSlot)
{
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
+ int fetch = 0;
+ /*
+ * execute and fetch rows only if they will be consumed
+ * immediately by the sorter
+ */
+ if (step->cursor)
+ fetch = 1;
+ /*
+ * Here is ad hoc handling of WHERE CURRENT OF clause.
+ * Previous step have returned the key values to update
+ * replicated tuple into innerSlot and the query is a
+ * DECLARE CURSOR. We do not want to execute this query now
+ * because of unusual fetch procedure: we just need to
+ * position cursor on the first row on each node.
+ * So here we just parse and bind query. As the result cursor
+ * specified by the statement will be created, and next step
+ * will issue MOVE command positioning the cursor properly.
+ * If we want to use parametrized requests for any other
+ * purpose we should return and rework this code
+ */
+ if (pgxc_node_send_query_extended(connections[i],
+ step->sql_statement,
+ NULL,
+ step->cursor,
+ innerSlot ? innerSlot->tts_dataLen : 0,
+ innerSlot ? innerSlot->tts_dataRow : NULL,
+ step->cursor != NULL,
+ fetch) != 0)
+ {
+ pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
}
+ else
+ {
+ if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
+ {
+ pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ }
+ connections[i]->combiner = node;
}
+ if (step->cursor)
+ {
+ node->cursor = step->cursor;
+ node->cursor_count = regular_conn_count;
+ node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *));
+ memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *));
+ }
+
+ /*
+ * Stop if all commands are completed or we got a data row and
+ * initialized state node for subsequent invocations
+ */
+ while (regular_conn_count > 0 && node->connections == NULL)
+ {
+ int i = 0;
+
+ if (pgxc_node_receive(regular_conn_count, connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
/*
- * Stop if all commands are completed or we got a data row and
- * initialized state node for subsequent invocations
+ * Handle input from the data nodes.
+ * If we got a RESPONSE_DATAROW we can break handling to wrap
+ * it into a tuple and return. Handling will be continued upon
+ * subsequent invocations.
+ * If we got 0, we exclude connection from the list. We do not
+ * expect more input from it. In case of non-SELECT query we quit
+ * the loop when all nodes finish their work and send ReadyForQuery
+ * with empty connections array.
+ * If we got EOF, move to the next connection, will receive more
+ * data on the next iteration.
*/
- while (regular_conn_count > 0 && node->connections == NULL)
+ while (i < regular_conn_count)
{
- int i = 0;
-
- if (pgxc_node_receive(regular_conn_count, connections, NULL))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes")));
- /*
- * Handle input from the data nodes.
- * If we got a RESPONSE_DATAROW we can break handling to wrap
- * it into a tuple and return. Handling will be continued upon
- * subsequent invocations.
- * If we got 0, we exclude connection from the list. We do not
- * expect more input from it. In case of non-SELECT query we quit
- * the loop when all nodes finish their work and send ReadyForQuery
- * with empty connections array.
- * If we got EOF, move to the next connection, will receive more
- * data on the next iteration.
- */
- while (i < regular_conn_count)
+ int res = handle_response(connections[i], node);
+ if (res == RESPONSE_EOF)
{
- int res = handle_response(connections[i], node);
- if (res == RESPONSE_EOF)
- {
- i++;
- }
- else if (res == RESPONSE_COMPLETE)
- {
- if (i < --regular_conn_count)
- connections[i] = connections[regular_conn_count];
- }
- else if (res == RESPONSE_TUPDESC)
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (i < --regular_conn_count)
+ connections[i] = connections[regular_conn_count];
+ }
+ else if (res == RESPONSE_TUPDESC)
+ {
+ ExecSetSlotDescriptor(scanslot, node->tuple_desc);
+ /*
+ * Now tuple table slot is responsible for freeing the
+ * descriptor
+ */
+ node->tuple_desc = NULL;
+ if (step->sort)
{
- ExecSetSlotDescriptor(scanslot, node->tuple_desc);
+ SimpleSort *sort = step->sort;
+
+ node->connections = connections;
+ node->conn_count = regular_conn_count;
/*
- * Now tuple table slot is responsible for freeing the
- * descriptor
+ * First message is already in the buffer
+ * Further fetch will be under tuplesort control
+ * If query does not produce rows tuplesort will not
+ * be initialized
*/
- node->tuple_desc = NULL;
- if (step->sort)
- {
- SimpleSort *sort = step->sort;
-
- node->connections = connections;
- node->conn_count = regular_conn_count;
- /*
- * First message is already in the buffer
- * Further fetch will be under tuplesort control
- * If query does not produce rows tuplesort will not
- * be initialized
- */
- node->tuplesortstate = tuplesort_begin_merge(
- scanslot->tts_tupleDescriptor,
- sort->numCols,
- sort->sortColIdx,
- sort->sortOperators,
- sort->nullsFirst,
- node,
- work_mem);
- /*
- * Break the loop, do not wait for first row.
- * Tuplesort module want to control node it is
- * fetching rows from, while in this loop first
- * row would be got from random node
- */
- break;
- }
- }
- else if (res == RESPONSE_DATAROW)
- {
+ node->tuplesortstate = tuplesort_begin_merge(
+ scanslot->tts_tupleDescriptor,
+ sort->numCols,
+ sort->sortColIdx,
+ sort->sortOperators,
+ sort->nullsFirst,
+ node,
+ work_mem);
/*
- * Got first data row, quit the loop
+ * Break the loop, do not wait for first row.
+ * Tuplesort module wants to control the node it is
+ * fetching rows from, while in this loop first
+ * row would be fetched from a random node
*/
- node->connections = connections;
- node->conn_count = regular_conn_count;
- node->current_conn = i;
break;
}
}
+ else if (res == RESPONSE_DATAROW)
+ {
+ /*
+ * Got first data row, quit the loop
+ */
+ node->connections = connections;
+ node->conn_count = regular_conn_count;
+ node->current_conn = i;
+ break;
+ }
}
+ }
+
+ if (node->cursor_count)
+ {
+ node->conn_count = node->cursor_count;
+ memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *));
+ node->connections = connections;
+ }
node->query_Done = true;
}
+ if (node->update_cursor)
+ {
+ PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+ close_node_cursors(all_dn_handles->datanode_handles,
+ all_dn_handles->dn_conn_count,
+ node->update_cursor);
+ pfree(node->update_cursor);
+ node->update_cursor = NULL;
+ }
+
if (node->tuplesortstate)
{
while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
@@ -2948,98 +3286,35 @@ ExecRemoteQuery(RemoteQueryState *node)
}
else
{
- while (node->conn_count > 0 && !have_tuple)
+ while (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
{
- int i;
-
- /*
- * If combiner already has tuple go ahead and return it
- * otherwise tuple will be cleared
- */
- if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+ if (node->simple_aggregates)
{
- if (node->simple_aggregates)
- {
- if (node->simple_aggregates)
- {
- /*
- * Advance aggregate functions and allow to read up next
- * data row message and get tuple in the same slot on
- * next iteration
- */
- exec_simple_aggregates(node, scanslot);
- }
- else
- {
- /*
- * Receive current slot and read up next data row
- * message before exiting the loop. Next time when this
- * function is invoked we will have either data row
- * message ready or EOF
- */
- copy_slot(node, scanslot, resultslot);
- have_tuple = true;
- }
- }
- else
- {
- /*
- * Receive current slot and read up next data row
- * message before exiting the loop. Next time when this
- * function is invoked we will have either data row
- * message ready or EOF
- */
- copy_slot(node, scanslot, resultslot);
- have_tuple = true;
- }
+ /*
+ * Advance aggregate functions and allow to read up next
+ * data row message and get tuple in the same slot on
+ * next iteration
+ */
+ exec_simple_aggregates(node, scanslot);
}
-
- /*
- * Handle input to get next row or ensure command is completed,
- * starting from connection next after current. If connection
- * does not
- */
- if ((i = node->current_conn + 1) == node->conn_count)
- i = 0;
-
- for (;;)
+ else
{
- int res = handle_response(node->connections[i], node);
- if (res == RESPONSE_EOF)
- {
- /* go to next connection */
- if (++i == node->conn_count)
- i = 0;
- /* if we cycled over all connections we need to receive more */
- if (i == node->current_conn)
- if (pgxc_node_receive(node->conn_count, node->connections, NULL))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes")));
- }
- else if (res == RESPONSE_COMPLETE)
- {
- if (--node->conn_count == 0)
- break;
- if (i == node->conn_count)
- i = 0;
- else
- node->connections[i] = node->connections[node->conn_count];
- if (node->current_conn == node->conn_count)
- node->current_conn = i;
- }
- else if (res == RESPONSE_DATAROW)
- {
- node->current_conn = i;
- break;
- }
+ /*
+ * Receive current slot and read up next data row
+ * message before exiting the loop. Next time when this
+ * function is invoked we will have either data row
+ * message ready or EOF
+ */
+ copy_slot(node, scanslot, resultslot);
+ have_tuple = true;
+ break;
}
}
/*
* We may need to finalize aggregates
*/
- if (!have_tuple && node->simple_aggregates)
+ if (node->simple_aggregates)
{
finish_simple_aggregates(node, resultslot);
if (!TupIsNull(resultslot))
@@ -3058,7 +3333,31 @@ ExecRemoteQuery(RemoteQueryState *node)
errmsg("%s", node->errorMessage)));
}
- return resultslot;
+ /*
+ * While we are emitting rows we ignore outer plan
+ */
+ if (!TupIsNull(resultslot))
+ return resultslot;
+
+ /*
+ * If coordinator plan is specified execute it first.
+ * If the plan is returning we are returning these tuples immediately.
+ * If it is not returning or returned them all by current invocation
+ * we will go ahead and execute remote query. Then we will never execute
+ * the outer plan again because node->query_Done flag will be set and
+ * execution won't get to that place.
+ */
+ if (outerPlanState(node))
+ {
+ TupleTableSlot *slot = ExecProcNode(outerPlanState(node));
+ if (!TupIsNull(slot))
+ return slot;
+ }
+
+ /*
+ * OK, we have nothing to return, so return NULL
+ */
+ return NULL;
}
/*
@@ -3067,90 +3366,88 @@ ExecRemoteQuery(RemoteQueryState *node)
void
ExecEndRemoteQuery(RemoteQueryState *node)
{
+ ListCell *lc;
/*
- * If processing was interrupted, (ex: client did not consume all the data,
- * or a subquery with LIMIT) we may still have data on the nodes. Try and consume.
- * We do not simply call PGXCNodeConsumeMessages, because the same
- * connection could be used for multiple RemoteQuery steps.
- *
- * It seems most stable checking command_complete_count
- * and only then working with conn_count
- *
- * PGXCTODO: Change in the future when we remove materialization nodes.
+ * shut down the subplan
*/
- if (node->command_complete_count < node->node_count)
- {
- elog(WARNING, "Extra data node messages when ending remote query step");
-
- while (node->conn_count > 0)
- {
- int i = 0;
- int res;
-
- /*
- * Just consume the rest of the messages
- */
- if ((i = node->current_conn + 1) == node->conn_count)
- i = 0;
+ if (innerPlanState(node))
+ ExecEndNode(innerPlanState(node));
- for (;;)
- {
- /* throw away message */
- if (node->msg)
- {
- pfree(node->msg);
- node->msg = NULL;
- }
+ /* clean up the buffer */
+ foreach(lc, node->rowBuffer)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc);
+ pfree(dataRow->msg);
+ }
+ list_free_deep(node->rowBuffer);
- res = handle_response(node->connections[i], node);
+ node->current_conn = 0;
+ while (node->conn_count > 0)
+ {
+ int res;
+ PGXCNodeHandle *conn = node->connections[node->current_conn];
- if (res == RESPONSE_COMPLETE ||
- node->connections[i]->state == DN_CONNECTION_STATE_ERROR_FATAL ||
- node->connections[i]->state == DN_CONNECTION_STATE_ERROR_NOT_READY)
- {
- if (--node->conn_count == 0)
- break;
- if (i == node->conn_count)
- i = 0;
- else
- node->connections[i] = node->connections[node->conn_count];
- if (node->current_conn == node->conn_count)
- node->current_conn = i;
- }
- else if (res == RESPONSE_EOF)
- {
- /* go to next connection */
- if (++i == node->conn_count)
- i = 0;
+ /* throw away message */
+ if (node->currentRow.msg)
+ {
+ pfree(node->currentRow.msg);
+ node->currentRow.msg = NULL;
+ }
+ /* no data is expected */
+ if (conn->state == DN_CONNECTION_STATE_IDLE ||
+ conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
+ {
+ if (node->current_conn < --node->conn_count)
+ node->connections[node->current_conn] = node->connections[node->conn_count];
+ continue;
+ }
+ res = handle_response(conn, node);
+ if (res == RESPONSE_EOF)
+ {
+ struct timeval timeout;
+ timeout.tv_sec = END_QUERY_TIMEOUT;
+ timeout.tv_usec = 0;
- /* if we cycled over all connections we need to receive more */
- if (i == node->current_conn)
- {
- struct timeval timeout;
- timeout.tv_sec = END_QUERY_TIMEOUT;
- timeout.tv_usec = 0;
-
- if (pgxc_node_receive(node->conn_count, node->connections, &timeout))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes when ending query")));
- }
- }
- }
+ if (pgxc_node_receive(1, &conn, &timeout))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes when ending query")));
}
- elog(WARNING, "Data node connection buffers cleaned");
}
-
/*
* Release tuplesort resources
*/
if (node->tuplesortstate != NULL)
+ {
+ /*
+ * tuplesort_end invalidates the minimal tuple if it is in the slot because
+ * it deletes the TupleSort memory context, causing a seg fault later when
+ * releasing tuple table
+ */
+ ExecClearTuple(node->ss.ss_ScanTupleSlot);
tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+ }
node->tuplesortstate = NULL;
/*
+ * If there are active cursors close them
+ */
+ if (node->cursor)
+ close_node_cursors(node->cursor_connections, node->cursor_count, node->cursor);
+
+ if (node->update_cursor)
+ {
+ PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+ close_node_cursors(all_dn_handles->datanode_handles,
+ all_dn_handles->dn_conn_count,
+ node->update_cursor);
+ pfree(node->update_cursor);
+ node->update_cursor = NULL;
+ }
+
+ /*
* shut down the subplan
*/
if (outerPlanState(node))
@@ -3165,6 +3462,57 @@ ExecEndRemoteQuery(RemoteQueryState *node)
CloseCombiner(node);
}
+static void
+close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor)
+{
+ int i;
+ RemoteQueryState *combiner;
+
+ for (i = 0; i < conn_count; i++)
+ {
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
+ if (pgxc_node_send_close(connections[i], false, cursor) != 0)
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node cursor")));
+ if (pgxc_node_send_sync(connections[i]) != 0)
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node cursor")));
+ }
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+
+ while (conn_count > 0)
+ {
+ if (pgxc_node_receive(conn_count, connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node cursor")));
+ i = 0;
+ while (i < conn_count)
+ {
+ int res = handle_response(connections[i], combiner);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (--conn_count > i)
+ connections[i] = connections[conn_count];
+ }
+ else
+ {
+ /* Unexpected response, ignore */
+ }
+ }
+ }
+
+ ValidateAndCloseCombiner(combiner);
+}
+
/*
* Consume any remaining messages on the connections.
* This is useful for calling after ereport()
@@ -3367,6 +3715,8 @@ ExecRemoteUtility(RemoteQuery *node)
/* See if we have a primary nodes, execute on it first before the others */
if (primaryconnection)
{
+ if (primaryconnection->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(primaryconnection);
/* If explicit transaction is needed gxid is already sent */
if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
{
@@ -3421,20 +3771,24 @@ ExecRemoteUtility(RemoteQuery *node)
{
for (i = 0; i < regular_conn_count; i++)
{
+ PGXCNodeHandle *conn = pgxc_connections->datanode_handles[i];
+
+ if (conn->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(conn);
/* If explicit transaction is needed gxid is already sent */
- if (!need_tran && pgxc_node_send_gxid(pgxc_connections->datanode_handles[i], gxid))
+ if (!need_tran && pgxc_node_send_gxid(conn, gxid))
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (snapshot && pgxc_node_send_snapshot(pgxc_connections->datanode_handles[i], snapshot))
+ if (snapshot && pgxc_node_send_snapshot(conn, snapshot))
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (pgxc_node_send_query(pgxc_connections->datanode_handles[i], node->sql_statement) != 0)
+ if (pgxc_node_send_query(conn, node->sql_statement) != 0)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -3498,7 +3852,8 @@ ExecRemoteUtility(RemoteQuery *node)
*/
while (i < regular_conn_count)
{
- int res = handle_response(pgxc_connections->datanode_handles[i], remotestate);
+ PGXCNodeHandle *conn = pgxc_connections->datanode_handles[i];
+ int res = handle_response(conn, remotestate);
if (res == RESPONSE_EOF)
{
i++;
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index 0d902738f0..3bc3a83aa4 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -78,6 +78,7 @@ init_pgxc_handle(PGXCNodeHandle *pgxc_handle)
pgxc_handle->outBuffer = (char *) palloc(pgxc_handle->outSize);
pgxc_handle->inSize = 16 * 1024;
pgxc_handle->inBuffer = (char *) palloc(pgxc_handle->inSize);
+ pgxc_handle->combiner = NULL;
if (pgxc_handle->outBuffer == NULL || pgxc_handle->inBuffer == NULL)
{
@@ -239,6 +240,7 @@ pgxc_node_init(PGXCNodeHandle *handle, int sock, int nodenum)
handle->sock = sock;
handle->transaction_status = 'I';
handle->state = DN_CONNECTION_STATE_IDLE;
+ handle->combiner = NULL;
#ifdef DN_CONNECTION_DEBUG
handle->have_row_desc = false;
#endif
@@ -268,7 +270,7 @@ pgxc_node_receive(const int conn_count,
{
/* If connection finised sending do not wait input from it */
if (connections[i]->state == DN_CONNECTION_STATE_IDLE
- || connections[i]->state == DN_CONNECTION_STATE_HAS_DATA)
+ || HAS_MESSAGE_BUFFERED(connections[i]))
continue;
/* prepare select params */
@@ -280,7 +282,7 @@ pgxc_node_receive(const int conn_count,
else
{
/* flag as bad, it will be removed from the list */
- connections[i]->state == DN_CONNECTION_STATE_ERROR_NOT_READY;
+ connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
}
}
@@ -327,11 +329,7 @@ retry:
add_error_message(conn, "unexpected EOF on datanode connection");
elog(WARNING, "unexpected EOF on datanode connection");
/* Should we read from the other connections before returning? */
- return EOF;
- }
- else
- {
- conn->state = DN_CONNECTION_STATE_HAS_DATA;
+ return EOF;
}
}
}
@@ -476,18 +474,6 @@ retry:
}
-/*
- * Clear out socket data and buffer.
- * Throw away any data.
- */
-void
-clear_socket_data (PGXCNodeHandle *conn)
-{
- do {
- conn->inStart = conn->inCursor = conn->inEnd = 0;
- } while (pgxc_node_read_data(conn) > 0);
-}
-
/*
* Get one character from the connection buffer and advance cursor
*/
@@ -576,7 +562,6 @@ get_message(PGXCNodeHandle *conn, int *len, char **msg)
* ensure_in_buffer_capacity() will immediately return
*/
ensure_in_buffer_capacity(5 + (size_t) *len, conn);
- conn->state = DN_CONNECTION_STATE_QUERY;
conn->inCursor = conn->inStart;
return '\0';
}
@@ -589,7 +574,7 @@ get_message(PGXCNodeHandle *conn, int *len, char **msg)
/*
- * Release all data node connections and coordinator connections
+ * Release all data node connections and coordinator connections
* back to pool and release occupied memory
*
* If force_drop is true, we force dropping all of the connections, such as after
@@ -853,6 +838,327 @@ send_some(PGXCNodeHandle *handle, int len)
return result;
}
+/*
+ * Send PARSE message with specified statement down to the Data node
+ */
+int
+pgxc_node_send_parse(PGXCNodeHandle * handle, const char* statement,
+ const char *query)
+{
+ /* statement name size (allow NULL) */
+ int stmtLen = statement ? strlen(statement) + 1 : 1;
+ /* size of query string */
+ int strLen = strlen(query) + 1;
+ /* size of parameter array (always empty for now) */
+ int paramLen = 2;
+
+ /* size + stmtLen + strlen + paramLen */
+ int msgLen = 4 + stmtLen + strLen + paramLen;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'P';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* statement name */
+ if (statement)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, statement, stmtLen);
+ handle->outEnd += stmtLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+ /* query */
+ memcpy(handle->outBuffer + handle->outEnd, query, strLen);
+ handle->outEnd += strLen;
+ /* parameter types (none) */
+ handle->outBuffer[handle->outEnd++] = 0;
+ handle->outBuffer[handle->outEnd++] = 0;
+
+ return 0;
+}
+
+
+/*
+ * Send BIND message down to the Data node
+ */
+int
+pgxc_node_send_bind(PGXCNodeHandle * handle, const char *portal,
+ const char *statement, int paramlen, char *params)
+{
+ uint16 n16;
+ /* portal name size (allow NULL) */
+ int pnameLen = portal ? strlen(portal) + 1 : 1;
+ /* statement name size (allow NULL) */
+ int stmtLen = statement ? strlen(statement) + 1 : 1;
+ /* size of parameter codes array (always empty for now) */
+ int paramCodeLen = 2;
+ /* size of parameter values array, 2 if no params */
+ int paramValueLen = paramlen ? paramlen : 2;
+ /* size of output parameter codes array (always empty for now) */
+ int paramOutLen = 2;
+
+ /* size + pnameLen + stmtLen + parameters */
+ int msgLen = 4 + pnameLen + stmtLen + paramCodeLen + paramValueLen + paramOutLen;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'B';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* portal name */
+ if (portal)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, portal, pnameLen);
+ handle->outEnd += pnameLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+ /* statement name */
+ if (statement)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, statement, stmtLen);
+ handle->outEnd += stmtLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+ /* parameter codes (none) */
+ handle->outBuffer[handle->outEnd++] = 0;
+ handle->outBuffer[handle->outEnd++] = 0;
+ /* parameter values */
+ if (paramlen)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, params, paramlen);
+ handle->outEnd += paramlen;
+ }
+ else
+ {
+ handle->outBuffer[handle->outEnd++] = 0;
+ handle->outBuffer[handle->outEnd++] = 0;
+ }
+ /* output parameter codes (none) */
+ handle->outBuffer[handle->outEnd++] = 0;
+ handle->outBuffer[handle->outEnd++] = 0;
+
+ return 0;
+}
+
+
+/*
+ * Send DESCRIBE message (portal or statement) down to the Data node
+ */
+int
+pgxc_node_send_describe(PGXCNodeHandle * handle, bool is_statement,
+ const char *name)
+{
+ /* statement or portal name size (allow NULL) */
+ int nameLen = name ? strlen(name) + 1 : 1;
+
+ /* size + statement/portal + name */
+ int msgLen = 4 + 1 + nameLen;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'D';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* statement/portal flag */
+ handle->outBuffer[handle->outEnd++] = is_statement ? 'S' : 'P';
+ /* object name */
+ if (name)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, name, nameLen);
+ handle->outEnd += nameLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+
+ return 0;
+}
+
+
+/*
+ * Send CLOSE message (portal or statement) down to the Data node
+ */
+int
+pgxc_node_send_close(PGXCNodeHandle * handle, bool is_statement,
+ const char *name)
+{
+ /* statement or portal name size (allow NULL) */
+ int nameLen = name ? strlen(name) + 1 : 1;
+
+ /* size + statement/portal + name */
+ int msgLen = 4 + 1 + nameLen;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'C';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* statement/portal flag */
+ handle->outBuffer[handle->outEnd++] = is_statement ? 'S' : 'P';
+ /* object name */
+ if (name)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, name, nameLen);
+ handle->outEnd += nameLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+
+ handle->state = DN_CONNECTION_STATE_QUERY;
+
+ return 0;
+}
+
+/*
+ * Send EXECUTE message down to the Data node
+ */
+int
+pgxc_node_send_execute(PGXCNodeHandle * handle, const char *portal, int fetch)
+{
+ /* portal name size (allow NULL) */
+ int pnameLen = portal ? strlen(portal) + 1 : 1;
+
+ /* size + pnameLen + fetchLen */
+ int msgLen = 4 + pnameLen + 4;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'E';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* portal name */
+ if (portal)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, portal, pnameLen);
+ handle->outEnd += pnameLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+
+ /* fetch */
+ fetch = htonl(fetch);
+ memcpy(handle->outBuffer + handle->outEnd, &fetch, 4);
+ handle->outEnd += 4;
+
+ handle->state = DN_CONNECTION_STATE_QUERY;
+
+ return 0;
+}
+
+
+/*
+ * Send FLUSH message down to the Data node
+ */
+int
+pgxc_node_send_flush(PGXCNodeHandle * handle)
+{
+ /* size */
+ int msgLen = 4;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'H';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+
+ return pgxc_node_flush(handle);
+}
+
+
+/*
+ * Send SYNC message down to the Data node
+ */
+int
+pgxc_node_send_sync(PGXCNodeHandle * handle)
+{
+ /* size */
+ int msgLen = 4;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'S';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+
+ return pgxc_node_flush(handle);
+}
+
+
+/*
+ * Send the extended-protocol message series (Parse/Bind/Describe/Execute/Sync) down to the Data node
+ */
+int
+pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query,
+ const char *statement, const char *portal,
+ int paramlen, char *params,
+ bool send_describe, int fetch_size)
+{
+ if (pgxc_node_send_parse(handle, statement, query))
+ return EOF;
+ if (pgxc_node_send_bind(handle, portal, statement, paramlen, params))
+ return EOF;
+ if (send_describe)
+ if (pgxc_node_send_describe(handle, false, portal))
+ return EOF;
+ if (fetch_size >= 0)
+ if (pgxc_node_send_execute(handle, portal, fetch_size))
+ return EOF;
+ if (pgxc_node_send_sync(handle))
+ return EOF;
+
+ return 0;
+}
/*
* This method won't return until connection buffer is empty or error occurs
@@ -1034,7 +1340,6 @@ void
add_error_message(PGXCNodeHandle *handle, const char *message)
{
handle->transaction_status = 'E';
- handle->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
if (handle->error)
{
/* PGXCTODO append */
@@ -1072,7 +1377,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ errmsg("out of memory")));
}
result->primary_handle = NULL;
@@ -1348,8 +1653,7 @@ is_active_connection(PGXCNodeHandle *handle)
{
return handle->sock != NO_SOCKET &&
handle->state != DN_CONNECTION_STATE_IDLE &&
- handle->state != DN_CONNECTION_STATE_ERROR_NOT_READY &&
- handle->state != DN_CONNECTION_STATE_ERROR_FATAL;
+ !DN_CONNECTION_STATE_ERROR(handle);
}
/*
@@ -1378,7 +1682,7 @@ get_active_nodes(PGXCNodeHandle **connections)
if (is_active_connection(&co_handles[i]))
connections[active_count++] = &co_handles[i];
}
- }
+ }
return active_count;
}
@@ -1442,6 +1746,11 @@ pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer
/* Send to Datanodes */
for (i = 0; i < dn_conn_count; i++)
{
+ /*
+ * Clean connection if fetch in progress
+ */
+ if (pgxc_handles->datanode_handles[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(pgxc_handles->datanode_handles[i]);
if (pgxc_node_send_query(pgxc_handles->datanode_handles[i], buffer))
{
add_error_message(pgxc_handles->datanode_handles[i], "Can not send request");
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index dd72a4bfee..91451ce60a 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -1523,7 +1523,7 @@ get_relname_relid(const char *relname, Oid relnamespace)
0, 0);
}
-#ifdef NOT_USED
+#ifdef PGXC
/*
* get_relnatts
*
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index cb078be067..50ad300429 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2875,25 +2875,119 @@ reversedirection_heap(Tuplesortstate *state)
static unsigned int
getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK)
{
- PGXCNodeHandle *conn = state->combiner->connections[tapenum];
+ RemoteQueryState *combiner = state->combiner;
+ PGXCNodeHandle *conn = combiner->connections[tapenum];
+ /*
+ * If connection is active (potentially has data to read) we can get node
+ * number from the connection. If connection is not active (we have read all
+ * available data rows) and if we have buffered data from that connection
+ * the node number is stored in combiner->tapenodes[tapenum].
+ * If connection is inactive and no buffered data we have EOF condition
+ */
+ int nodenum = conn ? conn->nodenum : combiner->tapenodes[tapenum];
+ unsigned int len = 0;
+ ListCell *lc;
+ ListCell *prev = NULL;
+
+ /*
+ * If there are buffered rows iterate over them and get first from
+ * the requested tape
+ */
+ foreach (lc, combiner->rowBuffer)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc);
+ if (dataRow->msgnode == nodenum)
+ {
+ combiner->currentRow = *dataRow;
+ combiner->rowBuffer = list_delete_cell(combiner->rowBuffer, lc, prev);
+ return dataRow->msglen;
+ }
+ prev = lc;
+ }
+
+ /* Nothing is found in the buffer, check for EOF */
+ if (conn == NULL)
+ {
+ if (eofOK)
+ return 0;
+ else
+ elog(ERROR, "unexpected end of data");
+ }
+
+ /* Going to get data from connection, buffer if needed */
+ if (conn->state == DN_CONNECTION_STATE_QUERY && conn->combiner != combiner)
+ BufferConnection(conn);
+
+ /* Request more rows if needed */
+ if (conn->state == DN_CONNECTION_STATE_IDLE)
+ {
+ Assert(combiner->cursor);
+ if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node cursor")));
+ if (pgxc_node_send_sync(conn) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node cursor")));
+ conn->state = DN_CONNECTION_STATE_QUERY;
+ conn->combiner = combiner;
+ }
+ /* Read data from the connection until get a row or EOF */
for (;;)
{
- switch (handle_response(conn, state->combiner))
+ switch (handle_response(conn, combiner))
{
+ case RESPONSE_SUSPENDED:
+ /* Send Execute to request next row */
+ Assert(combiner->cursor);
+ if (len)
+ return len;
+ if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node cursor")));
+ if (pgxc_node_send_sync(conn) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node cursor")));
+ conn->state = DN_CONNECTION_STATE_QUERY;
+ conn->combiner = combiner;
+ /* fallthru */
case RESPONSE_EOF:
+ /* receive more data */
if (pgxc_node_receive(1, &conn, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg(conn->error)));
break;
case RESPONSE_COMPLETE:
+ /* EOF encountered, close the tape and report EOF */
+ if (combiner->cursor)
+ {
+ combiner->connections[tapenum] = NULL;
+ if (len)
+ return len;
+ }
if (eofOK)
return 0;
else
elog(ERROR, "unexpected end of data");
break;
case RESPONSE_DATAROW:
- return state->combiner->msglen;
+ Assert(len == 0);
+ if (state->combiner->cursor)
+ {
+ /*
+ * We are fetching one row at a time when running EQP
+ * so read following PortalSuspended or ResponseComplete
+ * to leave connection clean between the calls
+ */
+ len = state->combiner->currentRow.msglen;
+ break;
+ }
+ else
+ return state->combiner->currentRow.msglen;
default:
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index c85b1b0c61..0c2c4fe0a2 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -124,6 +124,7 @@ typedef struct TupleTableSlot
*/
char *tts_dataRow; /* Tuple data in DataRow format */
int tts_dataLen; /* Actual length of the data row */
+ int tts_dataNode; /* Originating node of the data row */
bool tts_shouldFreeRow; /* should pfree tts_dataRow? */
struct AttInMetadata *tts_attinmeta; /* store here info to extract values from the DataRow */
#endif
@@ -177,6 +178,7 @@ extern TupleTableSlot *ExecStoreMinimalTuple(MinimalTuple mtup,
#ifdef PGXC
extern TupleTableSlot *ExecStoreDataRowTuple(char *msg,
size_t len,
+ int node,
TupleTableSlot *slot,
bool shouldFree);
#endif
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index 4f4a0c9c53..4a33842bdf 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -16,8 +16,8 @@
#ifndef EXECREMOTE_H
#define EXECREMOTE_H
-#include "pgxcnode.h"
#include "locator.h"
+#include "pgxcnode.h"
#include "planner.h"
#include "access/tupdesc.h"
#include "executor/tuptable.h"
@@ -29,9 +29,10 @@
/* Outputs of handle_response() */
#define RESPONSE_EOF EOF
#define RESPONSE_COMPLETE 0
-#define RESPONSE_TUPDESC 1
-#define RESPONSE_DATAROW 2
-#define RESPONSE_COPY 3
+#define RESPONSE_SUSPENDED 1
+#define RESPONSE_TUPDESC 2
+#define RESPONSE_DATAROW 3
+#define RESPONSE_COPY 4
typedef enum
{
@@ -42,6 +43,18 @@ typedef enum
REQUEST_TYPE_COPY_OUT /* Copy Out response */
} RequestType;
+/*
+ * Represents a DataRow message received from a remote node.
+ * Contains originating node number and message body in DataRow format without
+ * message code and length. Length is kept in a separate field
+ */
+typedef struct RemoteDataRowData
+{
+ char *msg; /* last data row message */
+ int msglen; /* length of the data row message */
+ int msgnode; /* node number of the data row message */
+} RemoteDataRowData;
+typedef RemoteDataRowData *RemoteDataRow;
typedef struct RemoteQueryState
{
@@ -60,8 +73,18 @@ typedef struct RemoteQueryState
char errorCode[5]; /* error code to send back to client */
char *errorMessage; /* error message to send back to client */
bool query_Done; /* query has been sent down to data nodes */
- char *msg; /* last data row message */
- int msglen; /* length of the data row message */
+ RemoteDataRowData currentRow; /* next data row to be wrapped into a tuple */
+ /* TODO use a tuplestore as a rowbuffer */
+ List *rowBuffer; /* buffer where rows are stored when connection
+ * should be cleaned for reuse by other RemoteQuery */
+ /*
+ * To handle special case - if there is a simple sort and sort connection
+ * is buffered. If EOF is reached on a connection it should be removed from
+ * the array, but we need to know node number of the connection to find
+ * messages in the buffer. So we store nodenum to that array if reach EOF
+ * when buffering
+ */
+ int *tapenodes;
/*
* While we are not supporting grouping use this flag to indicate we need
* to initialize collecting of aggregates from the DNs
@@ -74,6 +97,11 @@ typedef struct RemoteQueryState
MemoryContext tmp_ctx; /* separate context is needed to compare tuples */
FILE *copy_file; /* used if copy_dest == COPY_FILE */
uint64 processed; /* count of data rows when running CopyOut */
+ /* cursor support */
+ char *cursor; /* cursor name */
+ char *update_cursor; /* through this cursor current tuple can be updated */
+ int cursor_count; /* total count of participating nodes */
+ PGXCNodeHandle **cursor_connections;/* data node connections being combined */
} RemoteQueryState;
/* Multinode Executor */
@@ -98,9 +126,9 @@ extern void ExecRemoteUtility(RemoteQuery *node);
extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner);
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
+extern void BufferConnection(PGXCNodeHandle *conn);
extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt);
-extern void PGXCNodeConsumeMessages(void);
extern int primary_data_node;
#endif
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
index d6c5af950c..a57e4f1673 100644
--- a/src/include/pgxc/pgxcnode.h
+++ b/src/include/pgxc/pgxcnode.h
@@ -34,17 +34,18 @@ typedef enum
{
DN_CONNECTION_STATE_IDLE, /* idle, ready for query */
DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */
- DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */
- DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQury yet */
- DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */
DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
DN_CONNECTION_STATE_COPY_IN,
DN_CONNECTION_STATE_COPY_OUT
} DNConnectionState;
#define DN_CONNECTION_STATE_ERROR(dnconn) \
- (dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
- || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY
+ ((dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
+ || (dnconn)->transaction_status == 'E')
+
+#define HAS_MESSAGE_BUFFERED(conn) \
+ ((conn)->inCursor + 4 < (conn)->inEnd \
+ && (conn)->inCursor + ntohl(*((uint32_t *) ((conn)->inBuffer + (conn)->inCursor + 1))) < (conn)->inEnd)
struct pgxc_node_handle
{
@@ -54,6 +55,7 @@ struct pgxc_node_handle
/* Connection state */
char transaction_status;
DNConnectionState state;
+ struct RemoteQueryState *combiner;
#ifdef DN_CONNECTION_DEBUG
bool have_row_desc;
#endif
@@ -103,6 +105,16 @@ extern int ensure_in_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handl
extern int ensure_out_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handle);
extern int pgxc_node_send_query(PGXCNodeHandle * handle, const char *query);
+extern int pgxc_node_send_describe(PGXCNodeHandle * handle, bool is_statement,
+ const char *name);
+extern int pgxc_node_send_execute(PGXCNodeHandle * handle, const char *portal, int fetch);
+extern int pgxc_node_send_close(PGXCNodeHandle * handle, bool is_statement,
+ const char *name);
+extern int pgxc_node_send_sync(PGXCNodeHandle * handle);
+extern int pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query,
+ const char *statement, const char *portal,
+ int paramlen, char *params,
+ bool send_describe, int fetch_size);
extern int pgxc_node_send_gxid(PGXCNodeHandle * handle, GlobalTransactionId gxid);
extern int pgxc_node_send_snapshot(PGXCNodeHandle * handle, Snapshot snapshot);
extern int pgxc_node_send_timestamp(PGXCNodeHandle * handle, TimestampTz timestamp);
@@ -119,6 +131,5 @@ extern int pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const c
extern char get_message(PGXCNodeHandle *conn, int *len, char **msg);
extern void add_error_message(PGXCNodeHandle * handle, const char *message);
-extern void clear_socket_data (PGXCNodeHandle *conn);
#endif
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index 8aae356fd2..7284ef7432 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -85,6 +85,7 @@ typedef struct
SimpleDistinct *distinct;
bool read_only; /* do not use 2PC when committing read only steps */
bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+ char *cursor; /* if specified use it as a Portal name on data nodes */
RemoteQueryExecType exec_type;
char *relname;
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 854b3eac9a..aef70eb7d1 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -88,6 +88,9 @@ extern char func_volatile(Oid funcid);
extern float4 get_func_cost(Oid funcid);
extern float4 get_func_rows(Oid funcid);
extern Oid get_relname_relid(const char *relname, Oid relnamespace);
+#ifdef PGXC
+extern int get_relnatts(Oid relid);
+#endif
extern char *get_rel_name(Oid relid);
extern Oid get_rel_namespace(Oid relid);
extern Oid get_rel_type_id(Oid relid);