diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/nodes/copyfuncs.c | 4 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/createplan.c | 8 | ||||
| -rw-r--r-- | src/backend/parser/analyze.c | 2 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 12 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 310 | ||||
| -rw-r--r-- | src/include/pgxc/locator.h | 4 |
6 files changed, 119 insertions, 221 deletions
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 9f07766e1c..83fac1ce93 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1035,8 +1035,8 @@ _copyExecNodes(ExecNodes *from) COPY_NODE_FIELD(nodelist); COPY_SCALAR_FIELD(baselocatortype); COPY_SCALAR_FIELD(tableusagetype); - COPY_NODE_FIELD(expr); - COPY_SCALAR_FIELD(relid); + COPY_NODE_FIELD(en_expr); + COPY_SCALAR_FIELD(en_relid); COPY_SCALAR_FIELD(accesstype); return newnode; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 699a476f99..d3b5c7793b 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -5560,12 +5560,12 @@ create_remotedelete_plan(PlannerInfo *root, Plan *topplan) xstep->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; xstep->exec_nodes->primarynodelist = NULL; xstep->exec_nodes->nodelist = NULL; - xstep->exec_nodes->relid = ttab->relid; + xstep->exec_nodes->en_relid = ttab->relid; xstep->exec_nodes->accesstype = RELATION_ACCESS_READ; /* First and only target entry of topplan is ctid, reference it */ ctid = makeVar(INNER, 1, TIDOID, -1, InvalidOid, 0); - xstep->exec_nodes->expr = (Expr *) ctid; + xstep->exec_nodes->en_expr = (Expr *) ctid; pfree(xbuf->data); pfree(xbuf); @@ -5590,12 +5590,12 @@ create_remotedelete_plan(PlannerInfo *root, Plan *topplan) fstep->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; fstep->exec_nodes->primarynodelist = NULL; fstep->exec_nodes->nodelist = NULL; - fstep->exec_nodes->relid = ttab->relid; + fstep->exec_nodes->en_relid = ttab->relid; fstep->exec_nodes->accesstype = RELATION_ACCESS_UPDATE; /* First and only target entry of topplan is ctid, reference it */ ctid = makeVar(INNER, 1, TIDOID, -1, InvalidOid, 0); - fstep->exec_nodes->expr = (Expr *) ctid; + fstep->exec_nodes->en_expr = (Expr *) ctid; } pfree(buf->data); diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index be8d2613c8..1483187f55 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -2382,7 +2382,7 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt) step->exec_nodes = (ExecNodes *) palloc(sizeof(ExecNodes)); step->exec_nodes->primarynodelist = NIL; step->exec_nodes->nodelist = NIL; - step->exec_nodes->expr = NIL; + step->exec_nodes->en_expr = NIL; step->force_autocommit = false; step->combine_type = COMBINE_TYPE_SAME; step->read_only = true; diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 4753088a90..54307387e4 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -643,8 +643,8 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step) step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; step->exec_nodes->primarynodelist = NULL; step->exec_nodes->nodelist = NULL; - step->exec_nodes->expr = eval_expr; - step->exec_nodes->relid = rel_loc_info->relid; + step->exec_nodes->en_expr = eval_expr; + step->exec_nodes->en_relid = rel_loc_info->relid; step->exec_nodes->accesstype = RELATION_ACCESS_INSERT; return; } @@ -1742,8 +1742,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) TABLE_USAGE_TYPE_USER; context->query_step->exec_nodes->primarynodelist = NULL; context->query_step->exec_nodes->nodelist = NULL; - context->query_step->exec_nodes->expr = expr_comp->expr; - context->query_step->exec_nodes->relid = expr_comp->relid; + context->query_step->exec_nodes->en_expr = expr_comp->expr; + context->query_step->exec_nodes->en_relid = expr_comp->relid; context->query_step->exec_nodes->accesstype = context->accessType; break; } @@ -1760,8 +1760,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) context->query_step->exec_nodes->primarynodelist = NULL; context->query_step->exec_nodes->nodelist = list_copy(rel_loc_info->nodeList); - context->query_step->exec_nodes->expr = NULL; - context->query_step->exec_nodes->relid = NULL; + context->query_step->exec_nodes->en_expr = NULL; + context->query_step->exec_nodes->en_relid = NULL; context->query_step->exec_nodes->accesstype = context->accessType; } } diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 2f77f5e014..6226cebbbd 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -86,6 +86,10 @@ static void pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles); static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor); static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested); +static bool pgxc_start_command_on_connection(PGXCNodeHandle *connection, + bool need_tran, GlobalTransactionId gxid, TimestampTz timestamp, + RemoteQuery *step, int total_conn_count, Snapshot snapshot); +static bool ExecRemoteQueryInnerPlan(RemoteQueryState *node); #define MAX_STATEMENTS_PER_TRAN 10 @@ -2713,7 +2717,6 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, CombineType combine_type) { int i; - int nLen = htonl(4); RemoteQueryState *combiner = NULL; bool need_tran; bool error = false; @@ -2854,9 +2857,9 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) &node->paramval_data); /* We need expression context to evaluate */ - if (node->exec_nodes && node->exec_nodes->expr) + if (node->exec_nodes && node->exec_nodes->en_expr) { - Expr *expr = node->exec_nodes->expr; + Expr *expr = node->exec_nodes->en_expr; if (IsA(expr, Var) && ((Var *) expr)->vartype == TIDOID) { @@ -2969,7 +2972,7 @@ get_exec_connections(RemoteQueryState *planstate, if (exec_nodes) { - if (exec_nodes->expr) + if (exec_nodes->en_expr) { /* * Special case (argh, another one): if expression data type is TID @@ -2978,9 +2981,9 @@ get_exec_connections(RemoteQueryState *planstate, * So try and determine originating node and execute command on * that node only */ - if (IsA(exec_nodes->expr, Var) && ((Var *) exec_nodes->expr)->vartype == TIDOID) + if (IsA(exec_nodes->en_expr, Var) && ((Var *) exec_nodes->en_expr)->vartype == TIDOID) { - Var *ctid = (Var *) exec_nodes->expr; + Var *ctid = (Var *) exec_nodes->en_expr; PlanState *source = (PlanState *) planstate; TupleTableSlot *slot; @@ -3015,7 +3018,7 @@ get_exec_connections(RemoteQueryState *planstate, { /* execution time determining of target data nodes */ bool isnull; - ExprState *estate = ExecInitExpr(exec_nodes->expr, + ExprState *estate = ExecInitExpr(exec_nodes->en_expr, (PlanState *) planstate); Datum partvalue = ExecEvalExpr(estate, planstate->ss.ps.ps_ExprContext, @@ -3023,7 +3026,7 @@ get_exec_connections(RemoteQueryState *planstate, NULL); if (!isnull) { - RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->relid); + RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->en_relid); /* PGXCTODO what is the type of partvalue here*/ ExecNodes *nodes = GetRelationNodes(rel_loc_info, partvalue, UNKNOWNOID, exec_nodes->accesstype); if (nodes) @@ -3143,6 +3146,56 @@ register_write_nodes(int conn_count, PGXCNodeHandle **connections) } } +static bool +pgxc_start_command_on_connection(PGXCNodeHandle *connection, bool need_tran, + GlobalTransactionId gxid, TimestampTz timestamp, + RemoteQuery *step, int total_conn_count, + Snapshot snapshot) +{ + if (connection->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connection); + + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && pgxc_node_send_gxid(connection, gxid)) + return false; + if (total_conn_count == 1 && pgxc_node_send_timestamp(connection, timestamp)) + return false; + if (snapshot && pgxc_node_send_snapshot(connection, snapshot)) + return false; + if (step->statement || step->cursor || step->paramval_data) + { + /* need to use Extended Query Protocol */ + int fetch = 0; + bool prepared = false; + + /* if prepared statement is referenced see if it is already exist */ + if (step->statement) + prepared = ActivateDatanodeStatementOnNode(step->statement, + connection->nodenum); + /* + * execute and fetch rows only if they will be consumed + * immediately by the sorter + */ + if (step->cursor) + fetch = 1; + + if (pgxc_node_send_query_extended(connection, + prepared ? NULL : step->sql_statement, + step->statement, + step->cursor, + step->paramval_len, + step->paramval_data, + step->read_only, + fetch) != 0) + return false; + } + else + { + if (pgxc_node_send_query(connection, step->sql_statement) != 0) + return false; + } + return true; +} static void do_query(RemoteQueryState *node) @@ -3186,9 +3239,7 @@ do_query(RemoteQueryState *node) * Primary connection is counted separately but is included in total_conn_count if used. */ if (primaryconnection) - { regular_conn_count--; - } pfree(pgxc_connections); @@ -3257,33 +3308,8 @@ do_query(RemoteQueryState *node) /* See if we have a primary node, execute on it first before the others */ if (primaryconnection) { - if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) - BufferConnection(primaryconnection); - - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) - { - pfree(connections); - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp)) - { - /* - * If a transaction involves multiple connections timestamp is - * always sent down to Datanodes with pgxc_node_begin. - * An autocommit transaction needs the global timestamp also, - * so handle this case here. - */ - pfree(connections); - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot)) + if (!pgxc_start_command_on_connection(primaryconnection, need_tran, gxid, + timestamp, step, total_conn_count, snapshot)) { pfree(connections); pfree(primaryconnection); @@ -3291,51 +3317,6 @@ do_query(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (step->statement || step->cursor || step->paramval_data) - { - /* need to use Extended Query Protocol */ - int fetch = 0; - bool prepared = false; - - /* if prepared statement is referenced see if it is already exist */ - if (step->statement) - prepared = ActivateDatanodeStatementOnNode(step->statement, - primaryconnection->nodenum); - /* - * execute and fetch rows only if they will be consumed - * immediately by the sorter - */ - if (step->cursor) - fetch = 1; - - if (pgxc_node_send_query_extended(primaryconnection, - prepared ? NULL : step->sql_statement, - step->statement, - step->cursor, - step->paramval_len, - step->paramval_data, - step->read_only, - fetch) != 0) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - } - else - { - if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0) - { - pfree(connections); - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - } Assert(node->combine_type == COMBINE_TYPE_SAME); /* Make sure the command is completed on the primary node */ @@ -3371,26 +3352,9 @@ do_query(RemoteQueryState *node) for (i = 0; i < regular_conn_count; i++) { - if (connections[i]->state == DN_CONNECTION_STATE_QUERY) - BufferConnection(connections[i]); - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp)) + if (!pgxc_start_command_on_connection(connections[i], need_tran, gxid, + timestamp, step, total_conn_count, snapshot)) { - /* - * If a transaction involves multiple connections timestamp is - * always sent down to Datanodes with pgxc_node_begin. - * An autocommit transaction needs the global timestamp also, - * so handle this case here. - */ pfree(connections); if (primaryconnection) pfree(primaryconnection); @@ -3398,61 +3362,6 @@ do_query(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot)) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (step->statement || step->cursor || step->paramval_data) - { - /* need to use Extended Query Protocol */ - int fetch = 0; - bool prepared = false; - - /* if prepared statement is referenced see if it is already exist */ - if (step->statement) - prepared = ActivateDatanodeStatementOnNode(step->statement, - connections[i]->nodenum); - /* - * execute and fetch rows only if they will be consumed - * immediately by the sorter - */ - if (step->cursor) - fetch = 1; - - if (pgxc_node_send_query_extended(connections[i], - prepared ? NULL : step->sql_statement, - step->statement, - step->cursor, - step->paramval_len, - step->paramval_data, - step->read_only, - fetch) != 0) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - } - else - { - if (pgxc_node_send_query(connections[i], step->sql_statement) != 0) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - } connections[i]->combiner = node; } @@ -3566,6 +3475,43 @@ do_query(RemoteQueryState *node) } } +/* + * ExecRemoteQueryInnerPlan + * Executes the inner plan of a RemoteQuery. It returns false if the inner plan + * does not return any row, otherwise it returns true. + */ +static bool +ExecRemoteQueryInnerPlan(RemoteQueryState *node) +{ + RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node)); + /* + * Use data row returned by the previus step as a parameters for + * the main query. + */ + if (!TupIsNull(innerSlot)) + { + step->paramval_len = ExecCopySlotDatarow(innerSlot, + &step->paramval_data); + + /* Needed for expression evaluation */ + if (estate->es_param_exec_vals) + { + int i; + int natts = innerSlot->tts_tupleDescriptor->natts; + + slot_getallattrs(innerSlot); + for (i = 0; i < natts; i++) + estate->es_param_exec_vals[i].value = slot_getattr( + innerSlot, + i+1, + &estate->es_param_exec_vals[i].isnull); + } + return true; + } + return false; +} /* * Execute step of PGXC plan. @@ -3581,7 +3527,6 @@ TupleTableSlot * ExecRemoteQuery(RemoteQueryState *node) { RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; - EState *estate = node->ss.ps.state; TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; bool have_tuple = false; @@ -3600,31 +3545,7 @@ ExecRemoteQuery(RemoteQueryState *node) */ if (innerPlanState(node)) { - TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node)); - /* - * Use data row returned by the previus step as a parameters for - * the main query. - */ - if (!TupIsNull(innerSlot)) - { - step->paramval_len = ExecCopySlotDatarow(innerSlot, - &step->paramval_data); - - /* Needed for expression evaluation */ - if (estate->es_param_exec_vals) - { - int i; - int natts = innerSlot->tts_tupleDescriptor->natts; - - slot_getallattrs(innerSlot); - for (i = 0; i < natts; i++) - estate->es_param_exec_vals[i].value = slot_getattr( - innerSlot, - i+1, - &estate->es_param_exec_vals[i].isnull); - } - } - else + if (!ExecRemoteQueryInnerPlan(node)) { /* no parameters, exit */ return NULL; @@ -3741,31 +3662,8 @@ handle_results: */ if (innerPlanState(node)) { - TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node)); - if (!TupIsNull(innerSlot)) + if (ExecRemoteQueryInnerPlan(node)) { - /* reset the counter */ - node->command_complete_count = 0; - /* - * Use data row returned by the previus step as a parameters for - * the main query. - */ - step->paramval_len = ExecCopySlotDatarow(innerSlot, - &step->paramval_data); - - /* Needed for expression evaluation */ - if (estate->es_param_exec_vals) - { - int i; - int natts = innerSlot->tts_tupleDescriptor->natts; - - slot_getallattrs(innerSlot); - for (i = 0; i < natts; i++) - estate->es_param_exec_vals[i].value = slot_getattr( - innerSlot, - i+1, - &estate->es_param_exec_vals[i].isnull); - } do_query(node); goto handle_results; } diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h index 9ee983c8cf..0c584da1f0 100644 --- a/src/include/pgxc/locator.h +++ b/src/include/pgxc/locator.h @@ -84,9 +84,9 @@ typedef struct List *nodelist; char baselocatortype; TableUsageType tableusagetype; /* track pg_catalog usage */ - Expr *expr; /* expression to evaluate at execution time if planner + Expr *en_expr; /* expression to evaluate at execution time if planner * can not determine execution nodes */ - Oid relid; /* Relation to determine execution nodes */ + Oid en_relid; /* Relation to determine execution nodes */ RelationAccessType accesstype; /* Access type to determine execution nodes */ } ExecNodes; |
