summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/nodes/copyfuncs.c4
-rw-r--r--src/backend/optimizer/plan/createplan.c8
-rw-r--r--src/backend/parser/analyze.c2
-rw-r--r--src/backend/pgxc/plan/planner.c12
-rw-r--r--src/backend/pgxc/pool/execRemote.c310
-rw-r--r--src/include/pgxc/locator.h4
6 files changed, 119 insertions, 221 deletions
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 9f07766e1c..83fac1ce93 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -1035,8 +1035,8 @@ _copyExecNodes(ExecNodes *from)
COPY_NODE_FIELD(nodelist);
COPY_SCALAR_FIELD(baselocatortype);
COPY_SCALAR_FIELD(tableusagetype);
- COPY_NODE_FIELD(expr);
- COPY_SCALAR_FIELD(relid);
+ COPY_NODE_FIELD(en_expr);
+ COPY_SCALAR_FIELD(en_relid);
COPY_SCALAR_FIELD(accesstype);
return newnode;
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 699a476f99..d3b5c7793b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -5560,12 +5560,12 @@ create_remotedelete_plan(PlannerInfo *root, Plan *topplan)
xstep->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
xstep->exec_nodes->primarynodelist = NULL;
xstep->exec_nodes->nodelist = NULL;
- xstep->exec_nodes->relid = ttab->relid;
+ xstep->exec_nodes->en_relid = ttab->relid;
xstep->exec_nodes->accesstype = RELATION_ACCESS_READ;
/* First and only target entry of topplan is ctid, reference it */
ctid = makeVar(INNER, 1, TIDOID, -1, InvalidOid, 0);
- xstep->exec_nodes->expr = (Expr *) ctid;
+ xstep->exec_nodes->en_expr = (Expr *) ctid;
pfree(xbuf->data);
pfree(xbuf);
@@ -5590,12 +5590,12 @@ create_remotedelete_plan(PlannerInfo *root, Plan *topplan)
fstep->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
fstep->exec_nodes->primarynodelist = NULL;
fstep->exec_nodes->nodelist = NULL;
- fstep->exec_nodes->relid = ttab->relid;
+ fstep->exec_nodes->en_relid = ttab->relid;
fstep->exec_nodes->accesstype = RELATION_ACCESS_UPDATE;
/* First and only target entry of topplan is ctid, reference it */
ctid = makeVar(INNER, 1, TIDOID, -1, InvalidOid, 0);
- fstep->exec_nodes->expr = (Expr *) ctid;
+ fstep->exec_nodes->en_expr = (Expr *) ctid;
}
pfree(buf->data);
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index be8d2613c8..1483187f55 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -2382,7 +2382,7 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt)
step->exec_nodes = (ExecNodes *) palloc(sizeof(ExecNodes));
step->exec_nodes->primarynodelist = NIL;
step->exec_nodes->nodelist = NIL;
- step->exec_nodes->expr = NIL;
+ step->exec_nodes->en_expr = NIL;
step->force_autocommit = false;
step->combine_type = COMBINE_TYPE_SAME;
step->read_only = true;
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 4753088a90..54307387e4 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -643,8 +643,8 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step)
step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER;
step->exec_nodes->primarynodelist = NULL;
step->exec_nodes->nodelist = NULL;
- step->exec_nodes->expr = eval_expr;
- step->exec_nodes->relid = rel_loc_info->relid;
+ step->exec_nodes->en_expr = eval_expr;
+ step->exec_nodes->en_relid = rel_loc_info->relid;
step->exec_nodes->accesstype = RELATION_ACCESS_INSERT;
return;
}
@@ -1742,8 +1742,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
TABLE_USAGE_TYPE_USER;
context->query_step->exec_nodes->primarynodelist = NULL;
context->query_step->exec_nodes->nodelist = NULL;
- context->query_step->exec_nodes->expr = expr_comp->expr;
- context->query_step->exec_nodes->relid = expr_comp->relid;
+ context->query_step->exec_nodes->en_expr = expr_comp->expr;
+ context->query_step->exec_nodes->en_relid = expr_comp->relid;
context->query_step->exec_nodes->accesstype = context->accessType;
break;
}
@@ -1760,8 +1760,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context)
context->query_step->exec_nodes->primarynodelist = NULL;
context->query_step->exec_nodes->nodelist =
list_copy(rel_loc_info->nodeList);
- context->query_step->exec_nodes->expr = NULL;
- context->query_step->exec_nodes->relid = NULL;
+ context->query_step->exec_nodes->en_expr = NULL;
+ context->query_step->exec_nodes->en_relid = NULL;
context->query_step->exec_nodes->accesstype = context->accessType;
}
}
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 2f77f5e014..6226cebbbd 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -86,6 +86,10 @@ static void pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles);
static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor);
static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested);
+static bool pgxc_start_command_on_connection(PGXCNodeHandle *connection,
+ bool need_tran, GlobalTransactionId gxid, TimestampTz timestamp,
+ RemoteQuery *step, int total_conn_count, Snapshot snapshot);
+static bool ExecRemoteQueryInnerPlan(RemoteQueryState *node);
#define MAX_STATEMENTS_PER_TRAN 10
@@ -2713,7 +2717,6 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node,
CombineType combine_type)
{
int i;
- int nLen = htonl(4);
RemoteQueryState *combiner = NULL;
bool need_tran;
bool error = false;
@@ -2854,9 +2857,9 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
&node->paramval_data);
/* We need expression context to evaluate */
- if (node->exec_nodes && node->exec_nodes->expr)
+ if (node->exec_nodes && node->exec_nodes->en_expr)
{
- Expr *expr = node->exec_nodes->expr;
+ Expr *expr = node->exec_nodes->en_expr;
if (IsA(expr, Var) && ((Var *) expr)->vartype == TIDOID)
{
@@ -2969,7 +2972,7 @@ get_exec_connections(RemoteQueryState *planstate,
if (exec_nodes)
{
- if (exec_nodes->expr)
+ if (exec_nodes->en_expr)
{
/*
* Special case (argh, another one): if expression data type is TID
@@ -2978,9 +2981,9 @@ get_exec_connections(RemoteQueryState *planstate,
* So try and determine originating node and execute command on
* that node only
*/
- if (IsA(exec_nodes->expr, Var) && ((Var *) exec_nodes->expr)->vartype == TIDOID)
+ if (IsA(exec_nodes->en_expr, Var) && ((Var *) exec_nodes->en_expr)->vartype == TIDOID)
{
- Var *ctid = (Var *) exec_nodes->expr;
+ Var *ctid = (Var *) exec_nodes->en_expr;
PlanState *source = (PlanState *) planstate;
TupleTableSlot *slot;
@@ -3015,7 +3018,7 @@ get_exec_connections(RemoteQueryState *planstate,
{
/* execution time determining of target data nodes */
bool isnull;
- ExprState *estate = ExecInitExpr(exec_nodes->expr,
+ ExprState *estate = ExecInitExpr(exec_nodes->en_expr,
(PlanState *) planstate);
Datum partvalue = ExecEvalExpr(estate,
planstate->ss.ps.ps_ExprContext,
@@ -3023,7 +3026,7 @@ get_exec_connections(RemoteQueryState *planstate,
NULL);
if (!isnull)
{
- RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->relid);
+ RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->en_relid);
/* PGXCTODO what is the type of partvalue here*/
ExecNodes *nodes = GetRelationNodes(rel_loc_info, partvalue, UNKNOWNOID, exec_nodes->accesstype);
if (nodes)
@@ -3143,6 +3146,56 @@ register_write_nodes(int conn_count, PGXCNodeHandle **connections)
}
}
+static bool
+pgxc_start_command_on_connection(PGXCNodeHandle *connection, bool need_tran,
+ GlobalTransactionId gxid, TimestampTz timestamp,
+ RemoteQuery *step, int total_conn_count,
+ Snapshot snapshot)
+{
+ if (connection->state == DN_CONNECTION_STATE_QUERY)
+ BufferConnection(connection);
+
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && pgxc_node_send_gxid(connection, gxid))
+ return false;
+ if (total_conn_count == 1 && pgxc_node_send_timestamp(connection, timestamp))
+ return false;
+ if (snapshot && pgxc_node_send_snapshot(connection, snapshot))
+ return false;
+ if (step->statement || step->cursor || step->paramval_data)
+ {
+ /* need to use Extended Query Protocol */
+ int fetch = 0;
+ bool prepared = false;
+
+ /* if prepared statement is referenced see if it is already exist */
+ if (step->statement)
+ prepared = ActivateDatanodeStatementOnNode(step->statement,
+ connection->nodenum);
+ /*
+ * execute and fetch rows only if they will be consumed
+ * immediately by the sorter
+ */
+ if (step->cursor)
+ fetch = 1;
+
+ if (pgxc_node_send_query_extended(connection,
+ prepared ? NULL : step->sql_statement,
+ step->statement,
+ step->cursor,
+ step->paramval_len,
+ step->paramval_data,
+ step->read_only,
+ fetch) != 0)
+ return false;
+ }
+ else
+ {
+ if (pgxc_node_send_query(connection, step->sql_statement) != 0)
+ return false;
+ }
+ return true;
+}
static void
do_query(RemoteQueryState *node)
@@ -3186,9 +3239,7 @@ do_query(RemoteQueryState *node)
* Primary connection is counted separately but is included in total_conn_count if used.
*/
if (primaryconnection)
- {
regular_conn_count--;
- }
pfree(pgxc_connections);
@@ -3257,33 +3308,8 @@ do_query(RemoteQueryState *node)
/* See if we have a primary node, execute on it first before the others */
if (primaryconnection)
{
- if (primaryconnection->state == DN_CONNECTION_STATE_QUERY)
- BufferConnection(primaryconnection);
-
- /* If explicit transaction is needed gxid is already sent */
- if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
- {
- pfree(connections);
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp))
- {
- /*
- * If a transaction involves multiple connections timestamp is
- * always sent down to Datanodes with pgxc_node_begin.
- * An autocommit transaction needs the global timestamp also,
- * so handle this case here.
- */
- pfree(connections);
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot))
+ if (!pgxc_start_command_on_connection(primaryconnection, need_tran, gxid,
+ timestamp, step, total_conn_count, snapshot))
{
pfree(connections);
pfree(primaryconnection);
@@ -3291,51 +3317,6 @@ do_query(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (step->statement || step->cursor || step->paramval_data)
- {
- /* need to use Extended Query Protocol */
- int fetch = 0;
- bool prepared = false;
-
- /* if prepared statement is referenced see if it is already exist */
- if (step->statement)
- prepared = ActivateDatanodeStatementOnNode(step->statement,
- primaryconnection->nodenum);
- /*
- * execute and fetch rows only if they will be consumed
- * immediately by the sorter
- */
- if (step->cursor)
- fetch = 1;
-
- if (pgxc_node_send_query_extended(primaryconnection,
- prepared ? NULL : step->sql_statement,
- step->statement,
- step->cursor,
- step->paramval_len,
- step->paramval_data,
- step->read_only,
- fetch) != 0)
- {
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- }
- else
- {
- if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0)
- {
- pfree(connections);
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- }
Assert(node->combine_type == COMBINE_TYPE_SAME);
/* Make sure the command is completed on the primary node */
@@ -3371,26 +3352,9 @@ do_query(RemoteQueryState *node)
for (i = 0; i < regular_conn_count; i++)
{
- if (connections[i]->state == DN_CONNECTION_STATE_QUERY)
- BufferConnection(connections[i]);
- /* If explicit transaction is needed gxid is already sent */
- if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
- {
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp))
+ if (!pgxc_start_command_on_connection(connections[i], need_tran, gxid,
+ timestamp, step, total_conn_count, snapshot))
{
- /*
- * If a transaction involves multiple connections timestamp is
- * always sent down to Datanodes with pgxc_node_begin.
- * An autocommit transaction needs the global timestamp also,
- * so handle this case here.
- */
pfree(connections);
if (primaryconnection)
pfree(primaryconnection);
@@ -3398,61 +3362,6 @@ do_query(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot))
- {
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- if (step->statement || step->cursor || step->paramval_data)
- {
- /* need to use Extended Query Protocol */
- int fetch = 0;
- bool prepared = false;
-
- /* if prepared statement is referenced see if it is already exist */
- if (step->statement)
- prepared = ActivateDatanodeStatementOnNode(step->statement,
- connections[i]->nodenum);
- /*
- * execute and fetch rows only if they will be consumed
- * immediately by the sorter
- */
- if (step->cursor)
- fetch = 1;
-
- if (pgxc_node_send_query_extended(connections[i],
- prepared ? NULL : step->sql_statement,
- step->statement,
- step->cursor,
- step->paramval_len,
- step->paramval_data,
- step->read_only,
- fetch) != 0)
- {
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- }
- else
- {
- if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
- {
- pfree(connections);
- if (primaryconnection)
- pfree(primaryconnection);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- }
connections[i]->combiner = node;
}
@@ -3566,6 +3475,43 @@ do_query(RemoteQueryState *node)
}
}
+/*
+ * ExecRemoteQueryInnerPlan
+ * Executes the inner plan of a RemoteQuery. It returns false if the inner plan
+ * does not return any row, otherwise it returns true.
+ */
+static bool
+ExecRemoteQueryInnerPlan(RemoteQueryState *node)
+{
+ RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan;
+ EState *estate = node->ss.ps.state;
+ TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
+ /*
+ * Use data row returned by the previus step as a parameters for
+ * the main query.
+ */
+ if (!TupIsNull(innerSlot))
+ {
+ step->paramval_len = ExecCopySlotDatarow(innerSlot,
+ &step->paramval_data);
+
+ /* Needed for expression evaluation */
+ if (estate->es_param_exec_vals)
+ {
+ int i;
+ int natts = innerSlot->tts_tupleDescriptor->natts;
+
+ slot_getallattrs(innerSlot);
+ for (i = 0; i < natts; i++)
+ estate->es_param_exec_vals[i].value = slot_getattr(
+ innerSlot,
+ i+1,
+ &estate->es_param_exec_vals[i].isnull);
+ }
+ return true;
+ }
+ return false;
+}
/*
* Execute step of PGXC plan.
@@ -3581,7 +3527,6 @@ TupleTableSlot *
ExecRemoteQuery(RemoteQueryState *node)
{
RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan;
- EState *estate = node->ss.ps.state;
TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
bool have_tuple = false;
@@ -3600,31 +3545,7 @@ ExecRemoteQuery(RemoteQueryState *node)
*/
if (innerPlanState(node))
{
- TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
- /*
- * Use data row returned by the previus step as a parameters for
- * the main query.
- */
- if (!TupIsNull(innerSlot))
- {
- step->paramval_len = ExecCopySlotDatarow(innerSlot,
- &step->paramval_data);
-
- /* Needed for expression evaluation */
- if (estate->es_param_exec_vals)
- {
- int i;
- int natts = innerSlot->tts_tupleDescriptor->natts;
-
- slot_getallattrs(innerSlot);
- for (i = 0; i < natts; i++)
- estate->es_param_exec_vals[i].value = slot_getattr(
- innerSlot,
- i+1,
- &estate->es_param_exec_vals[i].isnull);
- }
- }
- else
+ if (!ExecRemoteQueryInnerPlan(node))
{
/* no parameters, exit */
return NULL;
@@ -3741,31 +3662,8 @@ handle_results:
*/
if (innerPlanState(node))
{
- TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node));
- if (!TupIsNull(innerSlot))
+ if (ExecRemoteQueryInnerPlan(node))
{
- /* reset the counter */
- node->command_complete_count = 0;
- /*
- * Use data row returned by the previus step as a parameters for
- * the main query.
- */
- step->paramval_len = ExecCopySlotDatarow(innerSlot,
- &step->paramval_data);
-
- /* Needed for expression evaluation */
- if (estate->es_param_exec_vals)
- {
- int i;
- int natts = innerSlot->tts_tupleDescriptor->natts;
-
- slot_getallattrs(innerSlot);
- for (i = 0; i < natts; i++)
- estate->es_param_exec_vals[i].value = slot_getattr(
- innerSlot,
- i+1,
- &estate->es_param_exec_vals[i].isnull);
- }
do_query(node);
goto handle_results;
}
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 9ee983c8cf..0c584da1f0 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -84,9 +84,9 @@ typedef struct
List *nodelist;
char baselocatortype;
TableUsageType tableusagetype; /* track pg_catalog usage */
- Expr *expr; /* expression to evaluate at execution time if planner
+ Expr *en_expr; /* expression to evaluate at execution time if planner
* can not determine execution nodes */
- Oid relid; /* Relation to determine execution nodes */
+ Oid en_relid; /* Relation to determine execution nodes */
RelationAccessType accesstype; /* Access type to determine execution nodes */
} ExecNodes;