summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/pgxc/pool/execRemote.c55
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c35
-rw-r--r--src/include/pgxc/pgxcnode.h5
3 files changed, 86 insertions, 9 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 1263b3b574..5ecdcd6611 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -166,9 +166,17 @@ static abort_callback_type dbcleanup_info = { NULL, NULL };
static int pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections,
GlobalTransactionId gxid, bool need_tran_block,
bool readOnly, char node_type);
+
+#ifdef XCP
+static PGXCNodeAllHandles *get_exec_connections(RemoteQueryState *planstate,
+ ExecNodes *exec_nodes,
+ RemoteQueryExecType exec_type,
+ bool is_global_session);
+#else
static PGXCNodeAllHandles * get_exec_connections(RemoteQueryState *planstate,
ExecNodes *exec_nodes,
RemoteQueryExecType exec_type);
+#endif
#ifndef XCP
static void close_node_cursors(PGXCNodeHandle **connections, int conn_count, char *cursor);
@@ -4008,7 +4016,7 @@ DataNodeCopyBegin(RemoteCopyData *rcstate)
else
{
PGXCNodeAllHandles *pgxc_handles;
- pgxc_handles = get_handles(nodelist, NULL, false);
+ pgxc_handles = get_handles(nodelist, NULL, false, true);
connections = pgxc_handles->datanode_handles;
Assert(pgxc_handles->dn_conn_count == conn_count);
pfree(pgxc_handles);
@@ -4803,15 +4811,21 @@ ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
}
#endif
-
/*
* Get Node connections depending on the connection type:
* Datanodes Only, Coordinators only or both types
*/
static PGXCNodeAllHandles *
+#ifdef XCP
+get_exec_connections(RemoteQueryState *planstate,
+ ExecNodes *exec_nodes,
+ RemoteQueryExecType exec_type,
+ bool is_global_session)
+#else
get_exec_connections(RemoteQueryState *planstate,
ExecNodes *exec_nodes,
RemoteQueryExecType exec_type)
+#endif
{
List *nodelist = NIL;
List *primarynode = NIL;
@@ -4938,7 +4952,11 @@ get_exec_connections(RemoteQueryState *planstate,
}
/* Get other connections (non-primary) */
+#ifdef XCP
+ pgxc_handles = get_handles(nodelist, coordlist, is_query_coord_only, is_global_session);
+#else
pgxc_handles = get_handles(nodelist, coordlist, is_query_coord_only);
+#endif
if (!pgxc_handles)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -4949,7 +4967,11 @@ get_exec_connections(RemoteQueryState *planstate,
{
/* Let's assume primary connection is always a Datanode connection for the moment */
PGXCNodeAllHandles *pgxc_conn_res;
+#ifdef XCP
+ pgxc_conn_res = get_handles(primarynode, NULL, false, is_global_session);
+#else
pgxc_conn_res = get_handles(primarynode, NULL, false);
+#endif
/* primary connection is unique */
primaryconnection = pgxc_conn_res->datanode_handles[0];
@@ -5850,6 +5872,13 @@ ExecRemoteUtility(RemoteQuery *node)
remotestate = makeNode(RemoteQueryState);
combiner = (ResponseCombiner *)remotestate;
InitResponseCombiner(combiner, 0, node->combine_type);
+
+ /*
+ * Do not set global_session if it is a utility statement.
+ * Avoids CREATE NODE error on cluster configuration.
+ */
+ pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type,
+ exec_direct_type != EXEC_DIRECT_UTILITY);
#else
/*
* It is possible to invoke create table with inheritance on
@@ -5859,9 +5888,9 @@ ExecRemoteUtility(RemoteQuery *node)
ExecSetTempObjectIncluded();
remotestate = CreateResponseCombiner(0, node->combine_type);
-#endif
pgxc_connections = get_exec_connections(NULL, node->exec_nodes, exec_type);
+#endif
dn_conn_count = pgxc_connections->dn_conn_count;
co_conn_count = pgxc_connections->co_conn_count;
@@ -6178,7 +6207,11 @@ ExecCloseRemoteStatement(const char *stmt_name, List *nodelist)
return;
/* get needed Datanode connections */
+#ifdef XCP
+ all_handles = get_handles(nodelist, NIL, false, true);
+#else
all_handles = get_handles(nodelist, NIL, false);
+#endif
conn_count = all_handles->dn_conn_count;
connections = all_handles->datanode_handles;
@@ -7181,7 +7214,7 @@ pgxc_node_remote_finish(char *prepareGID, bool commit,
if (nodelist == NIL && coordlist == NIL)
return prepared_local;
- pgxc_handles = get_handles(nodelist, coordlist, false);
+ pgxc_handles = get_handles(nodelist, coordlist, false, true);
if (commit)
sprintf(finish_cmd, "COMMIT PREPARED '%s'", prepareGID);
@@ -7435,8 +7468,14 @@ ExecRemoteQuery(RemoteQueryState *node)
* Get connections for Datanodes only, utilities and DDLs
* are launched in ExecRemoteUtility
*/
+#ifdef XCP
+ pgxc_connections = get_exec_connections(node, step->exec_nodes,
+ step->exec_type,
+ true);
+#else
pgxc_connections = get_exec_connections(node, step->exec_nodes,
step->exec_type);
+#endif
if (step->exec_type == EXEC_ON_DATANODES)
{
@@ -8455,7 +8494,11 @@ ExecFinishInitRemoteSubplan(RemoteSubplanState *node)
if (node->execOnAll)
{
PGXCNodeAllHandles *pgxc_connections;
+#ifdef XCP
+ pgxc_connections = get_handles(node->execNodes, NIL, false, true);
+#else
pgxc_connections = get_handles(node->execNodes, NIL, false);
+#endif
combiner->conn_count = pgxc_connections->dn_conn_count;
combiner->connections = pgxc_connections->datanode_handles;
combiner->current_conn = 0;
@@ -9230,7 +9273,11 @@ get_success_nodes(int node_count, PGXCNodeHandle **handles, char node_type, Stri
void
pgxc_all_success_nodes(ExecNodes **d_nodes, ExecNodes **c_nodes, char **failednodes_msg)
{
+#ifdef XCP
+ PGXCNodeAllHandles *connections = get_exec_connections(NULL, NULL, EXEC_ON_ALL_NODES, true);
+#else
PGXCNodeAllHandles *connections = get_exec_connections(NULL, NULL, EXEC_ON_ALL_NODES);
+#endif
StringInfoData failednodes;
initStringInfo(&failednodes);
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index d9c6a56742..96f062f36b 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -109,8 +109,11 @@ typedef struct
static bool DoInvalidateRemoteHandles(void);
#endif
-
+#ifdef XCP
+static void pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session);
+#else
static void pgxc_node_init(PGXCNodeHandle *handle, int sock);
+#endif
static void pgxc_node_free(PGXCNodeHandle *handle);
static void pgxc_node_all_free(void);
@@ -434,7 +437,11 @@ pgxc_node_all_free(void)
* Structure stores state info and I/O buffers
*/
static void
+#ifdef XCP
+pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session)
+#else
pgxc_node_init(PGXCNodeHandle *handle, int sock)
+#endif
{
#ifdef XCP
char *init_str;
@@ -461,10 +468,13 @@ pgxc_node_init(PGXCNodeHandle *handle, int sock)
* We got a new connection, set on the remote node the session parameters
* if defined. The transaction parameter should be sent after BEGIN
*/
- init_str = PGXCNodeGetSessionParamStr();
- if (init_str)
+ if (global_session)
{
- pgxc_node_set_query(handle, init_str);
+ init_str = PGXCNodeGetSessionParamStr();
+ if (init_str)
+ {
+ pgxc_node_set_query(handle, init_str);
+ }
}
#endif
}
@@ -2103,8 +2113,11 @@ get_any_handle(List *datanodelist)
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("Failed to get pooled connections")));
}
-
+#ifdef XCP
+ pgxc_node_init(&dn_handles[node], fds[0], true);
+#else
pgxc_node_init(&dn_handles[node], fds[0]);
+#endif
datanode_count++;
/*
@@ -2135,7 +2148,11 @@ get_any_handle(List *datanodelist)
* Coordinator fds is returned only if transaction uses a DDL
*/
PGXCNodeAllHandles *
+#ifdef XCP
+get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool is_global_session)
+#else
get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query)
+#endif
{
PGXCNodeAllHandles *result;
ListCell *node_list_item;
@@ -2344,7 +2361,11 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query)
}
node_handle = &dn_handles[node];
+#ifdef XCP
+ pgxc_node_init(node_handle, fdsock, is_global_session);
+#else
pgxc_node_init(node_handle, fdsock);
+#endif
dn_handles[node] = *node_handle;
datanode_count++;
}
@@ -2365,7 +2386,11 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query)
}
node_handle = &co_handles[node];
+#ifdef XCP
+ pgxc_node_init(node_handle, fdsock, is_global_session);
+#else
pgxc_node_init(node_handle, fdsock);
+#endif
co_handles[node] = *node_handle;
coord_count++;
}
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
index 621e4a9a45..e686136bd5 100644
--- a/src/include/pgxc/pgxcnode.h
+++ b/src/include/pgxc/pgxcnode.h
@@ -165,7 +165,12 @@ extern int PGXCNodeGetNodeIdFromName(char *node_name, char node_type);
#endif
extern Oid PGXCNodeGetNodeOid(int nodeid, char node_type);
+#ifdef XCP
+extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only, bool is_global_session);
+#else
extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only);
+#endif
+
#ifdef XCP
extern PGXCNodeAllHandles *get_current_handles(void);
#endif