diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 24 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolcomm.c | 2 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolmgr.c | 40 | ||||
| -rw-r--r-- | src/include/pgxc/pgxcnode.h | 3 | ||||
| -rw-r--r-- | src/include/pgxc/poolmgr.h | 3 |
5 files changed, 56 insertions, 16 deletions
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index 6882d59780..bb8e94c829 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -111,7 +111,8 @@ static bool DoInvalidateRemoteHandles(void); #endif #ifdef XCP -static void pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session); +static void pgxc_node_init(PGXCNodeHandle *handle, int sock, + bool global_session, int pid); #else static void pgxc_node_init(PGXCNodeHandle *handle, int sock); #endif @@ -395,11 +396,12 @@ pgxc_node_all_free(void) * Structure stores state info and I/O buffers */ static void -pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session) +pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session, int pid) { char *init_str; handle->sock = sock; + handle->backend_pid = pid; handle->transaction_status = 'I'; handle->state = DN_CONNECTION_STATE_IDLE; handle->read_only = true; @@ -1957,10 +1959,13 @@ get_any_handle(List *datanodelist) { /* The node is requested */ List *allocate = list_make1_int(node); - int *fds = PoolManagerGetConnections(allocate, NIL); + int *pids; + int *fds = PoolManagerGetConnections(allocate, NIL, + &pids); if (!fds) { + Assert(pids != NULL); ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("Failed to get pooled connections"), @@ -1973,7 +1978,7 @@ get_any_handle(List *datanodelist) "max_connections and max_pool_size configuration " "parameters"))); } - pgxc_node_init(&dn_handles[node], fds[0], true); + pgxc_node_init(&dn_handles[node], fds[0], true, pids[0]); datanode_count++; /* @@ -2173,7 +2178,8 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool if (dn_allocate || co_allocate) { int j = 0; - int *fds = PoolManagerGetConnections(dn_allocate, co_allocate); + int *pids; + int *fds = PoolManagerGetConnections(dn_allocate, co_allocate, &pids); if (!fds) { @@ -2207,7 +2213,8 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool foreach(node_list_item, dn_allocate) { int node = lfirst_int(node_list_item); - int fdsock = fds[j++]; + int fdsock = fds[j]; + int be_pid = pids[j++]; if (node < 0 || node >= NumDataNodes) { @@ -2217,7 +2224,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool } node_handle = &dn_handles[node]; - pgxc_node_init(node_handle, fdsock, is_global_session); + pgxc_node_init(node_handle, fdsock, is_global_session, be_pid); dn_handles[node] = *node_handle; datanode_count++; } @@ -2228,6 +2235,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool foreach(node_list_item, co_allocate) { int node = lfirst_int(node_list_item); + int be_pid = pids[j]; int fdsock = fds[j++]; if (node < 0 || node >= NumCoords) @@ -2238,7 +2246,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool } node_handle = &co_handles[node]; - pgxc_node_init(node_handle, fdsock, is_global_session); + pgxc_node_init(node_handle, fdsock, is_global_session, be_pid); co_handles[node] = *node_handle; coord_count++; } diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c index 49a13bbab0..29f88df55d 100644 --- a/src/backend/pgxc/pool/poolcomm.c +++ b/src/backend/pgxc/pool/poolcomm.c @@ -793,7 +793,7 @@ pool_recvpids(PoolPort *port, int **pids) { int n; memcpy(&n, buf + 5 + i * sizeof(int), sizeof(int)); - *pids[i] = ntohl(n); + (*pids)[i] = ntohl(n); } return n32; diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c index 3dac376f05..95514e31a6 100644 --- a/src/backend/pgxc/pool/poolmgr.c +++ b/src/backend/pgxc/pool/poolmgr.c @@ -132,7 +132,8 @@ static int destroy_database_pool(const char *database, const char *user_name); static void reload_database_pools(PoolAgent *agent); static DatabasePool *find_database_pool(const char *database, const char *user_name, const char *pgoptions); static DatabasePool *remove_database_pool(const char *database, const char *user_name); -static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist); +static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, + List *coordlist, int **connectionpids); static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist); static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, Oid node); static void agent_release_connections(PoolAgent *agent, bool force_destroy); @@ -905,7 +906,7 @@ PoolManagerDisconnect(void) * Get pooled connections */ int * -PoolManagerGetConnections(List *datanodelist, List *coordlist) +PoolManagerGetConnections(List *datanodelist, List *coordlist, int **pids) { int i; ListCell *nodelist_item; @@ -959,6 +960,13 @@ PoolManagerGetConnections(List *datanodelist, List *coordlist) return NULL; } + if (pool_recvpids(&poolHandle->port, pids) != totlen) + { + pfree(*pids); + *pids = NULL; + return NULL; + } + return fds; } @@ -1276,13 +1284,22 @@ agent_handle_input(PoolAgent * agent, StringInfo s) * In case of error agent_acquire_connections will log * the error and return NULL */ - fds = agent_acquire_connections(agent, datanodelist, coordlist); + fds = agent_acquire_connections(agent, datanodelist, coordlist, + &pids); list_free(datanodelist); list_free(coordlist); pool_sendfds(&agent->port, fds, fds ? datanodecount + coordcount : 0); if (fds) pfree(fds); + + /* + * Also send the PIDs of the remote backend processes serving + * these connections + */ + pool_sendpids(&agent->port, pids, pids ? datanodecount + coordcount : 0); + if (pids) + pfree(pids); break; case 'h': /* Cancel SQL Command in progress on specified connections */ @@ -1390,7 +1407,8 @@ agent_handle_input(PoolAgent * agent, StringInfo s) * acquire connection */ static int * -agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist) +agent_acquire_connections(PoolAgent *agent, List *datanodelist, + List *coordlist, int **pids) { int i; int *result; @@ -1420,6 +1438,14 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist) errmsg("out of memory"))); } + *pids = (int *) palloc((list_length(datanodelist) + list_length(coordlist)) * sizeof(int)); + if (*pids == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + /* * There are possible memory allocations in the core pooler, we want * these allocations in the contect of the database pool @@ -1458,7 +1484,8 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist) */ } - result[i++] = PQsocket((PGconn *) agent->dn_connections[node]->conn); + result[i] = PQsocket((PGconn *) agent->dn_connections[node]->conn); + (*pids)[i++] = ((PGconn *) agent->dn_connections[node]->conn)->be_pid; } /* Save then in the array fds for Coordinators */ @@ -1489,7 +1516,8 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist) */ } - result[i++] = PQsocket((PGconn *) agent->coord_connections[node]->conn); + result[i] = PQsocket((PGconn *) agent->coord_connections[node]->conn); + (*pids)[i++] = ((PGconn *) agent->coord_connections[node]->conn)->be_pid; } MemoryContextSwitchTo(oldcontext); diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h index 789d7cecbc..d57aabe18b 100644 --- a/src/include/pgxc/pgxcnode.h +++ b/src/include/pgxc/pgxcnode.h @@ -66,6 +66,9 @@ struct pgxc_node_handle /* fd of the connection */ int sock; + /* pid of the remote backend process */ + int backend_pid; + /* Connection state */ char transaction_status; DNConnectionState state; diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h index e9f5bbd189..2b8ad5dbef 100644 --- a/src/include/pgxc/poolmgr.h +++ b/src/include/pgxc/poolmgr.h @@ -117,7 +117,8 @@ extern char *session_options(void); extern void PoolManagerReconnect(void); /* Get pooled connections */ -extern int *PoolManagerGetConnections(List *datanodelist, List *coordlist); +extern int *PoolManagerGetConnections(List *datanodelist, List *coordlist, + int **pids); /* Clean pool connections */ extern void PoolManagerCleanConnection(List *datanodelist, List *coordlist, char *dbname, char *username); |
