diff options
| author | Ashutosh Bapat | 2011-07-27 05:42:22 +0000 |
|---|---|---|
| committer | Ashutosh Bapat | 2011-07-27 05:42:22 +0000 |
| commit | 93d93ce4984b3868d80a6eb6b58e67abf99ce50f (patch) | |
| tree | 7ea9a8174bc74686893e3228e2db7fd98844b797 /src/backend | |
| parent | f0f8a4b0680532a89c364c2294d3ca3a07ef7fea (diff) | |
In function do_query() and ExecRemoteQuery() there is duplicated code
respectively. Gathered this code into respective functions and called these
functions instead of duplicating the code.
The members expr and relid of ExecNodes are renamed as en_expr and en_relid.
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/nodes/copyfuncs.c | 4 | ||||
| -rw-r--r-- | src/backend/optimizer/plan/createplan.c | 8 | ||||
| -rw-r--r-- | src/backend/parser/analyze.c | 2 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 12 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 310 |
5 files changed, 117 insertions, 219 deletions
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 9f07766e1c..83fac1ce93 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1035,8 +1035,8 @@ _copyExecNodes(ExecNodes *from) COPY_NODE_FIELD(nodelist); COPY_SCALAR_FIELD(baselocatortype); COPY_SCALAR_FIELD(tableusagetype); - COPY_NODE_FIELD(expr); - COPY_SCALAR_FIELD(relid); + COPY_NODE_FIELD(en_expr); + COPY_SCALAR_FIELD(en_relid); COPY_SCALAR_FIELD(accesstype); return newnode; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 699a476f99..d3b5c7793b 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -5560,12 +5560,12 @@ create_remotedelete_plan(PlannerInfo *root, Plan *topplan) xstep->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; xstep->exec_nodes->primarynodelist = NULL; xstep->exec_nodes->nodelist = NULL; - xstep->exec_nodes->relid = ttab->relid; + xstep->exec_nodes->en_relid = ttab->relid; xstep->exec_nodes->accesstype = RELATION_ACCESS_READ; /* First and only target entry of topplan is ctid, reference it */ ctid = makeVar(INNER, 1, TIDOID, -1, InvalidOid, 0); - xstep->exec_nodes->expr = (Expr *) ctid; + xstep->exec_nodes->en_expr = (Expr *) ctid; pfree(xbuf->data); pfree(xbuf); @@ -5590,12 +5590,12 @@ create_remotedelete_plan(PlannerInfo *root, Plan *topplan) fstep->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; fstep->exec_nodes->primarynodelist = NULL; fstep->exec_nodes->nodelist = NULL; - fstep->exec_nodes->relid = ttab->relid; + fstep->exec_nodes->en_relid = ttab->relid; fstep->exec_nodes->accesstype = RELATION_ACCESS_UPDATE; /* First and only target entry of topplan is ctid, reference it */ ctid = makeVar(INNER, 1, TIDOID, -1, InvalidOid, 0); - fstep->exec_nodes->expr = (Expr *) ctid; + fstep->exec_nodes->en_expr = (Expr *) ctid; } pfree(buf->data); diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index be8d2613c8..1483187f55 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -2382,7 +2382,7 @@ transformExecDirectStmt(ParseState *pstate, ExecDirectStmt *stmt) step->exec_nodes = (ExecNodes *) palloc(sizeof(ExecNodes)); step->exec_nodes->primarynodelist = NIL; step->exec_nodes->nodelist = NIL; - step->exec_nodes->expr = NIL; + step->exec_nodes->en_expr = NIL; step->force_autocommit = false; step->combine_type = COMBINE_TYPE_SAME; step->read_only = true; diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 4753088a90..54307387e4 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -643,8 +643,8 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step) step->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_USER; step->exec_nodes->primarynodelist = NULL; step->exec_nodes->nodelist = NULL; - step->exec_nodes->expr = eval_expr; - step->exec_nodes->relid = rel_loc_info->relid; + step->exec_nodes->en_expr = eval_expr; + step->exec_nodes->en_relid = rel_loc_info->relid; step->exec_nodes->accesstype = RELATION_ACCESS_INSERT; return; } @@ -1742,8 +1742,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) TABLE_USAGE_TYPE_USER; context->query_step->exec_nodes->primarynodelist = NULL; context->query_step->exec_nodes->nodelist = NULL; - context->query_step->exec_nodes->expr = expr_comp->expr; - context->query_step->exec_nodes->relid = expr_comp->relid; + context->query_step->exec_nodes->en_expr = expr_comp->expr; + context->query_step->exec_nodes->en_relid = expr_comp->relid; context->query_step->exec_nodes->accesstype = context->accessType; break; } @@ -1760,8 +1760,8 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) context->query_step->exec_nodes->primarynodelist = NULL; context->query_step->exec_nodes->nodelist = list_copy(rel_loc_info->nodeList); - context->query_step->exec_nodes->expr = NULL; - context->query_step->exec_nodes->relid = NULL; + context->query_step->exec_nodes->en_expr = NULL; + context->query_step->exec_nodes->en_relid = NULL; context->query_step->exec_nodes->accesstype = context->accessType; } } diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 2f77f5e014..6226cebbbd 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -86,6 +86,10 @@ static void pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles); static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor); static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(PGXCNode_HandleRequested status_requested); +static bool pgxc_start_command_on_connection(PGXCNodeHandle *connection, + bool need_tran, GlobalTransactionId gxid, TimestampTz timestamp, + RemoteQuery *step, int total_conn_count, Snapshot snapshot); +static bool ExecRemoteQueryInnerPlan(RemoteQueryState *node); #define MAX_STATEMENTS_PER_TRAN 10 @@ -2713,7 +2717,6 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, CombineType combine_type) { int i; - int nLen = htonl(4); RemoteQueryState *combiner = NULL; bool need_tran; bool error = false; @@ -2854,9 +2857,9 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) &node->paramval_data); /* We need expression context to evaluate */ - if (node->exec_nodes && node->exec_nodes->expr) + if (node->exec_nodes && node->exec_nodes->en_expr) { - Expr *expr = node->exec_nodes->expr; + Expr *expr = node->exec_nodes->en_expr; if (IsA(expr, Var) && ((Var *) expr)->vartype == TIDOID) { @@ -2969,7 +2972,7 @@ get_exec_connections(RemoteQueryState *planstate, if (exec_nodes) { - if (exec_nodes->expr) + if (exec_nodes->en_expr) { /* * Special case (argh, another one): if expression data type is TID @@ -2978,9 +2981,9 @@ get_exec_connections(RemoteQueryState *planstate, * So try and determine originating node and execute command on * that node only */ - if (IsA(exec_nodes->expr, Var) && ((Var *) exec_nodes->expr)->vartype == TIDOID) + if (IsA(exec_nodes->en_expr, Var) && ((Var *) exec_nodes->en_expr)->vartype == TIDOID) { - Var *ctid = (Var *) exec_nodes->expr; + Var *ctid = (Var *) exec_nodes->en_expr; PlanState *source = (PlanState *) planstate; TupleTableSlot *slot; @@ -3015,7 +3018,7 @@ get_exec_connections(RemoteQueryState *planstate, { /* execution time determining of target data nodes */ bool isnull; - ExprState *estate = ExecInitExpr(exec_nodes->expr, + ExprState *estate = ExecInitExpr(exec_nodes->en_expr, (PlanState *) planstate); Datum partvalue = ExecEvalExpr(estate, planstate->ss.ps.ps_ExprContext, @@ -3023,7 +3026,7 @@ get_exec_connections(RemoteQueryState *planstate, NULL); if (!isnull) { - RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->relid); + RelationLocInfo *rel_loc_info = GetRelationLocInfo(exec_nodes->en_relid); /* PGXCTODO what is the type of partvalue here*/ ExecNodes *nodes = GetRelationNodes(rel_loc_info, partvalue, UNKNOWNOID, exec_nodes->accesstype); if (nodes) @@ -3143,6 +3146,56 @@ register_write_nodes(int conn_count, PGXCNodeHandle **connections) } } +static bool +pgxc_start_command_on_connection(PGXCNodeHandle *connection, bool need_tran, + GlobalTransactionId gxid, TimestampTz timestamp, + RemoteQuery *step, int total_conn_count, + Snapshot snapshot) +{ + if (connection->state == DN_CONNECTION_STATE_QUERY) + BufferConnection(connection); + + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && pgxc_node_send_gxid(connection, gxid)) + return false; + if (total_conn_count == 1 && pgxc_node_send_timestamp(connection, timestamp)) + return false; + if (snapshot && pgxc_node_send_snapshot(connection, snapshot)) + return false; + if (step->statement || step->cursor || step->paramval_data) + { + /* need to use Extended Query Protocol */ + int fetch = 0; + bool prepared = false; + + /* if prepared statement is referenced see if it is already exist */ + if (step->statement) + prepared = ActivateDatanodeStatementOnNode(step->statement, + connection->nodenum); + /* + * execute and fetch rows only if they will be consumed + * immediately by the sorter + */ + if (step->cursor) + fetch = 1; + + if (pgxc_node_send_query_extended(connection, + prepared ? NULL : step->sql_statement, + step->statement, + step->cursor, + step->paramval_len, + step->paramval_data, + step->read_only, + fetch) != 0) + return false; + } + else + { + if (pgxc_node_send_query(connection, step->sql_statement) != 0) + return false; + } + return true; +} static void do_query(RemoteQueryState *node) @@ -3186,9 +3239,7 @@ do_query(RemoteQueryState *node) * Primary connection is counted separately but is included in total_conn_count if used. */ if (primaryconnection) - { regular_conn_count--; - } pfree(pgxc_connections); @@ -3257,33 +3308,8 @@ do_query(RemoteQueryState *node) /* See if we have a primary node, execute on it first before the others */ if (primaryconnection) { - if (primaryconnection->state == DN_CONNECTION_STATE_QUERY) - BufferConnection(primaryconnection); - - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid)) - { - pfree(connections); - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp)) - { - /* - * If a transaction involves multiple connections timestamp is - * always sent down to Datanodes with pgxc_node_begin. - * An autocommit transaction needs the global timestamp also, - * so handle this case here. - */ - pfree(connections); - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot)) + if (!pgxc_start_command_on_connection(primaryconnection, need_tran, gxid, + timestamp, step, total_conn_count, snapshot)) { pfree(connections); pfree(primaryconnection); @@ -3291,51 +3317,6 @@ do_query(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (step->statement || step->cursor || step->paramval_data) - { - /* need to use Extended Query Protocol */ - int fetch = 0; - bool prepared = false; - - /* if prepared statement is referenced see if it is already exist */ - if (step->statement) - prepared = ActivateDatanodeStatementOnNode(step->statement, - primaryconnection->nodenum); - /* - * execute and fetch rows only if they will be consumed - * immediately by the sorter - */ - if (step->cursor) - fetch = 1; - - if (pgxc_node_send_query_extended(primaryconnection, - prepared ? NULL : step->sql_statement, - step->statement, - step->cursor, - step->paramval_len, - step->paramval_data, - step->read_only, - fetch) != 0) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - } - else - { - if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0) - { - pfree(connections); - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - } Assert(node->combine_type == COMBINE_TYPE_SAME); /* Make sure the command is completed on the primary node */ @@ -3371,26 +3352,9 @@ do_query(RemoteQueryState *node) for (i = 0; i < regular_conn_count; i++) { - if (connections[i]->state == DN_CONNECTION_STATE_QUERY) - BufferConnection(connections[i]); - /* If explicit transaction is needed gxid is already sent */ - if (!need_tran && pgxc_node_send_gxid(connections[i], gxid)) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp)) + if (!pgxc_start_command_on_connection(connections[i], need_tran, gxid, + timestamp, step, total_conn_count, snapshot)) { - /* - * If a transaction involves multiple connections timestamp is - * always sent down to Datanodes with pgxc_node_begin. - * An autocommit transaction needs the global timestamp also, - * so handle this case here. - */ pfree(connections); if (primaryconnection) pfree(primaryconnection); @@ -3398,61 +3362,6 @@ do_query(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } - if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot)) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - if (step->statement || step->cursor || step->paramval_data) - { - /* need to use Extended Query Protocol */ - int fetch = 0; - bool prepared = false; - - /* if prepared statement is referenced see if it is already exist */ - if (step->statement) - prepared = ActivateDatanodeStatementOnNode(step->statement, - connections[i]->nodenum); - /* - * execute and fetch rows only if they will be consumed - * immediately by the sorter - */ - if (step->cursor) - fetch = 1; - - if (pgxc_node_send_query_extended(connections[i], - prepared ? NULL : step->sql_statement, - step->statement, - step->cursor, - step->paramval_len, - step->paramval_data, - step->read_only, - fetch) != 0) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - } - else - { - if (pgxc_node_send_query(connections[i], step->sql_statement) != 0) - { - pfree(connections); - if (primaryconnection) - pfree(primaryconnection); - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to send command to data nodes"))); - } - } connections[i]->combiner = node; } @@ -3566,6 +3475,43 @@ do_query(RemoteQueryState *node) } } +/* + * ExecRemoteQueryInnerPlan + * Executes the inner plan of a RemoteQuery. It returns false if the inner plan + * does not return any row, otherwise it returns true. + */ +static bool +ExecRemoteQueryInnerPlan(RemoteQueryState *node) +{ + RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node)); + /* + * Use data row returned by the previus step as a parameters for + * the main query. + */ + if (!TupIsNull(innerSlot)) + { + step->paramval_len = ExecCopySlotDatarow(innerSlot, + &step->paramval_data); + + /* Needed for expression evaluation */ + if (estate->es_param_exec_vals) + { + int i; + int natts = innerSlot->tts_tupleDescriptor->natts; + + slot_getallattrs(innerSlot); + for (i = 0; i < natts; i++) + estate->es_param_exec_vals[i].value = slot_getattr( + innerSlot, + i+1, + &estate->es_param_exec_vals[i].isnull); + } + return true; + } + return false; +} /* * Execute step of PGXC plan. @@ -3581,7 +3527,6 @@ TupleTableSlot * ExecRemoteQuery(RemoteQueryState *node) { RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan; - EState *estate = node->ss.ps.state; TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; bool have_tuple = false; @@ -3600,31 +3545,7 @@ ExecRemoteQuery(RemoteQueryState *node) */ if (innerPlanState(node)) { - TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node)); - /* - * Use data row returned by the previus step as a parameters for - * the main query. - */ - if (!TupIsNull(innerSlot)) - { - step->paramval_len = ExecCopySlotDatarow(innerSlot, - &step->paramval_data); - - /* Needed for expression evaluation */ - if (estate->es_param_exec_vals) - { - int i; - int natts = innerSlot->tts_tupleDescriptor->natts; - - slot_getallattrs(innerSlot); - for (i = 0; i < natts; i++) - estate->es_param_exec_vals[i].value = slot_getattr( - innerSlot, - i+1, - &estate->es_param_exec_vals[i].isnull); - } - } - else + if (!ExecRemoteQueryInnerPlan(node)) { /* no parameters, exit */ return NULL; @@ -3741,31 +3662,8 @@ handle_results: */ if (innerPlanState(node)) { - TupleTableSlot *innerSlot = ExecProcNode(innerPlanState(node)); - if (!TupIsNull(innerSlot)) + if (ExecRemoteQueryInnerPlan(node)) { - /* reset the counter */ - node->command_complete_count = 0; - /* - * Use data row returned by the previus step as a parameters for - * the main query. - */ - step->paramval_len = ExecCopySlotDatarow(innerSlot, - &step->paramval_data); - - /* Needed for expression evaluation */ - if (estate->es_param_exec_vals) - { - int i; - int natts = innerSlot->tts_tupleDescriptor->natts; - - slot_getallattrs(innerSlot); - for (i = 0; i < natts; i++) - estate->es_param_exec_vals[i].value = slot_getattr( - innerSlot, - i+1, - &estate->es_param_exec_vals[i].isnull); - } do_query(node); goto handle_results; } |
