summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMason Sharp2010-11-14 19:52:45 +0000
committerPavan Deolasee2011-05-19 16:45:22 +0000
commitb08d7c0f3481745289b34691bf27b24940ac7e1b (patch)
treec131420fa8fac219b37f1703373ddafa880350a3
parent620d0e377c41c8acc29ab83069db5d3180087963 (diff)
1. Support for UPDATE and DELETE WHERE CURRENT OF for
replicated tables through a cursor. Cursor on replicated table should be declared with FOR UPDATE to be updateable. CTID is appended to the field list in this case. Plan for replicated WHERE CURRENT OF consists of multiple steps: - Select all values of current row by CTID (on single node) - Declare cursor for select from the table with WHERE clause looking like col1 = value1 AND col2 = value2 AND ..., all columns of the table are listed (on all nodes) - Move the cursor to the first position (on all nodes) - Update the cursor (on all nodes) 2. Support for Extended Query Protocol (EQP) in coordinator-data node communication. Allows handling data node cursors effectively. Planner tells executor to use EQP by setting cursor field of RemoteQuery. 3. Enabled data node connection sharing between multiple concurrent RemoteQueries. Needed to better support concurrent user defined cursors and multi-step query plans. If another RemoteQuery needs to use data node connection, pending input intended for previous connection is buffered, and when previous RemoteQuery is active again it will consume buffered input first. It is recommended to use EQP for multi-step plans and user-defined cursors to minimize amount of buffered data if executor is switching between RemoteQueries. By Andrei Martsinchyk
-rw-r--r--src/backend/access/transam/xact.c20
-rw-r--r--src/backend/executor/execTuples.c10
-rw-r--r--src/backend/nodes/copyfuncs.c3
-rw-r--r--src/backend/pgxc/plan/planner.c596
-rw-r--r--src/backend/pgxc/pool/execRemote.c963
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c361
-rw-r--r--src/backend/utils/cache/lsyscache.c2
-rw-r--r--src/backend/utils/sort/tuplesort.c100
-rw-r--r--src/include/executor/tuptable.h2
-rw-r--r--src/include/pgxc/execRemote.h42
-rw-r--r--src/include/pgxc/pgxcnode.h23
-rw-r--r--src/include/pgxc/planner.h1
-rw-r--r--src/include/utils/lsyscache.h3
13 files changed, 1642 insertions, 484 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 262fd3b3d4..2030a0079a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2280,6 +2280,21 @@ AbortTransaction(void)
*/
SetUserIdAndContext(s->prevUser, s->prevSecDefCxt);
+#ifdef PGXC
+ /*
+ * We should rollback on the data nodes before cleaning up portals
+ * to be sure data structures used by connections are not freed yet
+ */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ /*
+ * Make sure this is rolled back on the DataNodes
+ * if so it will just return
+ */
+ PGXCNodeRollback();
+ }
+#endif
+
/*
* do abort processing
*/
@@ -2300,11 +2315,6 @@ AbortTransaction(void)
/* This is done by remote Coordinator */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- /*
- * Make sure this is rolled back on the DataNodes
- * if so it will just return
- */
- PGXCNodeRollback();
RollbackTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
}
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index a6c496eee9..0c43e9479c 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -153,6 +153,7 @@ ExecCreateTupleTable(int tableSize)
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
slot->tts_attinmeta = NULL;
#endif
slot->tts_mcxt = CurrentMemoryContext;
@@ -238,6 +239,7 @@ MakeSingleTupleTableSlot(TupleDesc tupdesc)
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
slot->tts_attinmeta = NULL;
#endif
slot->tts_mcxt = CurrentMemoryContext;
@@ -440,6 +442,7 @@ ExecStoreTuple(HeapTuple tuple,
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
#endif
/*
@@ -509,6 +512,7 @@ ExecStoreMinimalTuple(MinimalTuple mtup,
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
#endif
/*
@@ -547,7 +551,8 @@ ExecStoreMinimalTuple(MinimalTuple mtup,
* --------------------------------
*/
TupleTableSlot *
-ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFree)
+ExecStoreDataRowTuple(char *msg, size_t len, int node, TupleTableSlot *slot,
+ bool shouldFree)
{
/*
* sanity checks
@@ -586,6 +591,7 @@ ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFr
slot->tts_mintuple = NULL;
slot->tts_dataRow = msg;
slot->tts_dataLen = len;
+ slot->tts_dataNode = node;
/* Mark extracted state invalid */
slot->tts_nvalid = 0;
@@ -624,6 +630,7 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */
slot->tts_shouldFreeRow = false;
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
#endif
slot->tts_tuple = NULL;
@@ -976,6 +983,7 @@ ExecMaterializeSlot(TupleTableSlot *slot)
{
slot->tts_dataRow = NULL;
slot->tts_dataLen = -1;
+ slot->tts_dataNode = 0;
}
#endif
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 78c5543dab..aa0587f9c7 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -839,6 +839,7 @@ _copyRemoteQuery(RemoteQuery *from)
COPY_NODE_FIELD(distinct);
COPY_SCALAR_FIELD(read_only);
COPY_SCALAR_FIELD(force_autocommit);
+ COPY_STRING_FIELD(cursor);
COPY_STRING_FIELD(relname);
COPY_SCALAR_FIELD(remotejoin);
@@ -888,7 +889,7 @@ _copySimpleAgg(SimpleAgg *from)
COPY_SCALAR_FIELD(transfn);
COPY_SCALAR_FIELD(finalfn);
if (!from->initValueIsNull)
- newnode->initValue = datumCopy(from->initValue, from->transtypeByVal,
+ newnode->initValue = datumCopy(from->initValue, from->transtypeByVal,
from->transtypeLen);
COPY_SCALAR_FIELD(initValueIsNull);
COPY_SCALAR_FIELD(inputtypeLen);
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 5ea6d8a0a5..af368e424a 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -31,6 +31,7 @@
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
+#include "pgxc/execRemote.h"
#include "pgxc/locator.h"
#include "pgxc/planner.h"
#include "tcop/pquery.h"
@@ -38,6 +39,7 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
+#include "utils/portal.h"
#include "utils/syscache.h"
@@ -123,7 +125,7 @@ typedef struct XCWalkerContext
{
Query *query;
bool isRead;
- ExecNodes *exec_nodes; /* resulting execution nodes */
+ RemoteQuery *query_step; /* remote query step being analized */
Special_Conditions *conditions;
bool multilevel_join;
List *rtables; /* a pointer to a list of rtables */
@@ -141,13 +143,44 @@ bool StrictStatementChecking = true;
/* Forbid multi-node SELECT statements with an ORDER BY clause */
bool StrictSelectChecking = false;
-static ExecNodes *get_plan_nodes(Query *query, bool isRead);
+static void get_plan_nodes(Query *query, RemoteQuery *step, bool isRead);
static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context);
static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context);
static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt);
static void InitXCWalkerContext(XCWalkerContext *context);
/*
+ * Find position of specified substring in the string
+ * All non-printable symbols of str treated as spaces, all letters as uppercase
+ * Returns pointer to the beginning of the substring or NULL
+ */
+static char *
+strpos(char *str, char *substr)
+{
+ char copy[strlen(str) + 1];
+ char *src = str;
+ char *dst = copy;
+
+ /*
+ * Initialize mutable copy, converting letters to uppercase and
+ * various witespace characters to spaces
+ */
+ while (*src)
+ {
+ if (isspace(*src))
+ {
+ src++;
+ *dst++ = ' ';
+ }
+ else
+ *dst++ = toupper(*src++);
+ }
+ *dst = '\0';
+ dst = strstr(copy, substr);
+ return dst ? str + (dst - copy) : NULL;
+}
+
+/*
* True if both lists contain only one node and are the same
*/
static bool
@@ -509,7 +542,7 @@ get_plan_nodes_insert(Query *query)
* Get list of parent-child joins (partitioned together)
* Get list of joins with replicated tables
*
- * If we encounter an expression such as a cross-node join that cannot
+ * If we encounter an expression such as a cross-node join that cannot
* be easily handled in a single step, we stop processing and return true,
* otherwise false.
*
@@ -536,6 +569,242 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
if (!context->conditions)
context->conditions = new_special_conditions();
+ /* Handle UPDATE/DELETE ... WHERE CURRENT OF ... */
+ if (IsA(expr_node, CurrentOfExpr))
+ {
+ /* Find referenced portal and figure out what was the last fetch node */
+ Portal portal;
+ QueryDesc *queryDesc;
+ PlanState *state;
+ CurrentOfExpr *cexpr = (CurrentOfExpr *) expr_node;
+ char *cursor_name = cexpr->cursor_name;
+ char *node_cursor;
+
+ /* Find the cursor's portal */
+ portal = GetPortalByName(cursor_name);
+ if (!PortalIsValid(portal))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_CURSOR),
+ errmsg("cursor \"%s\" does not exist", cursor_name)));
+
+ queryDesc = PortalGetQueryDesc(portal);
+ if (queryDesc == NULL || queryDesc->estate == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is held from a previous transaction",
+ cursor_name)));
+
+ /*
+ * The cursor must have a current result row: per the SQL spec, it's
+ * an error if not.
+ */
+ if (portal->atStart || portal->atEnd)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_STATE),
+ errmsg("cursor \"%s\" is not positioned on a row",
+ cursor_name)));
+
+ state = ExecGetActivePlanTree(queryDesc);
+ if (IsA(state, RemoteQueryState))
+ {
+ RemoteQueryState *node = (RemoteQueryState *) state;
+ RemoteQuery *step = (RemoteQuery *) state->plan;
+
+ /*
+ * 1. step query: SELECT * FROM <table> WHERE ctid = <cur_ctid>,
+ * <cur_ctid> is taken from the scantuple ot the target step
+ * step node list: current node of the target step.
+ * 2. step query: DECLARE <xxx> CURSOR FOR SELECT * FROM <table>
+ * WHERE <col1> = <val1> AND <col2> = <val2> ... FOR UPDATE
+ * <xxx> is generated from cursor name of the target step,
+ * <col> and <val> pairs are taken from the step 1.
+ * step node list: all nodes of <table>
+ * 3. step query: MOVE <xxx>
+ * step node list: all nodes of <table>
+ */
+ RangeTblEntry *table = (RangeTblEntry *) linitial(context->query->rtable);
+ node_cursor = step->cursor;
+ rel_loc_info1 = GetRelationLocInfo(table->relid);
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
+ context->query_step->exec_nodes->baselocatortype = rel_loc_info1->locatorType;
+ if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED)
+ {
+ RemoteQuery *step1, *step2, *step3;
+ /*
+ * We do not need first three steps if cursor already exists and
+ * positioned.
+ */
+ if (node->update_cursor)
+ {
+ step3 = NULL;
+ node_cursor = node->update_cursor;
+ }
+ else
+ {
+ char *tableName = get_rel_name(table->relid);
+ int natts = get_relnatts(table->relid);
+ char *attnames[natts];
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ /*
+ * ctid is the last attribute, but more correct to iterate over
+ * attributes and find by name, or store index for table
+ */
+ Datum ctid = slot->tts_values[slot->tts_tupleDescriptor->natts - 1];
+ char *ctid_str = (char *) DirectFunctionCall1(tidout, ctid);
+ int nodenum = slot->tts_dataNode;
+ AttrNumber att;
+ StringInfoData buf;
+ HeapTuple tp;
+ int i;
+ MemoryContext context_save;
+
+ initStringInfo(&buf);
+
+ /* Step 1: select tuple values by ctid */
+ step1 = makeNode(RemoteQuery);
+ appendStringInfoString(&buf, "SELECT ");
+ for (att = 1; att <= natts; att++)
+ {
+ TargetEntry *tle;
+ Var *expr;
+
+ tp = SearchSysCache(ATTNUM,
+ ObjectIdGetDatum(table->relid),
+ Int16GetDatum(att),
+ 0, 0);
+ if (HeapTupleIsValid(tp))
+ {
+ Form_pg_attribute att_tup = (Form_pg_attribute) GETSTRUCT(tp);
+
+ /* add comma before all except first attributes */
+ if (att > 1)
+ appendStringInfoString(&buf, ", ");
+ attnames[att-1] = pstrdup(NameStr(att_tup->attname));
+ appendStringInfoString(&buf, attnames[att - 1]);
+ expr = makeVar(att, att, att_tup->atttypid,
+ att_tup->atttypmod, 0);
+ tle = makeTargetEntry((Expr *) expr, att,
+ attnames[att - 1], false);
+ step1->scan.plan.targetlist = lappend(step1->scan.plan.targetlist, tle);
+ ReleaseSysCache(tp);
+ }
+ else
+ elog(ERROR, "cache lookup failed for attribute %d of relation %u",
+ att, table->relid);
+ }
+ appendStringInfo(&buf, " FROM %s WHERE ctid = '%s'",
+ tableName, ctid_str);
+ step1->sql_statement = pstrdup(buf.data);
+ step1->is_single_step = true;
+ step1->exec_nodes = makeNode(ExecNodes);
+ step1->read_only = true;
+ step1->exec_nodes->nodelist = list_make1_int(nodenum);
+
+ /* Step 2: declare cursor for update target table */
+ step2 = makeNode(RemoteQuery);
+ resetStringInfo(&buf);
+
+ appendStringInfoString(&buf, step->cursor);
+ appendStringInfoString(&buf, "upd");
+ /* This need to survive while the target Portal is alive */
+ context_save = MemoryContextSwitchTo(PortalGetHeapMemory(portal));
+ node_cursor = pstrdup(buf.data);
+ node->update_cursor = node_cursor;
+ MemoryContextSwitchTo(context_save);
+ resetStringInfo(&buf);
+
+ appendStringInfo(&buf,
+ "DECLARE %s CURSOR FOR SELECT * FROM %s WHERE ",
+ node_cursor, tableName);
+ for (i = 0; i < natts; i++)
+ {
+ /* add comma before all except first attributes */
+ if (i)
+ appendStringInfoString(&buf, "AND ");
+ appendStringInfo(&buf, "%s = $%d ", attnames[i], i+1);
+ }
+ appendStringInfoString(&buf, "FOR UPDATE");
+ step2->sql_statement = pstrdup(buf.data);
+ step2->is_single_step = true;
+ step2->read_only = true;
+ step2->exec_nodes = makeNode(ExecNodes);
+ step2->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList);
+ innerPlan(step2) = (Plan *) step1;
+ /* Step 3: move cursor to first position */
+ step3 = makeNode(RemoteQuery);
+ resetStringInfo(&buf);
+ appendStringInfo(&buf, "MOVE %s", node_cursor);
+ step3->sql_statement = pstrdup(buf.data);
+ step3->is_single_step = true;
+ step3->read_only = true;
+ step3->exec_nodes = makeNode(ExecNodes);
+ step3->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList);
+ innerPlan(step3) = (Plan *) step2;
+
+ innerPlan(context->query_step) = (Plan *) step3;
+
+ pfree(buf.data);
+ }
+ context->query_step->exec_nodes->nodelist = list_copy(rel_loc_info1->nodeList);
+ }
+ else
+ {
+ /* Take target node from last scan tuple of referenced step */
+ int curr_node = node->ss.ss_ScanTupleSlot->tts_dataNode;
+ context->query_step->exec_nodes->nodelist = lappend_int(context->query_step->exec_nodes->nodelist, curr_node);
+ }
+ FreeRelationLocInfo(rel_loc_info1);
+
+ context->query_step->is_single_step = true;
+ /*
+ * replace cursor name in the query if differs
+ */
+ if (strcmp(cursor_name, node_cursor))
+ {
+ StringInfoData buf;
+ char *str = context->query->sql_statement;
+ /*
+ * Find last occurence of cursor_name
+ */
+ for (;;)
+ {
+ char *next = strstr(str + 1, cursor_name);
+ if (next)
+ str = next;
+ else
+ break;
+ }
+
+ /*
+ * now str points to cursor name truncate string here
+ * do not care the string is modified - we will pfree it
+ * soon anyway
+ */
+ *str = '\0';
+
+ /* and move str at the beginning of the reminder */
+ str += strlen(cursor_name);
+
+ /* build up new statement */
+ initStringInfo(&buf);
+ appendStringInfoString(&buf, context->query->sql_statement);
+ appendStringInfoString(&buf, node_cursor);
+ appendStringInfoString(&buf, str);
+
+ /* take the result */
+ pfree(context->query->sql_statement);
+ context->query->sql_statement = buf.data;
+ }
+ return false;
+ }
+ // ??? current plan node is not a remote query
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->exec_on_coord = true;
+ return false;
+ }
+
if (IsA(expr_node, Var))
{
/* If we get here, that meant the previous call before recursing down did not
@@ -800,13 +1069,13 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
bool is_multilevel;
int save_parent_child_count = 0;
SubLink *sublink = (SubLink *) expr_node;
- ExecNodes *save_exec_nodes = context->exec_nodes; /* Save old exec_nodes */
+ ExecNodes *save_exec_nodes = context->query_step->exec_nodes; /* Save old exec_nodes */
/* save parent-child count */
- if (context->exec_nodes)
+ if (context->query_step->exec_nodes)
save_parent_child_count = list_length(context->conditions->partitioned_parent_child);
- context->exec_nodes = NULL;
+ context->query_step->exec_nodes = NULL;
context->multilevel_join = false;
current_rtable = ((Query *) sublink->subselect)->rtable;
@@ -823,31 +1092,31 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context)
context->multilevel_join = false;
/* Allow for replicated tables */
- if (!context->exec_nodes)
- context->exec_nodes = save_exec_nodes;
+ if (!context->query_step->exec_nodes)
+ context->query_step->exec_nodes = save_exec_nodes;
else
{
if (save_exec_nodes)
{
- if (context->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
+ if (context->query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
{
- context->exec_nodes = save_exec_nodes;
+ context->query_step->exec_nodes = save_exec_nodes;
}
else
{
if (save_exec_nodes->tableusagetype != TABLE_USAGE_TYPE_USER_REPLICATED)
{
/* See if they run on the same node */
- if (same_single_node (context->exec_nodes->nodelist, save_exec_nodes->nodelist))
+ if (same_single_node (context->query_step->exec_nodes->nodelist, save_exec_nodes->nodelist))
return false;
}
else
/* use old value */
- context->exec_nodes = save_exec_nodes;
+ context->query_step->exec_nodes = save_exec_nodes;
}
} else
{
- if (context->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
+ if (context->query_step->exec_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED)
return false;
/* See if subquery safely joins with parent */
if (!is_multilevel)
@@ -986,8 +1255,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
if (contains_only_pg_catalog (query->rtable))
{
/* just pg_catalog tables */
- context->exec_nodes = makeNode(ExecNodes);
- context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
context->exec_on_coord = true;
return false;
}
@@ -1005,7 +1274,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
if (rte->rtekind == RTE_SUBQUERY)
{
- ExecNodes *save_exec_nodes = context->exec_nodes;
+ ExecNodes *save_exec_nodes = context->query_step->exec_nodes;
Special_Conditions *save_conditions = context->conditions; /* Save old conditions */
List *current_rtable = rte->subquery->rtable;
@@ -1025,8 +1294,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
context->rtables = list_delete_ptr(context->rtables, current_rtable);
context->conditions = save_conditions;
- current_nodes = context->exec_nodes;
- context->exec_nodes = save_exec_nodes;
+ current_nodes = context->query_step->exec_nodes;
+ context->query_step->exec_nodes = save_exec_nodes;
if (current_nodes)
current_usage_type = current_nodes->tableusagetype;
@@ -1103,8 +1372,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
/* If we are just dealing with pg_catalog, just return */
if (table_usage_type == TABLE_USAGE_TYPE_PGCATALOG)
{
- context->exec_nodes = makeNode(ExecNodes);
- context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
+ context->query_step->exec_nodes = makeNode(ExecNodes);
+ context->query_step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG;
context->exec_on_coord = true;
return false;
}
@@ -1113,6 +1382,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
if (examine_conditions_walker(query->jointree->quals, context))
return true;
+ if (context->query_step->exec_nodes)
+ return false;
+
/* Examine join conditions, see if each join is single-node safe */
if (context->join_list != NULL)
{
@@ -1174,27 +1446,27 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
if (!rel_loc_info)
return true;
- context->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
+ context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
}
}
else
{
- context->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
+ context->query_step->exec_nodes = GetRelationNodes(rel_loc_info, NULL, context->isRead);
}
/* Note replicated table usage for determining safe queries */
- if (context->exec_nodes)
+ if (context->query_step->exec_nodes)
{
if (table_usage_type == TABLE_USAGE_TYPE_USER && IsReplicated(rel_loc_info))
table_usage_type = TABLE_USAGE_TYPE_USER_REPLICATED;
- context->exec_nodes->tableusagetype = table_usage_type;
+ context->query_step->exec_nodes->tableusagetype = table_usage_type;
}
}
/* check for partitioned col comparison against a literal */
else if (list_length(context->conditions->partitioned_literal_comps) > 0)
{
- context->exec_nodes = NULL;
+ context->query_step->exec_nodes = NULL;
/*
* Make sure that if there are multiple such comparisons, that they
@@ -1208,11 +1480,11 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
lit_comp->rel_loc_info, &(lit_comp->constant), true);
test_exec_nodes->tableusagetype = table_usage_type;
- if (context->exec_nodes == NULL)
- context->exec_nodes = test_exec_nodes;
+ if (context->query_step->exec_nodes == NULL)
+ context->query_step->exec_nodes = test_exec_nodes;
else
{
- if (!same_single_node(context->exec_nodes->nodelist, test_exec_nodes->nodelist))
+ if (!same_single_node(context->query_step->exec_nodes->nodelist, test_exec_nodes->nodelist))
{
return true;
}
@@ -1231,22 +1503,22 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
parent_child = (Parent_Child_Join *)
linitial(context->conditions->partitioned_parent_child);
- context->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead);
- context->exec_nodes->tableusagetype = table_usage_type;
+ context->query_step->exec_nodes = GetRelationNodes(parent_child->rel_loc_info1, NULL, context->isRead);
+ context->query_step->exec_nodes->tableusagetype = table_usage_type;
}
if (from_query_nodes)
{
- if (!context->exec_nodes)
+ if (!context->query_step->exec_nodes)
{
- context->exec_nodes = from_query_nodes;
+ context->query_step->exec_nodes = from_query_nodes;
return false;
}
/* Just use exec_nodes if the from subqueries are all replicated or using the exact
* same node
*/
else if (from_query_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED
- || (same_single_node(from_query_nodes->nodelist, context->exec_nodes->nodelist)))
+ || (same_single_node(from_query_nodes->nodelist, context->query_step->exec_nodes->nodelist)))
return false;
else
{
@@ -1254,7 +1526,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
* but the parent query applies a condition on the from subquery.
*/
if (list_length(query->jointree->fromlist) == from_subquery_count
- && list_length(context->exec_nodes->nodelist) == 1)
+ && list_length(context->query_step->exec_nodes->nodelist) == 1)
return false;
}
/* Too complicated, give up */
@@ -1270,8 +1542,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
static void
InitXCWalkerContext(XCWalkerContext *context)
{
+ context->query = NULL;
context->isRead = true;
- context->exec_nodes = NULL;
+ context->query_step = NULL;
context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions));
context->rtables = NIL;
context->multilevel_join = false;
@@ -1286,27 +1559,26 @@ InitXCWalkerContext(XCWalkerContext *context)
* Top level entry point before walking query to determine plan nodes
*
*/
-static ExecNodes *
-get_plan_nodes(Query *query, bool isRead)
+static void
+get_plan_nodes(Query *query, RemoteQuery *step, bool isRead)
{
- ExecNodes *result_nodes = NULL;
XCWalkerContext context;
InitXCWalkerContext(&context);
+ context.query = query;
context.isRead = isRead;
+ context.query_step = step;
context.rtables = lappend(context.rtables, query->rtable);
- if (!get_plan_nodes_walker((Node *) query, &context))
- result_nodes = context.exec_nodes;
- if (context.exec_on_coord && result_nodes)
+ if ((get_plan_nodes_walker((Node *) query, &context)
+ || context.exec_on_coord) && context.query_step->exec_nodes)
{
- pfree(result_nodes);
- result_nodes = NULL;
+ pfree(context.query_step->exec_nodes);
+ context.query_step->exec_nodes = NULL;
}
free_special_relations(context.conditions);
free_join_list(context.join_list);
- return result_nodes;
}
@@ -1315,32 +1587,25 @@ get_plan_nodes(Query *query, bool isRead)
*
* return NULL if it is not safe to be done in a single step.
*/
-static ExecNodes *
-get_plan_nodes_command(Query *query)
+static void
+get_plan_nodes_command(Query *query, RemoteQuery *step)
{
- ExecNodes *exec_nodes = NULL;
-
switch (query->commandType)
{
case CMD_SELECT:
- exec_nodes = get_plan_nodes(query, true);
+ get_plan_nodes(query, step, true);
break;
case CMD_INSERT:
- exec_nodes = get_plan_nodes_insert(query);
+ step->exec_nodes = get_plan_nodes_insert(query);
break;
case CMD_UPDATE:
case CMD_DELETE:
/* treat as a select */
- exec_nodes = get_plan_nodes(query, false);
+ get_plan_nodes(query, step, false);
break;
-
- default:
- return NULL;
}
-
- return exec_nodes;
}
@@ -1645,8 +1910,6 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
List *sub_tlist = step->scan.plan.targetlist;
ListCell *l;
StringInfo buf = makeStringInfo();
- char *sql;
- char *cur;
char *sql_from;
context = deparse_context_for_plan((Node *) step, NULL, rtable, NIL);
@@ -1677,23 +1940,9 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
* Do not handle the case if " FROM " we found is not a "FROM" keyword, but,
* for example, a part of string constant.
*/
- sql = pstrdup(step->sql_statement); /* mutable copy */
- /* string to upper case, for comparing */
- cur = sql;
- while (*cur)
- {
- /* replace whitespace with a space */
- if (isspace((unsigned char) *cur))
- *cur = ' ';
- *cur++ = toupper(*cur);
- }
-
- /* find the keyword */
- sql_from = strstr(sql, " FROM ");
+ sql_from = strpos(step->sql_statement, " FROM ");
if (sql_from)
{
- /* the same offset in the original string */
- int offset = sql_from - sql;
/*
* Truncate query at the position of terminating semicolon to be able
* to append extra order by entries. If query is submitted from client
@@ -1705,7 +1954,7 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
if (*end == ';')
*end = '\0';
- appendStringInfoString(buf, step->sql_statement + offset);
+ appendStringInfoString(buf, sql_from);
}
if (extra_sort)
@@ -1728,9 +1977,6 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
}
}
- /* do not need the copy */
- pfree(sql);
-
/* free previous query */
pfree(step->sql_statement);
/* get a copy of new query */
@@ -1742,6 +1988,81 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
/*
+ * Traverse the plan subtree and set cursor name for RemoteQuery nodes
+ * Cursor names must be unique, so append step_no parameter to the initial
+ * cursor name. Returns next step_no to be assigned
+ */
+static int
+set_cursor_name(Plan *subtree, char *cursor, int step_no)
+{
+ if (innerPlan(subtree))
+ step_no = set_cursor_name(innerPlan(subtree), cursor, step_no);
+ if (outerPlan(subtree))
+ step_no = set_cursor_name(outerPlan(subtree), cursor, step_no);
+ if (IsA(subtree, RemoteQuery))
+ {
+ RemoteQuery *step = (RemoteQuery *) subtree;
+ /*
+ * Keep the name for the very first step, hoping it is the only step and
+ * we do not have to modify WHERE CURRENT OF
+ */
+ if (step_no)
+ {
+ StringInfoData buf;
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "%s%d", cursor, step_no++);
+ /* make a copy before overwriting */
+ step->cursor = buf.data;
+ }
+ else
+ {
+ step_no++;
+ step->cursor = pstrdup(cursor);
+ }
+ }
+ return step_no;
+}
+
+/*
+ * Append ctid to the field list of step queries to support update
+ * WHERE CURRENT OF. The ctid is not sent down to client but used as a key
+ * to find target tuple
+ */
+static void
+fetch_ctid_of(Plan *subtree, RowMarkClause *rmc)
+{
+ /* recursively process subnodes */
+ if (innerPlan(subtree))
+ fetch_ctid_of(innerPlan(subtree), rmc);
+ if (outerPlan(subtree))
+ fetch_ctid_of(outerPlan(subtree), rmc);
+
+ /* we are only interested in RemoteQueries */
+ if (IsA(subtree, RemoteQuery))
+ {
+ RemoteQuery *step = (RemoteQuery *) subtree;
+ /*
+ * TODO Find if the table is referenced by the step query
+ */
+
+ char *from_sql = strpos(step->sql_statement, " FROM ");
+ if (from_sql)
+ {
+ StringInfoData buf;
+
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, step->sql_statement,
+ (int) (from_sql - step->sql_statement));
+ /* TODO qualify with the table name */
+ appendStringInfoString(&buf, ", ctid");
+ appendStringInfoString(&buf, from_sql);
+ pfree(step->sql_statement);
+ step->sql_statement = buf.data;
+ }
+ }
+}
+
+/*
* Plan to sort step tuples
* PGXC: copied and adopted from optimizer/plan/planner.c
*/
@@ -2114,44 +2435,9 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
query_step = makeNode(RemoteQuery);
query_step->is_single_step = false;
- /*
- * Declare Cursor case:
- * We should leave as a step query only SELECT statement
- * Further if we need refer source statement for planning we should take
- * the truncated string
- */
- if (query->utilityStmt &&
+ if (query->utilityStmt &&
IsA(query->utilityStmt, DeclareCursorStmt))
- {
-
- char *src = query->sql_statement;
- char str[strlen(src) + 1]; /* mutable copy */
- char *dst = str;
-
- cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options;
-
- /*
- * Initialize mutable copy, converting letters to uppercase and
- * various witespace characters to spaces
- */
- while (*src)
- {
- if (isspace(*src))
- {
- src++;
- *dst++ = ' ';
- }
- else
- *dst++ = toupper(*src++);
- }
- *dst = '\0';
- /* search for SELECT keyword in the normalized string */
- dst = strstr(str, " SELECT ");
- /* Take substring of the original string using found offset */
- query_step->sql_statement = pstrdup(query->sql_statement + (dst - str + 1));
- }
- else
- query_step->sql_statement = pstrdup(query->sql_statement);
+ cursorOptions |= ((DeclareCursorStmt *) query->utilityStmt)->options;
query_step->exec_nodes = NULL;
query_step->combine_type = COMBINE_TYPE_NONE;
@@ -2187,7 +2473,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (query->commandType != CMD_SELECT)
result->resultRelations = list_make1_int(query->resultRelation);
- query_step->exec_nodes = get_plan_nodes_command(query);
+ get_plan_nodes_command(query, query_step);
if (query_step->exec_nodes == NULL)
{
@@ -2217,26 +2503,29 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
}
/*
- * Use standard plan if we have more than one data node with either
- * group by, hasWindowFuncs, or hasRecursive
+ * get_plan_nodes_command may alter original statement, so do not
+ * process it before the call
+ *
+ * Declare Cursor case:
+ * We should leave as a step query only SELECT statement
+ * Further if we need refer source statement for planning we should take
+ * the truncated string
*/
- /*
- * PGXCTODO - this could be improved to check if the first
- * group by expression is the partitioning column, in which
- * case it is ok to treat as a single step.
- */
- if (query->commandType == CMD_SELECT
- && query_step->exec_nodes
- && list_length(query_step->exec_nodes->nodelist) > 1
- && (query->groupClause || query->hasWindowFuncs || query->hasRecursive))
+ if (query->utilityStmt &&
+ IsA(query->utilityStmt, DeclareCursorStmt))
{
- result = standard_planner(query, cursorOptions, boundParams);
- return result;
+
+ /* search for SELECT keyword in the normalized string */
+ char *select = strpos(query->sql_statement, " SELECT ");
+ /* Take substring of the original string using found offset */
+ query_step->sql_statement = pstrdup(select + 1);
}
+ else
+ query_step->sql_statement = pstrdup(query->sql_statement);
/*
- * If there already is an active portal, we may be doing planning
- * within a function. Just use the standard plan, but check if
+ * If there already is an active portal, we may be doing planning
+ * within a function. Just use the standard plan, but check if
* it is part of an EXPLAIN statement so that we do not show that
* we plan multiple steps when it is a single-step operation.
*/
@@ -2285,6 +2574,24 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
result = standard_planner(query, cursorOptions, boundParams);
return result;
}
+
+ /*
+ * Use standard plan if we have more than one data node with either
+ * group by, hasWindowFuncs, or hasRecursive
+ */
+ /*
+ * PGXCTODO - this could be improved to check if the first
+ * group by expression is the partitioning column, in which
+ * case it is ok to treat as a single step.
+ */
+ if (query->commandType == CMD_SELECT
+ && query_step->exec_nodes
+ && list_length(query_step->exec_nodes->nodelist) > 1
+ && (query->groupClause || query->hasWindowFuncs || query->hasRecursive))
+ {
+ result->planTree = standardPlan;
+ return result;
+ }
break;
default:
@@ -2307,6 +2614,33 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
result->planTree = materialize_finished_plan(result->planTree);
}
+ /*
+ * Support for multi-step cursor.
+ * Ensure uniqueness of remote cursor name
+ */
+ if (query->utilityStmt &&
+ IsA(query->utilityStmt, DeclareCursorStmt))
+ {
+ DeclareCursorStmt *stmt = (DeclareCursorStmt *) query->utilityStmt;
+ set_cursor_name(result->planTree, stmt->portalname, 0);
+ }
+
+ /*
+ * If query is FOR UPDATE fetch CTIDs from the remote node
+ * Use CTID as a key to update tuples on remote nodes when handling
+ * WHERE CURRENT OF
+ */
+ if (query->rowMarks)
+ {
+ ListCell *lc;
+ foreach(lc, query->rowMarks)
+ {
+ RowMarkClause *rmc = (RowMarkClause *) lfirst(lc);
+
+ fetch_ctid_of(result->planTree, rmc);
+ }
+ }
+
return result;
}
@@ -2321,6 +2655,8 @@ free_query_step(RemoteQuery *query_step)
return;
pfree(query_step->sql_statement);
+ if (query_step->cursor)
+ pfree(query_step->cursor);
if (query_step->exec_nodes)
{
if (query_step->exec_nodes->nodelist)
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 7fe08beacb..a524c13f6d 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -35,6 +35,8 @@
#define END_QUERY_TIMEOUT 20
#define CLEAR_TIMEOUT 5
+#define DATA_NODE_FETCH_SIZE 1
+
extern char *deparseSql(RemoteQueryState *scanstate);
@@ -70,6 +72,8 @@ static void pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles);
static int handle_response_clear(PGXCNodeHandle * conn);
+static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor);
+
static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(void);
#define MAX_STATEMENTS_PER_TRAN 10
@@ -200,8 +204,11 @@ CreateResponseCombiner(int node_count, CombineType combine_type)
combiner->copy_out_count = 0;
combiner->errorMessage = NULL;
combiner->query_Done = false;
- combiner->msg = NULL;
- combiner->msglen = 0;
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
+ combiner->rowBuffer = NIL;
+ combiner->tapenodes = NULL;
combiner->initAggregates = true;
combiner->simple_aggregates = NULL;
combiner->copy_file = NULL;
@@ -671,10 +678,10 @@ HandleCopyDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
* Caller must stop reading if function returns false
*/
static void
-HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
+HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len, int node)
{
/* We expect previous message is consumed */
- Assert(combiner->msg == NULL);
+ Assert(combiner->currentRow.msg == NULL);
if (combiner->request_type != REQUEST_TYPE_QUERY)
{
@@ -695,9 +702,10 @@ HandleDataRow(RemoteQueryState *combiner, char *msg_body, size_t len)
* We are copying message because it points into connection buffer, and
* will be overwritten on next socket read
*/
- combiner->msg = (char *) palloc(len);
- memcpy(combiner->msg, msg_body, len);
- combiner->msglen = len;
+ combiner->currentRow.msg = (char *) palloc(len);
+ memcpy(combiner->currentRow.msg, msg_body, len);
+ combiner->currentRow.msglen = len;
+ combiner->currentRow.msgnode = node;
}
/*
@@ -850,6 +858,10 @@ CloseCombiner(RemoteQueryState *combiner)
FreeTupleDesc(combiner->tuple_desc);
if (combiner->errorMessage)
pfree(combiner->errorMessage);
+ if (combiner->cursor_connections)
+ pfree(combiner->cursor_connections);
+ if (combiner->tapenodes)
+ pfree(combiner->tapenodes);
pfree(combiner);
}
}
@@ -874,15 +886,24 @@ static bool
ValidateAndResetCombiner(RemoteQueryState *combiner)
{
bool valid = validate_combiner(combiner);
+ ListCell *lc;
if (combiner->connections)
pfree(combiner->connections);
if (combiner->tuple_desc)
FreeTupleDesc(combiner->tuple_desc);
- if (combiner->msg)
- pfree(combiner->msg);
+ if (combiner->currentRow.msg)
+ pfree(combiner->currentRow.msg);
+ foreach(lc, combiner->rowBuffer)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc);
+ pfree(dataRow->msg);
+ }
+ list_free_deep(combiner->rowBuffer);
if (combiner->errorMessage)
pfree(combiner->errorMessage);
+ if (combiner->tapenodes)
+ pfree(combiner->tapenodes);
combiner->command_complete_count = 0;
combiner->connections = NULL;
@@ -894,8 +915,11 @@ ValidateAndResetCombiner(RemoteQueryState *combiner)
combiner->copy_out_count = 0;
combiner->errorMessage = NULL;
combiner->query_Done = false;
- combiner->msg = NULL;
- combiner->msglen = 0;
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
+ combiner->rowBuffer = NIL;
+ combiner->tapenodes = NULL;
combiner->simple_aggregates = NULL;
combiner->copy_file = NULL;
@@ -903,22 +927,247 @@ ValidateAndResetCombiner(RemoteQueryState *combiner)
}
/*
+ * It is possible for multiple steps to share the same data node connection, when
+ * the executor is running a multi-step query or the client is running multiple
+ * queries using the Extended Query Protocol. After returning the next tuple
+ * ExecRemoteQuery passes execution control to the executor and then it can be
+ * given to the same RemoteQuery or to a different one. It is possible that before
+ * returning a tuple the function does not read all data node responses. In this
+ * case pending responses should be read in the context of the original RemoteQueryState
+ * until the ReadyForQuery message, and data rows should be stored (buffered) to be
+ * available when a fetch from that RemoteQueryState is requested again.
+ * BufferConnection function does the job.
+ * If a RemoteQuery is going to use a connection it should check the connection state.
+ * DN_CONNECTION_STATE_QUERY indicates query has data to read and combiner
+ * points to the original RemoteQueryState. If combiner differs from "this" the
+ * connection should be buffered.
+ */
+void
+BufferConnection(PGXCNodeHandle *conn)
+{
+ RemoteQueryState *combiner = conn->combiner;
+ /*
+ * When BufferConnection is invoked CurrentContext is related to other
+ * portal, which is trying to control the connection.
+ * TODO See if we can find better context to switch to
+ */
+ MemoryContext oldcontext = MemoryContextSwitchTo(combiner->ss.ss_ScanTupleSlot->tts_mcxt);
+
+ Assert(conn->state == DN_CONNECTION_STATE_QUERY && combiner);
+
+ /* Verify the connection is in use by the combiner */
+ combiner->current_conn = 0;
+ while (combiner->current_conn < combiner->conn_count)
+ {
+ if (combiner->connections[combiner->current_conn] == conn)
+ break;
+ combiner->current_conn++;
+ }
+ Assert(combiner->current_conn < combiner->conn_count);
+
+ /*
+ * Buffer data rows until the data node returns the number of rows specified by the
+ * fetch_size parameter of the last Execute message (PortalSuspended message)
+ * or the end of the result set is reached (CommandComplete message)
+ */
+ while (conn->state == DN_CONNECTION_STATE_QUERY)
+ {
+ int res;
+
+ /* Move to buffer currentRow (received from the data node) */
+ if (combiner->currentRow.msg)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) palloc(sizeof(RemoteDataRowData));
+ *dataRow = combiner->currentRow;
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
+ combiner->rowBuffer = lappend(combiner->rowBuffer, dataRow);
+ }
+
+ res = handle_response(conn, combiner);
+ /*
+ * If response message is a DataRow it will be handled on the next
+ * iteration.
+ * PortalSuspended will cause connection state change and break the loop
+ * The same is for CommandComplete, but we need additional handling -
+ * remove connection from the list of active connections.
+ * We may need to add handling error response
+ */
+ if (res == RESPONSE_EOF)
+ {
+ /* incomplete message, read more */
+ if (pgxc_node_receive(1, &conn, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ continue;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ /*
+ * End of result set is reached, so either set the pointer to the
+ * connection to NULL (step with sort) or remove it from the list
+ * (step without sort)
+ */
+ if (combiner->tuplesortstate)
+ {
+ combiner->connections[combiner->current_conn] = NULL;
+ if (combiner->tapenodes == NULL)
+ combiner->tapenodes = (int*) palloc0(NumDataNodes * sizeof(int));
+ combiner->tapenodes[combiner->current_conn] = conn->nodenum;
+ }
+ else
+ /* Remove current connection, move last in-place, adjust current_conn */
+ if (combiner->current_conn < --combiner->conn_count)
+ combiner->connections[combiner->current_conn] = combiner->connections[combiner->conn_count];
+ else
+ combiner->current_conn = 0;
+ }
+ /*
+ * Before returning RESPONSE_COMPLETE or PORTAL_SUSPENDED handle_response()
+ * changes the connection state to DN_CONNECTION_STATE_IDLE, breaking the
+ * loop. We do not need to do anything specific in the case of
+ * PORTAL_SUSPENDED, so we skip the "else if" block for that case
+ */
+ }
+ MemoryContextSwitchTo(oldcontext);
+ conn->combiner = NULL;
+}
+
+/*
* Get next data row from the combiner's buffer into provided slot
- * Just clear slot and return false if buffer is empty, that means more data
- * should be read
+ * Just clear slot and return false if buffer is empty, that means end of result
+ * set is reached
*/
bool
FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot)
{
- /* have messages in the buffer, consume them */
- if (combiner->msg)
+ bool have_tuple = false;
+
+ while (combiner->conn_count > 0)
{
- ExecStoreDataRowTuple(combiner->msg, combiner->msglen, slot, true);
- combiner->msg = NULL;
- combiner->msglen = 0;
+ PGXCNodeHandle *conn;
+ int res;
+
+ /* If we have message in the buffer, consume it */
+ if (combiner->currentRow.msg)
+ {
+ ExecStoreDataRowTuple(combiner->currentRow.msg,
+ combiner->currentRow.msglen,
+ combiner->currentRow.msgnode, slot, true);
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
+ have_tuple = true;
+ }
+ /*
+ * If this is an ordered fetch we cannot know which node to
+ * handle next, so the sorter will choose the next one itself and set it as
+ * currentRow to have it consumed on the next call to FetchTuple
+ */
+ if (((RemoteQuery *)combiner->ss.ps.plan)->sort)
+ return have_tuple;
+
+ /*
+ * Note: If we are fetching not sorted results we can not have both
+ * currentRow and buffered rows. When connection is buffered currentRow
+ * is moved to buffer, and then it is cleaned after buffering is
+ * completed. Afterwards rows will be taken from the buffer bypassing
+ * currentRow until buffer is empty, and only after that data are read
+ * from a connection.
+ */
+ if (list_length(combiner->rowBuffer) > 0)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) linitial(combiner->rowBuffer);
+ combiner->rowBuffer = list_delete_first(combiner->rowBuffer);
+ ExecStoreDataRowTuple(dataRow->msg, dataRow->msglen,
+ dataRow->msgnode, slot, true);
+ pfree(dataRow);
+ return true;
+ }
+
+ conn = combiner->connections[combiner->current_conn];
+
+ /* Going to use a connection, buffer it if needed */
+ if (conn->state == DN_CONNECTION_STATE_QUERY && conn->combiner != NULL
+ && conn->combiner != combiner)
+ BufferConnection(conn);
+
+ /*
+ * If current connection is idle it means portal on the data node is
+ * suspended. If we have a tuple do not hurry to request more rows,
+ * leave connection clean for other RemoteQueries.
+ * If we do not have, request more and try to get it
+ */
+ if (conn->state == DN_CONNECTION_STATE_IDLE)
+ {
+ /*
+ * If we have a tuple to return, do not hurry to request more; keep
+ * connection clean
+ */
+ if (have_tuple)
+ return have_tuple;
+ else
+ {
+ if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ if (pgxc_node_send_sync(conn) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ if (pgxc_node_receive(1, &conn, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ conn->combiner = combiner;
+ }
+ }
+
+ /* read messages */
+ res = handle_response(conn, combiner);
+ if (res == RESPONSE_EOF)
+ {
+ /* incomplete message, read more */
+ if (pgxc_node_receive(1, &conn, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node")));
+ continue;
+ }
+ else if (res == RESPONSE_SUSPENDED)
+ {
+ /* Make next connection current */
+ if (++combiner->current_conn >= combiner->conn_count)
+ combiner->current_conn = 0;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ /* Remove current connection, move last in-place, adjust current_conn */
+ if (combiner->current_conn < --combiner->conn_count)
+ combiner->connections[combiner->current_conn] = combiner->connections[combiner->conn_count];
+ else
+ combiner->current_conn = 0;
+ }
+
+ /* If we have a tuple we can leave now. */
+ if (have_tuple)
+ return true;
+ }
+ /* Wrap up last message if exists */
+ if (combiner->currentRow.msg)
+ {
+ ExecStoreDataRowTuple(combiner->currentRow.msg,
+ combiner->currentRow.msglen,
+ combiner->currentRow.msgnode, slot, true);
+ combiner->currentRow.msg = NULL;
+ combiner->currentRow.msglen = 0;
+ combiner->currentRow.msgnode = 0;
return true;
}
- /* inform caller that buffer is empty */
+ /* otherwise report end of data to the caller */
ExecClearTuple(slot);
return false;
}
@@ -986,10 +1235,11 @@ pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections,
* Long term, we should look into cancelling executing statements
* and closing the connections.
* Return values:
- * EOF - need to receive more data for the connection
- * 0 - done with the connection
- * 1 - got data row
- * 2 - got copy response
+ * RESPONSE_EOF - need to receive more data for the connection
+ * RESPONSE_COMPLETE - done with the connection
+ * RESPONSE_TUPLEDESC - got tuple description
+ * RESPONSE_DATAROW - got data row
+ * RESPONSE_COPY - got copy response
*/
int
handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
@@ -997,38 +1247,40 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
char *msg;
int msg_len;
char msg_type;
+ bool suspended = false;
for (;;)
{
- /* No data available, exit */
- if (conn->state == DN_CONNECTION_STATE_QUERY)
- return RESPONSE_EOF;
+ Assert(conn->state != DN_CONNECTION_STATE_IDLE);
+ Assert(conn->combiner == combiner || conn->combiner == NULL);
/*
* If we are in the process of shutting down, we
- * may be rolling back, and the buffer may contain other messages.
- * We want to avoid a procarray exception
- * as well as an error stack overflow.
- */
+ * may be rolling back, and the buffer may contain other messages.
+ * We want to avoid a procarray exception
+ * as well as an error stack overflow.
+ */
if (proc_exit_inprogress)
- {
conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
+
+ /* don't read from the connection if there is a fatal error */
+ if (conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
+ return RESPONSE_COMPLETE;
+
+ /* No data available, exit */
+ if (!HAS_MESSAGE_BUFFERED(conn))
return RESPONSE_EOF;
- }
/* TODO handle other possible responses */
msg_type = get_message(conn, &msg_len, &msg);
switch (msg_type)
{
case '\0': /* Not enough data in the buffer */
- conn->state = DN_CONNECTION_STATE_QUERY;
return RESPONSE_EOF;
case 'c': /* CopyToCommandComplete */
- conn->state = DN_CONNECTION_STATE_COMPLETED;
HandleCopyOutComplete(combiner);
break;
case 'C': /* CommandComplete */
- conn->state = DN_CONNECTION_STATE_COMPLETED;
HandleCommandComplete(combiner, msg, msg_len);
break;
case 'T': /* RowDescription */
@@ -1043,8 +1295,17 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
#ifdef DN_CONNECTION_DEBUG
Assert(conn->have_row_desc);
#endif
- HandleDataRow(combiner, msg, msg_len);
+ HandleDataRow(combiner, msg, msg_len, conn->nodenum);
return RESPONSE_DATAROW;
+ case 's': /* PortalSuspended */
+ suspended = true;
+ break;
+ case '1': /* ParseComplete */
+ case '2': /* BindComplete */
+ case '3': /* CloseComplete */
+ case 'n': /* NoData */
+ /* simple notifications, continue reading */
+ break;
case 'G': /* CopyInResponse */
conn->state = DN_CONNECTION_STATE_COPY_IN;
HandleCopyIn(combiner);
@@ -1060,7 +1321,6 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
break;
case 'E': /* ErrorResponse */
HandleError(combiner, msg, msg_len);
- conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
/*
* Do not return with an error, we still need to consume Z,
* ready-for-query
@@ -1074,12 +1334,22 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
*/
break;
case 'Z': /* ReadyForQuery */
+ {
+ /*
+ * The return result depends on the previous connection state.
+ * If it was PORTAL_SUSPENDED the coordinator wants to send down
+ * another EXECUTE to fetch more rows, otherwise it is done
+ * with the connection
+ */
+ int result = suspended ? RESPONSE_SUSPENDED : RESPONSE_COMPLETE;
conn->transaction_status = msg[0];
conn->state = DN_CONNECTION_STATE_IDLE;
+ conn->combiner = NULL;
#ifdef DN_CONNECTION_DEBUG
conn->have_row_desc = false;
#endif
- return RESPONSE_COMPLETE;
+ return result;
+ }
case 'I': /* EmptyQuery */
default:
/* sync lost? */
@@ -1092,6 +1362,7 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
return RESPONSE_EOF;
}
+
/*
* Like handle_response, but for consuming the messages,
* in case we of an error to clean the data node connection.
@@ -1138,8 +1409,8 @@ handle_response_clear(PGXCNodeHandle * conn)
case 'N': /* NoticeResponse */
break;
case 'E': /* ErrorResponse */
- conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
/*
+ * conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
* Do not return with an error, we still need to consume Z,
* ready-for-query
*/
@@ -1176,6 +1447,8 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections,
/* Send BEGIN */
for (i = 0; i < conn_count; i++)
{
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid))
return EOF;
@@ -1372,7 +1645,7 @@ finish:
rollback_xid = BeginTranGTM(NULL);
- /*
+ /*
* Send xid and rollback prepared down to Datanodes and Coordinators
* Even if we get an error on one, we try and send to the others
*/
@@ -1398,7 +1671,7 @@ finish:
/*
- * Commit prepared transaction on Datanodes and Coordinators (as necessary)
+ * Commit prepared transaction on Datanodes and Coordinators (as necessary)
* where it has been prepared.
* Connection to backends has been cut when transaction has been prepared,
* So it is necessary to send the COMMIT PREPARE message to all the nodes.
@@ -1831,18 +2104,6 @@ pgxc_node_rollback(PGXCNodeAllHandles *pgxc_handles)
int co_conn_count = pgxc_handles->co_conn_count;
int dn_conn_count = pgxc_handles->dn_conn_count;
- /*
- * Rollback is a special case, being issued because of an error.
- * We try to read and throw away any extra data on the connection before
- * issuing our rollbacks so that we did not read the results of the
- * previous command.
- */
- for (i = 0; i < dn_conn_count; i++)
- clear_socket_data(pgxc_handles->datanode_handles[i]);
-
- for (i = 0; i < co_conn_count; i++)
- clear_socket_data(pgxc_handles->coord_handles[i]);
-
/* Send ROLLBACK to all handles */
if (pgxc_all_handles_send_query(pgxc_handles, "ROLLBACK", false))
result = EOF;
@@ -1963,6 +2224,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
/* Send query to nodes */
for (i = 0; i < conn_count; i++)
{
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
/* If explicit transaction is needed gxid is already sent */
if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
{
@@ -2369,7 +2632,6 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
RemoteQueryState *remotestate;
Relation currentRelation;
-
remotestate = CreateResponseCombiner(0, node->combine_type);
remotestate->ss.ps.plan = (Plan *) node;
remotestate->ss.ps.state = estate;
@@ -2409,6 +2671,10 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
}
+
+ if (innerPlan(node))
+ innerPlanState(remotestate) = ExecInitNode(innerPlan(node), estate, eflags);
+
if (outerPlan(node))
outerPlanState(remotestate) = ExecInitNode(outerPlan(node), estate, eflags);
@@ -2425,7 +2691,9 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
if (src->tts_mcxt == dst->tts_mcxt)
{
/* now dst slot controls the backing message */
- ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen, dst, src->tts_shouldFreeRow);
+ ExecStoreDataRowTuple(src->tts_dataRow, src->tts_dataLen,
+ src->tts_dataNode, dst,
+ src->tts_shouldFreeRow);
src->tts_shouldFreeRow = false;
}
else
@@ -2433,10 +2701,11 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
/* have to make a copy */
MemoryContext oldcontext = MemoryContextSwitchTo(dst->tts_mcxt);
int len = src->tts_dataLen;
+ int node = src->tts_dataNode;
char *msg = (char *) palloc(len);
memcpy(msg, src->tts_dataRow, len);
- ExecStoreDataRowTuple(msg, len, dst, true);
+ ExecStoreDataRowTuple(msg, len, node, dst, true);
MemoryContextSwitchTo(oldcontext);
}
}
@@ -2610,20 +2879,21 @@ ExecRemoteQuery(RemoteQueryState *node)
int total_conn_count;
bool need_tran;
PGXCNodeAllHandles *pgxc_connections;
+ TupleTableSlot *innerSlot = NULL;
/*
- * If coordinator plan is specified execute it first.
- * If the plan is returning we are returning these tuples immediately.
- * If it is not returning or returned them all by current invocation
- * we will go ahead and execute remote query. Then we will never execute
- * the outer plan again because node->query_Done flag will be set and
- * execution won't get to that place.
+ * Inner plan for RemoteQuery supplies parameters.
+ * We execute inner plan to get a tuple and use values of the tuple as
+ * parameter values when executing this remote query.
+ * If returned slot contains NULL tuple break execution.
+ * TODO there is a problem how to handle the case if both inner and
+ * outer plans exist. We can decide later, since it is never used now.
*/
- if (outerPlanState(node))
+ if (innerPlanState(node))
{
- TupleTableSlot *slot = ExecProcNode(outerPlanState(node));
- if (!TupIsNull(slot))
- return slot;
+ innerSlot = ExecProcNode(innerPlanState(node));
+// if (TupIsNull(innerSlot))
+// return innerSlot;
}
/*
@@ -2712,6 +2982,9 @@ ExecRemoteQuery(RemoteQueryState *node)
/* See if we have a primary node, execute on it first before the others */
if (primaryconnection)
{
+ if (primaryconnection->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(primaryconnection);
+
/* If explicit transaction is needed gxid is already sent */
if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
{
@@ -2751,7 +3024,6 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
-
Assert(node->combine_type == COMBINE_TYPE_SAME);
while (node->command_complete_count < 1)
@@ -2760,11 +3032,7 @@ ExecRemoteQuery(RemoteQueryState *node)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to read response from data nodes")));
- while (handle_response(primaryconnection, node) == RESPONSE_EOF)
- if (pgxc_node_receive(1, &primaryconnection, NULL))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes")));
+ handle_response(primaryconnection, node);
if (node->errorMessage)
{
char *code = node->errorCode;
@@ -2777,6 +3045,8 @@ ExecRemoteQuery(RemoteQueryState *node)
for (i = 0; i < regular_conn_count; i++)
{
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
/* If explicit transaction is needed gxid is already sent */
if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
{
@@ -2811,106 +3081,174 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
+ if (step->cursor || innerSlot)
{
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
+ int fetch = 0;
+ /*
+ * execute and fetch rows only if they will be consumed
+ * immediately by the sorter
+ */
+ if (step->cursor)
+ fetch = 1;
+ /*
+ * Here is ad hoc handling of WHERE CURRENT OF clause.
+ * Previous step have returned the key values to update
+ * replicated tuple into innerSlot and the query is a
+ * DECLARE CURSOR. We do not want to execute this query now
+ * because of unusual fetch procedure: we just need to
+ * position cursor on the first row on each node.
+ * So here we just parse and bind query. As the result cursor
+ * specified by the statement will be created, and next step
+ * will issue MOVE command positioning the cursor properly.
+ * If we want to use parametrized requests for any other
+ * purpose we should return and rework this code
+ */
+ if (pgxc_node_send_query_extended(connections[i],
+ step->sql_statement,
+ NULL,
+ step->cursor,
+ innerSlot ? innerSlot->tts_dataLen : 0,
+ innerSlot ? innerSlot->tts_dataRow : NULL,
+ step->cursor != NULL,
+ fetch) != 0)
+ {
+ pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
}
+ else
+ {
+ if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
+ {
+ pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ }
+ connections[i]->combiner = node;
}
+ if (step->cursor)
+ {
+ node->cursor = step->cursor;
+ node->cursor_count = regular_conn_count;
+ node->cursor_connections = (PGXCNodeHandle **) palloc(regular_conn_count * sizeof(PGXCNodeHandle *));
+ memcpy(node->cursor_connections, connections, regular_conn_count * sizeof(PGXCNodeHandle *));
+ }
+
+ /*
+ * Stop if all commands are completed or we got a data row and
+ * initialized state node for subsequent invocations
+ */
+ while (regular_conn_count > 0 && node->connections == NULL)
+ {
+ int i = 0;
+
+ if (pgxc_node_receive(regular_conn_count, connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
/*
- * Stop if all commands are completed or we got a data row and
- * initialized state node for subsequent invocations
+ * Handle input from the data nodes.
+ * If we got a RESPONSE_DATAROW we can break handling to wrap
+ * it into a tuple and return. Handling will be continued upon
+ * subsequent invocations.
+ * If we got 0, we exclude connection from the list. We do not
+ * expect more input from it. In case of non-SELECT query we quit
+ * the loop when all nodes finish their work and send ReadyForQuery
+ * with empty connections array.
+ * If we got EOF, move to the next connection, will receive more
+ * data on the next iteration.
*/
- while (regular_conn_count > 0 && node->connections == NULL)
+ while (i < regular_conn_count)
{
- int i = 0;
-
- if (pgxc_node_receive(regular_conn_count, connections, NULL))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes")));
- /*
- * Handle input from the data nodes.
- * If we got a RESPONSE_DATAROW we can break handling to wrap
- * it into a tuple and return. Handling will be continued upon
- * subsequent invocations.
- * If we got 0, we exclude connection from the list. We do not
- * expect more input from it. In case of non-SELECT query we quit
- * the loop when all nodes finish their work and send ReadyForQuery
- * with empty connections array.
- * If we got EOF, move to the next connection, will receive more
- * data on the next iteration.
- */
- while (i < regular_conn_count)
+ int res = handle_response(connections[i], node);
+ if (res == RESPONSE_EOF)
{
- int res = handle_response(connections[i], node);
- if (res == RESPONSE_EOF)
- {
- i++;
- }
- else if (res == RESPONSE_COMPLETE)
- {
- if (i < --regular_conn_count)
- connections[i] = connections[regular_conn_count];
- }
- else if (res == RESPONSE_TUPDESC)
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (i < --regular_conn_count)
+ connections[i] = connections[regular_conn_count];
+ }
+ else if (res == RESPONSE_TUPDESC)
+ {
+ ExecSetSlotDescriptor(scanslot, node->tuple_desc);
+ /*
+ * Now tuple table slot is responsible for freeing the
+ * descriptor
+ */
+ node->tuple_desc = NULL;
+ if (step->sort)
{
- ExecSetSlotDescriptor(scanslot, node->tuple_desc);
+ SimpleSort *sort = step->sort;
+
+ node->connections = connections;
+ node->conn_count = regular_conn_count;
/*
- * Now tuple table slot is responsible for freeing the
- * descriptor
+ * First message is already in the buffer
+ * Further fetch will be under tuplesort control
+ * If query does not produce rows tuplesort will not
+ * be initialized
*/
- node->tuple_desc = NULL;
- if (step->sort)
- {
- SimpleSort *sort = step->sort;
-
- node->connections = connections;
- node->conn_count = regular_conn_count;
- /*
- * First message is already in the buffer
- * Further fetch will be under tuplesort control
- * If query does not produce rows tuplesort will not
- * be initialized
- */
- node->tuplesortstate = tuplesort_begin_merge(
- scanslot->tts_tupleDescriptor,
- sort->numCols,
- sort->sortColIdx,
- sort->sortOperators,
- sort->nullsFirst,
- node,
- work_mem);
- /*
- * Break the loop, do not wait for first row.
- * Tuplesort module want to control node it is
- * fetching rows from, while in this loop first
- * row would be got from random node
- */
- break;
- }
- }
- else if (res == RESPONSE_DATAROW)
- {
+ node->tuplesortstate = tuplesort_begin_merge(
+ scanslot->tts_tupleDescriptor,
+ sort->numCols,
+ sort->sortColIdx,
+ sort->sortOperators,
+ sort->nullsFirst,
+ node,
+ work_mem);
/*
- * Got first data row, quit the loop
+ * Break the loop, do not wait for the first row.
+ * The tuplesort module wants to control which node it is
+ * fetching rows from, while in this loop the first
+ * row would be fetched from a random node
*/
- node->connections = connections;
- node->conn_count = regular_conn_count;
- node->current_conn = i;
break;
}
}
+ else if (res == RESPONSE_DATAROW)
+ {
+ /*
+ * Got first data row, quit the loop
+ */
+ node->connections = connections;
+ node->conn_count = regular_conn_count;
+ node->current_conn = i;
+ break;
+ }
}
+ }
+
+ if (node->cursor_count)
+ {
+ node->conn_count = node->cursor_count;
+ memcpy(connections, node->cursor_connections, node->cursor_count * sizeof(PGXCNodeHandle *));
+ node->connections = connections;
+ }
node->query_Done = true;
}
+ if (node->update_cursor)
+ {
+ PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+ close_node_cursors(all_dn_handles->datanode_handles,
+ all_dn_handles->dn_conn_count,
+ node->update_cursor);
+ pfree(node->update_cursor);
+ node->update_cursor = NULL;
+ }
+
if (node->tuplesortstate)
{
while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
@@ -2948,98 +3286,35 @@ ExecRemoteQuery(RemoteQueryState *node)
}
else
{
- while (node->conn_count > 0 && !have_tuple)
+ while (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
{
- int i;
-
- /*
- * If combiner already has tuple go ahead and return it
- * otherwise tuple will be cleared
- */
- if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+ if (node->simple_aggregates)
{
- if (node->simple_aggregates)
- {
- if (node->simple_aggregates)
- {
- /*
- * Advance aggregate functions and allow to read up next
- * data row message and get tuple in the same slot on
- * next iteration
- */
- exec_simple_aggregates(node, scanslot);
- }
- else
- {
- /*
- * Receive current slot and read up next data row
- * message before exiting the loop. Next time when this
- * function is invoked we will have either data row
- * message ready or EOF
- */
- copy_slot(node, scanslot, resultslot);
- have_tuple = true;
- }
- }
- else
- {
- /*
- * Receive current slot and read up next data row
- * message before exiting the loop. Next time when this
- * function is invoked we will have either data row
- * message ready or EOF
- */
- copy_slot(node, scanslot, resultslot);
- have_tuple = true;
- }
+ /*
+ * Advance aggregate functions and allow to read up next
+ * data row message and get tuple in the same slot on
+ * next iteration
+ */
+ exec_simple_aggregates(node, scanslot);
}
-
- /*
- * Handle input to get next row or ensure command is completed,
- * starting from connection next after current. If connection
- * does not
- */
- if ((i = node->current_conn + 1) == node->conn_count)
- i = 0;
-
- for (;;)
+ else
{
- int res = handle_response(node->connections[i], node);
- if (res == RESPONSE_EOF)
- {
- /* go to next connection */
- if (++i == node->conn_count)
- i = 0;
- /* if we cycled over all connections we need to receive more */
- if (i == node->current_conn)
- if (pgxc_node_receive(node->conn_count, node->connections, NULL))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes")));
- }
- else if (res == RESPONSE_COMPLETE)
- {
- if (--node->conn_count == 0)
- break;
- if (i == node->conn_count)
- i = 0;
- else
- node->connections[i] = node->connections[node->conn_count];
- if (node->current_conn == node->conn_count)
- node->current_conn = i;
- }
- else if (res == RESPONSE_DATAROW)
- {
- node->current_conn = i;
- break;
- }
+ /*
+ * Receive current slot and read up next data row
+ * message before exiting the loop. Next time when this
+ * function is invoked we will have either data row
+ * message ready or EOF
+ */
+ copy_slot(node, scanslot, resultslot);
+ have_tuple = true;
+ break;
}
}
/*
* We may need to finalize aggregates
*/
- if (!have_tuple && node->simple_aggregates)
+ if (node->simple_aggregates)
{
finish_simple_aggregates(node, resultslot);
if (!TupIsNull(resultslot))
@@ -3058,7 +3333,31 @@ ExecRemoteQuery(RemoteQueryState *node)
errmsg("%s", node->errorMessage)));
}
- return resultslot;
+ /*
+ * While we are emitting rows we ignore outer plan
+ */
+ if (!TupIsNull(resultslot))
+ return resultslot;
+
+ /*
+ * If coordinator plan is specified execute it first.
+ * If the plan is returning we are returning these tuples immediately.
+ * If it is not returning or returned them all by current invocation
+ * we will go ahead and execute remote query. Then we will never execute
+ * the outer plan again because node->query_Done flag will be set and
+ * execution won't get to that place.
+ */
+ if (outerPlanState(node))
+ {
+ TupleTableSlot *slot = ExecProcNode(outerPlanState(node));
+ if (!TupIsNull(slot))
+ return slot;
+ }
+
+ /*
+ * OK, we have nothing to return, so return NULL
+ */
+ return NULL;
}
/*
@@ -3067,90 +3366,88 @@ ExecRemoteQuery(RemoteQueryState *node)
void
ExecEndRemoteQuery(RemoteQueryState *node)
{
+ ListCell *lc;
/*
- * If processing was interrupted, (ex: client did not consume all the data,
- * or a subquery with LIMIT) we may still have data on the nodes. Try and consume.
- * We do not simply call PGXCNodeConsumeMessages, because the same
- * connection could be used for multiple RemoteQuery steps.
- *
- * It seems most stable checking command_complete_count
- * and only then working with conn_count
- *
- * PGXCTODO: Change in the future when we remove materialization nodes.
+ * shut down the subplan
*/
- if (node->command_complete_count < node->node_count)
- {
- elog(WARNING, "Extra data node messages when ending remote query step");
-
- while (node->conn_count > 0)
- {
- int i = 0;
- int res;
-
- /*
- * Just consume the rest of the messages
- */
- if ((i = node->current_conn + 1) == node->conn_count)
- i = 0;
+ if (innerPlanState(node))
+ ExecEndNode(innerPlanState(node));
- for (;;)
- {
- /* throw away message */
- if (node->msg)
- {
- pfree(node->msg);
- node->msg = NULL;
- }
+ /* clean up the buffer */
+ foreach(lc, node->rowBuffer)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc);
+ pfree(dataRow->msg);
+ }
+ list_free_deep(node->rowBuffer);
- res = handle_response(node->connections[i], node);
+ node->current_conn = 0;
+ while (node->conn_count > 0)
+ {
+ int res;
+ PGXCNodeHandle *conn = node->connections[node->current_conn];
- if (res == RESPONSE_COMPLETE ||
- node->connections[i]->state == DN_CONNECTION_STATE_ERROR_FATAL ||
- node->connections[i]->state == DN_CONNECTION_STATE_ERROR_NOT_READY)
- {
- if (--node->conn_count == 0)
- break;
- if (i == node->conn_count)
- i = 0;
- else
- node->connections[i] = node->connections[node->conn_count];
- if (node->current_conn == node->conn_count)
- node->current_conn = i;
- }
- else if (res == RESPONSE_EOF)
- {
- /* go to next connection */
- if (++i == node->conn_count)
- i = 0;
+ /* throw away message */
+ if (node->currentRow.msg)
+ {
+ pfree(node->currentRow.msg);
+ node->currentRow.msg = NULL;
+ }
+ /* no data is expected */
+ if (conn->state == DN_CONNECTION_STATE_IDLE ||
+ conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
+ {
+ if (node->current_conn < --node->conn_count)
+ node->connections[node->current_conn] = node->connections[node->conn_count];
+ continue;
+ }
+ res = handle_response(conn, node);
+ if (res == RESPONSE_EOF)
+ {
+ struct timeval timeout;
+ timeout.tv_sec = END_QUERY_TIMEOUT;
+ timeout.tv_usec = 0;
- /* if we cycled over all connections we need to receive more */
- if (i == node->current_conn)
- {
- struct timeval timeout;
- timeout.tv_sec = END_QUERY_TIMEOUT;
- timeout.tv_usec = 0;
-
- if (pgxc_node_receive(node->conn_count, node->connections, &timeout))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to read response from data nodes when ending query")));
- }
- }
- }
+ if (pgxc_node_receive(1, &conn, &timeout))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes when ending query")));
}
- elog(WARNING, "Data node connection buffers cleaned");
}
-
/*
* Release tuplesort resources
*/
if (node->tuplesortstate != NULL)
+ {
+ /*
+ * tuplesort_end invalidates minimal tuple if it is in the slot because
+ * deletes the TupleSort memory context, causing seg fault later when
+ * releasing tuple table
+ */
+ ExecClearTuple(node->ss.ss_ScanTupleSlot);
tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+ }
node->tuplesortstate = NULL;
/*
+ * If there are active cursors close them
+ */
+ if (node->cursor)
+ close_node_cursors(node->cursor_connections, node->cursor_count, node->cursor);
+
+ if (node->update_cursor)
+ {
+ PGXCNodeAllHandles *all_dn_handles = get_exec_connections(NULL, EXEC_ON_DATANODES);
+ close_node_cursors(all_dn_handles->datanode_handles,
+ all_dn_handles->dn_conn_count,
+ node->update_cursor);
+ pfree(node->update_cursor);
+ node->update_cursor = NULL;
+ }
+
+ /*
* shut down the subplan
*/
if (outerPlanState(node))
@@ -3165,6 +3462,57 @@ ExecEndRemoteQuery(RemoteQueryState *node)
CloseCombiner(node);
}
+static void
+close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor)
+{
+ int i;
+ RemoteQueryState *combiner;
+
+ for (i = 0; i < conn_count; i++)
+ {
+ if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connections[i]);
+ if (pgxc_node_send_close(connections[i], false, cursor) != 0)
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node cursor")));
+ if (pgxc_node_send_sync(connections[i]) != 0)
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node cursor")));
+ }
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+
+ while (conn_count > 0)
+ {
+ if (pgxc_node_receive(conn_count, connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to close data node cursor")));
+ i = 0;
+ while (i < conn_count)
+ {
+ int res = handle_response(connections[i], combiner);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (--conn_count > i)
+ connections[i] = connections[conn_count];
+ }
+ else
+ {
+ /* Unexpected response, ignore? */
+ }
+ }
+ }
+
+ ValidateAndCloseCombiner(combiner);
+}
+
/*
* Consume any remaining messages on the connections.
* This is useful for calling after ereport()
@@ -3367,6 +3715,8 @@ ExecRemoteUtility(RemoteQuery *node)
/* See if we have a primary nodes, execute on it first before the others */
if (primaryconnection)
{
+ if (primaryconnection->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(primaryconnection);
/* If explicit transaction is needed gxid is already sent */
if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
{
@@ -3421,20 +3771,24 @@ ExecRemoteUtility(RemoteQuery *node)
{
for (i = 0; i < regular_conn_count; i++)
{
+ PGXCNodeHandle *conn = pgxc_connections->datanode_handles[i];
+
+ if (conn->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(conn);
/* If explicit transaction is needed gxid is already sent */
- if (!need_tran && pgxc_node_send_gxid(pgxc_connections->datanode_handles[i], gxid))
+ if (!need_tran && pgxc_node_send_gxid(conn, gxid))
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (snapshot && pgxc_node_send_snapshot(pgxc_connections->datanode_handles[i], snapshot))
+ if (snapshot && pgxc_node_send_snapshot(conn, snapshot))
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (pgxc_node_send_query(pgxc_connections->datanode_handles[i], node->sql_statement) != 0)
+ if (pgxc_node_send_query(conn, node->sql_statement) != 0)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -3498,7 +3852,8 @@ ExecRemoteUtility(RemoteQuery *node)
*/
while (i < regular_conn_count)
{
- int res = handle_response(pgxc_connections->datanode_handles[i], remotestate);
+ PGXCNodeHandle *conn = pgxc_connections->datanode_handles[i];
+ int res = handle_response(conn, remotestate);
if (res == RESPONSE_EOF)
{
i++;
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index 0d902738f0..3bc3a83aa4 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -78,6 +78,7 @@ init_pgxc_handle(PGXCNodeHandle *pgxc_handle)
pgxc_handle->outBuffer = (char *) palloc(pgxc_handle->outSize);
pgxc_handle->inSize = 16 * 1024;
pgxc_handle->inBuffer = (char *) palloc(pgxc_handle->inSize);
+ pgxc_handle->combiner = NULL;
if (pgxc_handle->outBuffer == NULL || pgxc_handle->inBuffer == NULL)
{
@@ -239,6 +240,7 @@ pgxc_node_init(PGXCNodeHandle *handle, int sock, int nodenum)
handle->sock = sock;
handle->transaction_status = 'I';
handle->state = DN_CONNECTION_STATE_IDLE;
+ handle->combiner = NULL;
#ifdef DN_CONNECTION_DEBUG
handle->have_row_desc = false;
#endif
@@ -268,7 +270,7 @@ pgxc_node_receive(const int conn_count,
{
/* If connection finised sending do not wait input from it */
if (connections[i]->state == DN_CONNECTION_STATE_IDLE
- || connections[i]->state == DN_CONNECTION_STATE_HAS_DATA)
+ || HAS_MESSAGE_BUFFERED(connections[i]))
continue;
/* prepare select params */
@@ -280,7 +282,7 @@ pgxc_node_receive(const int conn_count,
else
{
/* flag as bad, it will be removed from the list */
- connections[i]->state == DN_CONNECTION_STATE_ERROR_NOT_READY;
+ connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
}
}
@@ -327,11 +329,7 @@ retry:
add_error_message(conn, "unexpected EOF on datanode connection");
elog(WARNING, "unexpected EOF on datanode connection");
/* Should we read from the other connections before returning? */
- return EOF;
- }
- else
- {
- conn->state = DN_CONNECTION_STATE_HAS_DATA;
+ return EOF;
}
}
}
@@ -476,18 +474,6 @@ retry:
}
-/*
- * Clear out socket data and buffer.
- * Throw away any data.
- */
-void
-clear_socket_data (PGXCNodeHandle *conn)
-{
- do {
- conn->inStart = conn->inCursor = conn->inEnd = 0;
- } while (pgxc_node_read_data(conn) > 0);
-}
-
/*
* Get one character from the connection buffer and advance cursor
*/
@@ -576,7 +562,6 @@ get_message(PGXCNodeHandle *conn, int *len, char **msg)
* ensure_in_buffer_capacity() will immediately return
*/
ensure_in_buffer_capacity(5 + (size_t) *len, conn);
- conn->state = DN_CONNECTION_STATE_QUERY;
conn->inCursor = conn->inStart;
return '\0';
}
@@ -589,7 +574,7 @@ get_message(PGXCNodeHandle *conn, int *len, char **msg)
/*
- * Release all data node connections and coordinator connections
+ * Release all data node connections and coordinator connections
* back to pool and release occupied memory
*
* If force_drop is true, we force dropping all of the connections, such as after
@@ -853,6 +838,327 @@ send_some(PGXCNodeHandle *handle, int len)
return result;
}
+/*
+ * Send PARSE message with specified statement down to the Data node
+ */
+int
+pgxc_node_send_parse(PGXCNodeHandle * handle, const char* statement,
+ const char *query)
+{
+ /* statement name size (allow NULL) */
+ int stmtLen = statement ? strlen(statement) + 1 : 1;
+ /* size of query string */
+ int strLen = strlen(query) + 1;
+ /* size of parameter array (always empty for now) */
+ int paramLen = 2;
+
+ /* size + stmtLen + strlen + paramLen */
+ int msgLen = 4 + stmtLen + strLen + paramLen;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'P';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* statement name */
+ if (statement)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, statement, stmtLen);
+ handle->outEnd += stmtLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+ /* query */
+ memcpy(handle->outBuffer + handle->outEnd, query, strLen);
+ handle->outEnd += strLen;
+ /* parameter types (none) */
+ handle->outBuffer[handle->outEnd++] = 0;
+ handle->outBuffer[handle->outEnd++] = 0;
+
+ return 0;
+}
+
+
+/*
+ * Send BIND message down to the Data node
+ */
+int
+pgxc_node_send_bind(PGXCNodeHandle * handle, const char *portal,
+ const char *statement, int paramlen, char *params)
+{
+ uint16 n16;
+ /* portal name size (allow NULL) */
+ int pnameLen = portal ? strlen(portal) + 1 : 1;
+ /* statement name size (allow NULL) */
+ int stmtLen = statement ? strlen(statement) + 1 : 1;
+ /* size of parameter codes array (always empty for now) */
+ int paramCodeLen = 2;
+ /* size of parameter values array, 2 if no params */
+ int paramValueLen = paramlen ? paramlen : 2;
+ /* size of output parameter codes array (always empty for now) */
+ int paramOutLen = 2;
+
+ /* size + pnameLen + stmtLen + parameters */
+ int msgLen = 4 + pnameLen + stmtLen + paramCodeLen + paramValueLen + paramOutLen;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'B';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* portal name */
+ if (portal)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, portal, pnameLen);
+ handle->outEnd += pnameLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+ /* statement name */
+ if (statement)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, statement, stmtLen);
+ handle->outEnd += stmtLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+ /* parameter codes (none) */
+ handle->outBuffer[handle->outEnd++] = 0;
+ handle->outBuffer[handle->outEnd++] = 0;
+ /* parameter values */
+ if (paramlen)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, params, paramlen);
+ handle->outEnd += paramlen;
+ }
+ else
+ {
+ handle->outBuffer[handle->outEnd++] = 0;
+ handle->outBuffer[handle->outEnd++] = 0;
+ }
+ /* output parameter codes (none) */
+ handle->outBuffer[handle->outEnd++] = 0;
+ handle->outBuffer[handle->outEnd++] = 0;
+
+ return 0;
+}
+
+
+/*
+ * Send DESCRIBE message (portal or statement) down to the Data node
+ */
+int
+pgxc_node_send_describe(PGXCNodeHandle * handle, bool is_statement,
+ const char *name)
+{
+ /* statement or portal name size (allow NULL) */
+ int nameLen = name ? strlen(name) + 1 : 1;
+
+ /* size + statement/portal + name */
+ int msgLen = 4 + 1 + nameLen;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'D';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* statement/portal flag */
+ handle->outBuffer[handle->outEnd++] = is_statement ? 'S' : 'P';
+ /* object name */
+ if (name)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, name, nameLen);
+ handle->outEnd += nameLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+
+ return 0;
+}
+
+
+/*
+ * Send CLOSE message (portal or statement) down to the Data node
+ */
+int
+pgxc_node_send_close(PGXCNodeHandle * handle, bool is_statement,
+ const char *name)
+{
+ /* statement or portal name size (allow NULL) */
+ int nameLen = name ? strlen(name) + 1 : 1;
+
+ /* size + statement/portal + name */
+ int msgLen = 4 + 1 + nameLen;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'C';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* statement/portal flag */
+ handle->outBuffer[handle->outEnd++] = is_statement ? 'S' : 'P';
+ /* object name */
+ if (name)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, name, nameLen);
+ handle->outEnd += nameLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+
+ handle->state = DN_CONNECTION_STATE_QUERY;
+
+ return 0;
+}
+
+/*
+ * Send EXECUTE message down to the Data node
+ */
+int
+pgxc_node_send_execute(PGXCNodeHandle * handle, const char *portal, int fetch)
+{
+ /* portal name size (allow NULL) */
+ int pnameLen = portal ? strlen(portal) + 1 : 1;
+
+ /* size + pnameLen + fetchLen */
+ int msgLen = 4 + pnameLen + 4;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'E';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+ /* portal name */
+ if (portal)
+ {
+ memcpy(handle->outBuffer + handle->outEnd, portal, pnameLen);
+ handle->outEnd += pnameLen;
+ }
+ else
+ handle->outBuffer[handle->outEnd++] = '\0';
+
+ /* fetch */
+ fetch = htonl(fetch);
+ memcpy(handle->outBuffer + handle->outEnd, &fetch, 4);
+ handle->outEnd += 4;
+
+ handle->state = DN_CONNECTION_STATE_QUERY;
+
+ return 0;
+}
+
+
+/*
+ * Send FLUSH message down to the Data node
+ */
+int
+pgxc_node_send_flush(PGXCNodeHandle * handle)
+{
+ /* size */
+ int msgLen = 4;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'H';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+
+ return pgxc_node_flush(handle);
+}
+
+
+/*
+ * Send SYNC message down to the Data node
+ */
+int
+pgxc_node_send_sync(PGXCNodeHandle * handle)
+{
+ /* size */
+ int msgLen = 4;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'S';
+ /* size */
+ msgLen = htonl(msgLen);
+ memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
+ handle->outEnd += 4;
+
+ return pgxc_node_flush(handle);
+}
+
+
+/*
+ * Send the Extended Query protocol message series (Parse/Bind/Describe/
+ * Execute/Sync, as requested) down to the Data node
+ */
+int
+pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query,
+ const char *statement, const char *portal,
+ int paramlen, char *params,
+ bool send_describe, int fetch_size)
+{
+ if (pgxc_node_send_parse(handle, statement, query))
+ return EOF;
+ if (pgxc_node_send_bind(handle, portal, statement, paramlen, params))
+ return EOF;
+ if (send_describe)
+ if (pgxc_node_send_describe(handle, false, portal))
+ return EOF;
+ if (fetch_size >= 0)
+ if (pgxc_node_send_execute(handle, portal, fetch_size))
+ return EOF;
+ if (pgxc_node_send_sync(handle))
+ return EOF;
+
+ return 0;
+}
/*
* This method won't return until connection buffer is empty or error occurs
@@ -1034,7 +1340,6 @@ void
add_error_message(PGXCNodeHandle *handle, const char *message)
{
handle->transaction_status = 'E';
- handle->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
if (handle->error)
{
/* PGXCTODO append */
@@ -1072,7 +1377,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ errmsg("out of memory")));
}
result->primary_handle = NULL;
@@ -1348,8 +1653,7 @@ is_active_connection(PGXCNodeHandle *handle)
{
return handle->sock != NO_SOCKET &&
handle->state != DN_CONNECTION_STATE_IDLE &&
- handle->state != DN_CONNECTION_STATE_ERROR_NOT_READY &&
- handle->state != DN_CONNECTION_STATE_ERROR_FATAL;
+ !DN_CONNECTION_STATE_ERROR(handle);
}
/*
@@ -1378,7 +1682,7 @@ get_active_nodes(PGXCNodeHandle **connections)
if (is_active_connection(&co_handles[i]))
connections[active_count++] = &co_handles[i];
}
- }
+ }
return active_count;
}
@@ -1442,6 +1746,11 @@ pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer
/* Send to Datanodes */
for (i = 0; i < dn_conn_count; i++)
{
+ /*
+ * Clean connection if fetch in progress
+ */
+ if (pgxc_handles->datanode_handles[i]->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(pgxc_handles->datanode_handles[i]);
if (pgxc_node_send_query(pgxc_handles->datanode_handles[i], buffer))
{
add_error_message(pgxc_handles->datanode_handles[i], "Can not send request");
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index dd72a4bfee..91451ce60a 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -1523,7 +1523,7 @@ get_relname_relid(const char *relname, Oid relnamespace)
0, 0);
}
-#ifdef NOT_USED
+#ifdef PGXC
/*
* get_relnatts
*
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index cb078be067..50ad300429 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2875,25 +2875,119 @@ reversedirection_heap(Tuplesortstate *state)
static unsigned int
getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK)
{
- PGXCNodeHandle *conn = state->combiner->connections[tapenum];
+ RemoteQueryState *combiner = state->combiner;
+ PGXCNodeHandle *conn = combiner->connections[tapenum];
+ /*
+ * If connection is active (potentially has data to read) we can get node
+ * number from the connection. If connection is not active (we have read all
+ * available data rows) and if we have buffered data from that connection
+ * the node number is stored in combiner->tapenodes[tapenum].
+ * If connection is inactive and no buffered data we have EOF condition
+ */
+ int nodenum = conn ? conn->nodenum : combiner->tapenodes[tapenum];
+ unsigned int len = 0;
+ ListCell *lc;
+ ListCell *prev = NULL;
+
+ /*
+ * If there are buffered rows iterate over them and get first from
+ * the requested tape
+ */
+ foreach (lc, combiner->rowBuffer)
+ {
+ RemoteDataRow dataRow = (RemoteDataRow) lfirst(lc);
+ if (dataRow->msgnode == nodenum)
+ {
+ combiner->currentRow = *dataRow;
+ combiner->rowBuffer = list_delete_cell(combiner->rowBuffer, lc, prev);
+ return dataRow->msglen;
+ }
+ prev = lc;
+ }
+
+ /* Nothing is found in the buffer, check for EOF */
+ if (conn == NULL)
+ {
+ if (eofOK)
+ return 0;
+ else
+ elog(ERROR, "unexpected end of data");
+ }
+
+ /* Going to get data from connection, buffer if needed */
+ if (conn->state == DN_CONNECTION_STATE_QUERY && conn->combiner != combiner)
+ BufferConnection(conn);
+
+ /* Request more rows if needed */
+ if (conn->state == DN_CONNECTION_STATE_IDLE)
+ {
+ Assert(combiner->cursor);
+ if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node cursor")));
+ if (pgxc_node_send_sync(conn) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node cursor")));
+ conn->state = DN_CONNECTION_STATE_QUERY;
+ conn->combiner = combiner;
+ }
+ /* Read data from the connection until get a row or EOF */
for (;;)
{
- switch (handle_response(conn, state->combiner))
+ switch (handle_response(conn, combiner))
{
+ case RESPONSE_SUSPENDED:
+ /* Send Execute to request next row */
+ Assert(combiner->cursor);
+ if (len)
+ return len;
+ if (pgxc_node_send_execute(conn, combiner->cursor, 1) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node cursor")));
+ if (pgxc_node_send_sync(conn) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to fetch from data node cursor")));
+ conn->state = DN_CONNECTION_STATE_QUERY;
+ conn->combiner = combiner;
+ /* fallthru */
case RESPONSE_EOF:
+ /* receive more data */
if (pgxc_node_receive(1, &conn, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg(conn->error)));
break;
case RESPONSE_COMPLETE:
+ /* EOF encountered, close the tape and report EOF */
+ if (combiner->cursor)
+ {
+ combiner->connections[tapenum] = NULL;
+ if (len)
+ return len;
+ }
if (eofOK)
return 0;
else
elog(ERROR, "unexpected end of data");
break;
case RESPONSE_DATAROW:
- return state->combiner->msglen;
+ Assert(len == 0);
+ if (state->combiner->cursor)
+ {
+ /*
+ * We fetch one row at a time when running EQP,
+ * so read the following PortalSuspended or CommandComplete
+ * to leave the connection clean between the calls
+ */
+ len = state->combiner->currentRow.msglen;
+ break;
+ }
+ else
+ return state->combiner->currentRow.msglen;
default:
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index c85b1b0c61..0c2c4fe0a2 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -124,6 +124,7 @@ typedef struct TupleTableSlot
*/
char *tts_dataRow; /* Tuple data in DataRow format */
int tts_dataLen; /* Actual length of the data row */
+ int tts_dataNode; /* Originating node of the data row */
bool tts_shouldFreeRow; /* should pfree tts_dataRow? */
struct AttInMetadata *tts_attinmeta; /* store here info to extract values from the DataRow */
#endif
@@ -177,6 +178,7 @@ extern TupleTableSlot *ExecStoreMinimalTuple(MinimalTuple mtup,
#ifdef PGXC
extern TupleTableSlot *ExecStoreDataRowTuple(char *msg,
size_t len,
+ int node,
TupleTableSlot *slot,
bool shouldFree);
#endif
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index 4f4a0c9c53..4a33842bdf 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -16,8 +16,8 @@
#ifndef EXECREMOTE_H
#define EXECREMOTE_H
-#include "pgxcnode.h"
#include "locator.h"
+#include "pgxcnode.h"
#include "planner.h"
#include "access/tupdesc.h"
#include "executor/tuptable.h"
@@ -29,9 +29,10 @@
/* Outputs of handle_response() */
#define RESPONSE_EOF EOF
#define RESPONSE_COMPLETE 0
-#define RESPONSE_TUPDESC 1
-#define RESPONSE_DATAROW 2
-#define RESPONSE_COPY 3
+#define RESPONSE_SUSPENDED 1
+#define RESPONSE_TUPDESC 2
+#define RESPONSE_DATAROW 3
+#define RESPONSE_COPY 4
typedef enum
{
@@ -42,6 +43,18 @@ typedef enum
REQUEST_TYPE_COPY_OUT /* Copy Out response */
} RequestType;
+/*
+ * Represents a DataRow message received from a remote node.
+ * Contains originating node number and message body in DataRow format without
+ * message code and length. Length is kept in a separate field
+ */
+typedef struct RemoteDataRowData
+{
+ char *msg; /* last data row message */
+ int msglen; /* length of the data row message */
+ int msgnode; /* node number of the data row message */
+} RemoteDataRowData;
+typedef RemoteDataRowData *RemoteDataRow;
typedef struct RemoteQueryState
{
@@ -60,8 +73,18 @@ typedef struct RemoteQueryState
char errorCode[5]; /* error code to send back to client */
char *errorMessage; /* error message to send back to client */
bool query_Done; /* query has been sent down to data nodes */
- char *msg; /* last data row message */
- int msglen; /* length of the data row message */
+ RemoteDataRowData currentRow; /* next data row to be wrapped into a tuple */
+ /* TODO use a tuplestore as a rowbuffer */
+ List *rowBuffer; /* buffer where rows are stored when connection
+ * should be cleaned for reuse by other RemoteQuery */
+ /*
+ * To handle special case - if there is a simple sort and sort connection
+ * is buffered. If EOF is reached on a connection it should be removed from
+ * the array, but we need to know node number of the connection to find
+ * messages in the buffer. So we store nodenum to that array if reach EOF
+ * when buffering
+ */
+ int *tapenodes;
/*
* While we are not supporting grouping use this flag to indicate we need
* to initialize collecting of aggregates from the DNs
@@ -74,6 +97,11 @@ typedef struct RemoteQueryState
MemoryContext tmp_ctx; /* separate context is needed to compare tuples */
FILE *copy_file; /* used if copy_dest == COPY_FILE */
uint64 processed; /* count of data rows when running CopyOut */
+ /* cursor support */
+ char *cursor; /* cursor name */
+ char *update_cursor; /* cursor through which the current tuple can be updated */
+ int cursor_count; /* total count of participating nodes */
+ PGXCNodeHandle **cursor_connections;/* data node connections being combined */
} RemoteQueryState;
/* Multinode Executor */
@@ -98,9 +126,9 @@ extern void ExecRemoteUtility(RemoteQuery *node);
extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner);
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
+extern void BufferConnection(PGXCNodeHandle *conn);
extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt);
-extern void PGXCNodeConsumeMessages(void);
extern int primary_data_node;
#endif
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
index d6c5af950c..a57e4f1673 100644
--- a/src/include/pgxc/pgxcnode.h
+++ b/src/include/pgxc/pgxcnode.h
@@ -34,17 +34,18 @@ typedef enum
{
DN_CONNECTION_STATE_IDLE, /* idle, ready for query */
DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */
- DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */
- DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQury yet */
- DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */
DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
DN_CONNECTION_STATE_COPY_IN,
DN_CONNECTION_STATE_COPY_OUT
} DNConnectionState;
#define DN_CONNECTION_STATE_ERROR(dnconn) \
- (dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
- || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY
+ ((dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
+ || (dnconn)->transaction_status == 'E')
+
+#define HAS_MESSAGE_BUFFERED(conn) \
+ ((conn)->inCursor + 4 < (conn)->inEnd \
+ && (conn)->inCursor + ntohl(*((uint32_t *) ((conn)->inBuffer + (conn)->inCursor + 1))) < (conn)->inEnd)
struct pgxc_node_handle
{
@@ -54,6 +55,7 @@ struct pgxc_node_handle
/* Connection state */
char transaction_status;
DNConnectionState state;
+ struct RemoteQueryState *combiner;
#ifdef DN_CONNECTION_DEBUG
bool have_row_desc;
#endif
@@ -103,6 +105,16 @@ extern int ensure_in_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handl
extern int ensure_out_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handle);
extern int pgxc_node_send_query(PGXCNodeHandle * handle, const char *query);
+extern int pgxc_node_send_describe(PGXCNodeHandle * handle, bool is_statement,
+ const char *name);
+extern int pgxc_node_send_execute(PGXCNodeHandle * handle, const char *portal, int fetch);
+extern int pgxc_node_send_close(PGXCNodeHandle * handle, bool is_statement,
+ const char *name);
+extern int pgxc_node_send_sync(PGXCNodeHandle * handle);
+extern int pgxc_node_send_query_extended(PGXCNodeHandle *handle, const char *query,
+ const char *statement, const char *portal,
+ int paramlen, char *params,
+ bool send_describe, int fetch_size);
extern int pgxc_node_send_gxid(PGXCNodeHandle * handle, GlobalTransactionId gxid);
extern int pgxc_node_send_snapshot(PGXCNodeHandle * handle, Snapshot snapshot);
extern int pgxc_node_send_timestamp(PGXCNodeHandle * handle, TimestampTz timestamp);
@@ -119,6 +131,5 @@ extern int pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const c
extern char get_message(PGXCNodeHandle *conn, int *len, char **msg);
extern void add_error_message(PGXCNodeHandle * handle, const char *message);
-extern void clear_socket_data (PGXCNodeHandle *conn);
#endif
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index 8aae356fd2..7284ef7432 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -85,6 +85,7 @@ typedef struct
SimpleDistinct *distinct;
bool read_only; /* do not use 2PC when committing read only steps */
bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+ char *cursor; /* if specified use it as a Portal name on data nodes */
RemoteQueryExecType exec_type;
char *relname;
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 854b3eac9a..aef70eb7d1 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -88,6 +88,9 @@ extern char func_volatile(Oid funcid);
extern float4 get_func_cost(Oid funcid);
extern float4 get_func_rows(Oid funcid);
extern Oid get_relname_relid(const char *relname, Oid relnamespace);
+#ifdef PGXC
+extern int get_relnatts(Oid relid);
+#endif
extern char *get_rel_name(Oid relid);
extern Oid get_rel_namespace(Oid relid);
extern Oid get_rel_type_id(Oid relid);