diff options
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 55 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 35 | ||||
| -rw-r--r-- | src/include/pgxc/pgxcnode.h | 5 |
3 files changed, 86 insertions, 9 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 1263b3b574..5ecdcd6611 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -166,9 +166,17 @@ static abort_callback_type dbcleanup_info = { NULL, NULL }; static int pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections, GlobalTransactionId gxid, bool need_tran_block, bool readOnly, char node_type); + +#ifdef XCP +static PGXCNodeAllHandles *get_exec_connections(RemoteQueryState *planstate, + ExecNodes *exec_nodes, + RemoteQueryExecType exec_type, + bool is_global_session); +#else static PGXCNodeAllHandles * get_exec_connections(RemoteQueryState *planstate, ExecNodes *exec_nodes, RemoteQueryExecType exec_type); +#endif #ifndef XCP static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor); @@ -4008,7 +4016,7 @@ DataNodeCopyBegin(RemoteCopyData *rcstate) else { PGXCNodeAllHandles *pgxc_handles; - pgxc_handles = get_handles(nodelist, NULL, false); + pgxc_handles = get_handles(nodelist, NULL, false, true); connections = pgxc_handles->datanode_handles; Assert(pgxc_handles->dn_conn_count == conn_count); pfree(pgxc_handles); @@ -4803,15 +4811,21 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) } #endif - /* * Get Node connections depending on the connection type: * Datanodes Only, Coordinators only or both types */ static PGXCNodeAllHandles * +#ifdef XCP +get_exec_connections(RemoteQueryState *planstate, + ExecNodes *exec_nodes, + RemoteQueryExecType exec_type, + bool is_global_session) +#else get_exec_connections(RemoteQueryState *planstate, ExecNodes *exec_nodes, RemoteQueryExecType exec_type) +#endif { List *nodelist = NIL; List *primarynode = NIL; @@ -4938,7 +4952,11 @@ get_exec_connections(RemoteQueryState *planstate, } /* Get other connections (non-primary) */ +#ifdef XCP + pgxc_handles = get_handles(nodelist, coordlist, is_query_coord_only, is_global_session); +#else pgxc_handles = get_handles(nodelist, coordlist, is_query_coord_only); +#endif if (!pgxc_handles) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), @@ -4949,7 +4967,11 @@ get_exec_connections(RemoteQueryState *planstate, { /* Let's assume primary connection is always a Datanode connection for the moment */ PGXCNodeAllHandles *pgxc_conn_res; +#ifdef XCP + pgxc_conn_res = get_handles(primarynode, NULL, false, is_global_session); +#else pgxc_conn_res = get_handles(primarynode, NULL, false); +#endif /* primary connection is unique */ primaryconnection = pgxc_conn_res->datanode_handles[0]; @@ -5850,6 +5872,13 @@ ExecRemoteUtility(RemoteQuery *node) remotestate = makeNode(RemoteQueryState); combiner = (ResponseCombiner *)remotestate; InitResponseCombiner(combiner, 0, node->combine_type); + + /* + * Do not set global_session if it is a utility statement. + * Avoids CREATE NODE error on cluster configuration. + */ + pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type, + exec_direct_type != EXEC_DIRECT_UTILITY); #else /* * It is possible to invoke create table with inheritance on @@ -5859,9 +5888,9 @@ ExecRemoteUtility(RemoteQuery *node) ExecSetTempObjectIncluded(); remotestate = CreateResponseCombiner(0, node->combine_type); -#endif pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type); +#endif dn_conn_count = pgxc_connections->dn_conn_count; co_conn_count = pgxc_connections->co_conn_count; @@ -6178,7 +6207,11 @@ ExecCloseRemoteStatement(const char *stmt_name, List *nodelist) return; /* get needed Datanode connections */ +#ifdef XCP + all_handles = get_handles(nodelist, NIL, false, true); +#else all_handles = get_handles(nodelist, NIL, false); +#endif conn_count = all_handles->dn_conn_count; connections = all_handles->datanode_handles; @@ -7181,7 +7214,7 @@ pgxc_node_remote_finish(char *prepareGID, bool commit, if (nodelist == NIL && coordlist == NIL) return prepared_local; - pgxc_handles = get_handles(nodelist, coordlist, false); + pgxc_handles = get_handles(nodelist, coordlist, false, true); if (commit) sprintf(finish_cmd, "COMMIT PREPARED '%s'", prepareGID); @@ -7435,8 +7468,14 @@ ExecRemoteQuery(RemoteQueryState *node) * Get connections for Datanodes only, utilities and DDLs * are launched in ExecRemoteUtility */ +#ifdef XCP + pgxc_connections = get_exec_connections(node, step->exec_nodes, + step->exec_type, + true); +#else pgxc_connections = get_exec_connections(node, step->exec_nodes, step->exec_type); +#endif if (step->exec_type == EXEC_ON_DATANODES) { @@ -8455,7 +8494,11 @@ ExecFinishInitRemoteSubplan(RemoteSubplanState *node) if (node->execOnAll) { PGXCNodeAllHandles *pgxc_connections; +#ifdef XCP + pgxc_connections = get_handles(node->execNodes, NIL, false, true); +#else pgxc_connections = get_handles(node->execNodes, NIL, false); +#endif combiner->conn_count = pgxc_connections->dn_conn_count; combiner->connections = pgxc_connections->datanode_handles; combiner->current_conn = 0; @@ -9230,7 +9273,11 @@ get_success_nodes(int node_count, PGXCNodeHandle **handles, char node_type, Stri void pgxc_all_success_nodes(ExecNodes **d_nodes, ExecNodes **c_nodes, char **failednodes_msg) { +#ifdef XCP + PGXCNodeAllHandles *connections = get_exec_connections(NULL, NULL, EXEC_ON_ALL_NODES, true); +#else PGXCNodeAllHandles *connections = get_exec_connections(NULL, NULL, EXEC_ON_ALL_NODES); +#endif StringInfoData failednodes; initStringInfo(&failednodes); diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index d9c6a56742..96f062f36b 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -109,8 +109,11 @@ typedef struct static bool DoInvalidateRemoteHandles(void); #endif - +#ifdef XCP +static void pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session); +#else static void pgxc_node_init(PGXCNodeHandle *handle, int sock); +#endif static void pgxc_node_free(PGXCNodeHandle *handle); static void pgxc_node_all_free(void); @@ -434,7 +437,11 @@ pgxc_node_all_free(void) * Structure stores state info and I/O buffers */ static void +#ifdef XCP +pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session) +#else pgxc_node_init(PGXCNodeHandle *handle, int sock) +#endif { #ifdef XCP char *init_str; @@ -461,10 +468,13 @@ pgxc_node_init(PGXCNodeHandle *handle, int sock) * We got a new connection, set on the remote node the session parameters * if defined. The transaction parameter should be sent after BEGIN */ - init_str = PGXCNodeGetSessionParamStr(); - if (init_str) + if (global_session) { - pgxc_node_set_query(handle, init_str); + init_str = PGXCNodeGetSessionParamStr(); + if (init_str) + { + pgxc_node_set_query(handle, init_str); + } } #endif } @@ -2103,8 +2113,11 @@ get_any_handle(List *datanodelist) (errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("Failed to get pooled connections"))); } - +#ifdef XCP + pgxc_node_init(&dn_handles[node], fds[0], true); +#else pgxc_node_init(&dn_handles[node], fds[0]); +#endif datanode_count++; /* @@ -2135,7 +2148,11 @@ get_any_handle(List *datanodelist) * Coordinator fds is returned only if transaction uses a DDL */ PGXCNodeAllHandles * +#ifdef XCP +get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool is_global_session) +#else get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query) +#endif { PGXCNodeAllHandles *result; ListCell *node_list_item; @@ -2344,7 +2361,11 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query) } node_handle = &dn_handles[node]; +#ifdef XCP + pgxc_node_init(node_handle, fdsock, is_global_session); +#else pgxc_node_init(node_handle, fdsock); +#endif dn_handles[node] = *node_handle; datanode_count++; } @@ -2365,7 +2386,11 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query) } node_handle = &co_handles[node]; +#ifdef XCP + pgxc_node_init(node_handle, fdsock, is_global_session); +#else pgxc_node_init(node_handle, fdsock); +#endif co_handles[node] = *node_handle; coord_count++; } diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h index 621e4a9a45..e686136bd5 100644 --- a/src/include/pgxc/pgxcnode.h +++ b/src/include/pgxc/pgxcnode.h @@ -165,7 +165,12 @@ extern int PGXCNodeGetNodeIdFromName(char *node_name, char node_type); #endif extern Oid PGXCNodeGetNodeOid(int nodeid, char node_type); +#ifdef XCP +extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only, bool is_global_session); +#else extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only); +#endif + #ifdef XCP extern PGXCNodeAllHandles *get_current_handles(void); #endif |
