summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c24
-rw-r--r--src/backend/pgxc/pool/poolcomm.c2
-rw-r--r--src/backend/pgxc/pool/poolmgr.c40
-rw-r--r--src/include/pgxc/pgxcnode.h3
-rw-r--r--src/include/pgxc/poolmgr.h3
5 files changed, 56 insertions, 16 deletions
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index 6882d59780..bb8e94c829 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -111,7 +111,8 @@ static bool DoInvalidateRemoteHandles(void);
#endif
#ifdef XCP
-static void pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session);
+static void pgxc_node_init(PGXCNodeHandle *handle, int sock,
+ bool global_session, int pid);
#else
static void pgxc_node_init(PGXCNodeHandle *handle, int sock);
#endif
@@ -395,11 +396,12 @@ pgxc_node_all_free(void)
* Structure stores state info and I/O buffers
*/
static void
-pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session)
+pgxc_node_init(PGXCNodeHandle *handle, int sock, bool global_session, int pid)
{
char *init_str;
handle->sock = sock;
+ handle->backend_pid = pid;
handle->transaction_status = 'I';
handle->state = DN_CONNECTION_STATE_IDLE;
handle->read_only = true;
@@ -1957,10 +1959,13 @@ get_any_handle(List *datanodelist)
{
/* The node is requested */
List *allocate = list_make1_int(node);
- int *fds = PoolManagerGetConnections(allocate, NIL);
+ int *pids;
+ int *fds = PoolManagerGetConnections(allocate, NIL,
+ &pids);
if (!fds)
{
+ Assert(pids != NULL);
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("Failed to get pooled connections"),
@@ -1973,7 +1978,7 @@ get_any_handle(List *datanodelist)
"max_connections and max_pool_size configuration "
"parameters")));
}
- pgxc_node_init(&dn_handles[node], fds[0], true);
+ pgxc_node_init(&dn_handles[node], fds[0], true, pids[0]);
datanode_count++;
/*
@@ -2173,7 +2178,8 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool
if (dn_allocate || co_allocate)
{
int j = 0;
- int *fds = PoolManagerGetConnections(dn_allocate, co_allocate);
+ int *pids;
+ int *fds = PoolManagerGetConnections(dn_allocate, co_allocate, &pids);
if (!fds)
{
@@ -2207,7 +2213,8 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool
foreach(node_list_item, dn_allocate)
{
int node = lfirst_int(node_list_item);
- int fdsock = fds[j++];
+ int fdsock = fds[j];
+ int be_pid = pids[j++];
if (node < 0 || node >= NumDataNodes)
{
@@ -2217,7 +2224,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool
}
node_handle = &dn_handles[node];
- pgxc_node_init(node_handle, fdsock, is_global_session);
+ pgxc_node_init(node_handle, fdsock, is_global_session, be_pid);
dn_handles[node] = *node_handle;
datanode_count++;
}
@@ -2228,6 +2235,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool
foreach(node_list_item, co_allocate)
{
int node = lfirst_int(node_list_item);
+ int be_pid = pids[j];
int fdsock = fds[j++];
if (node < 0 || node >= NumCoords)
@@ -2238,7 +2246,7 @@ get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query, bool
}
node_handle = &co_handles[node];
- pgxc_node_init(node_handle, fdsock, is_global_session);
+ pgxc_node_init(node_handle, fdsock, is_global_session, be_pid);
co_handles[node] = *node_handle;
coord_count++;
}
diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c
index 49a13bbab0..29f88df55d 100644
--- a/src/backend/pgxc/pool/poolcomm.c
+++ b/src/backend/pgxc/pool/poolcomm.c
@@ -793,7 +793,7 @@ pool_recvpids(PoolPort *port, int **pids)
{
int n;
memcpy(&n, buf + 5 + i * sizeof(int), sizeof(int));
- *pids[i] = ntohl(n);
+ (*pids)[i] = ntohl(n);
}
return n32;
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index 3dac376f05..95514e31a6 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -132,7 +132,8 @@ static int destroy_database_pool(const char *database, const char *user_name);
static void reload_database_pools(PoolAgent *agent);
static DatabasePool *find_database_pool(const char *database, const char *user_name, const char *pgoptions);
static DatabasePool *remove_database_pool(const char *database, const char *user_name);
-static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist);
+static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist,
+ List *coordlist, int **connectionpids);
static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist);
static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, Oid node);
static void agent_release_connections(PoolAgent *agent, bool force_destroy);
@@ -905,7 +906,7 @@ PoolManagerDisconnect(void)
* Get pooled connections
*/
int *
-PoolManagerGetConnections(List *datanodelist, List *coordlist)
+PoolManagerGetConnections(List *datanodelist, List *coordlist, int **pids)
{
int i;
ListCell *nodelist_item;
@@ -959,6 +960,13 @@ PoolManagerGetConnections(List *datanodelist, List *coordlist)
return NULL;
}
+ if (pool_recvpids(&poolHandle->port, pids) != totlen)
+ {
+ pfree(*pids);
+ *pids = NULL;
+ return NULL;
+ }
+
return fds;
}
@@ -1276,13 +1284,22 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
* In case of error agent_acquire_connections will log
* the error and return NULL
*/
- fds = agent_acquire_connections(agent, datanodelist, coordlist);
+ fds = agent_acquire_connections(agent, datanodelist, coordlist,
+ &pids);
list_free(datanodelist);
list_free(coordlist);
pool_sendfds(&agent->port, fds, fds ? datanodecount + coordcount : 0);
if (fds)
pfree(fds);
+
+ /*
+ * Also send the PIDs of the remote backend processes serving
+ * these connections
+ */
+ pool_sendpids(&agent->port, pids, pids ? datanodecount + coordcount : 0);
+ if (pids)
+ pfree(pids);
break;
case 'h': /* Cancel SQL Command in progress on specified connections */
@@ -1390,7 +1407,8 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
* acquire connection
*/
static int *
-agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
+agent_acquire_connections(PoolAgent *agent, List *datanodelist,
+ List *coordlist, int **pids)
{
int i;
int *result;
@@ -1420,6 +1438,14 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
errmsg("out of memory")));
}
+ *pids = (int *) palloc((list_length(datanodelist) + list_length(coordlist)) * sizeof(int));
+ if (*pids == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
/*
* There are possible memory allocations in the core pooler, we want
* these allocations in the contect of the database pool
@@ -1458,7 +1484,8 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
*/
}
- result[i++] = PQsocket((PGconn *) agent->dn_connections[node]->conn);
+ result[i] = PQsocket((PGconn *) agent->dn_connections[node]->conn);
+ (*pids)[i++] = ((PGconn *) agent->dn_connections[node]->conn)->be_pid;
}
/* Save then in the array fds for Coordinators */
@@ -1489,7 +1516,8 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
*/
}
- result[i++] = PQsocket((PGconn *) agent->coord_connections[node]->conn);
+ result[i] = PQsocket((PGconn *) agent->coord_connections[node]->conn);
+ (*pids)[i++] = ((PGconn *) agent->coord_connections[node]->conn)->be_pid;
}
MemoryContextSwitchTo(oldcontext);
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
index 789d7cecbc..d57aabe18b 100644
--- a/src/include/pgxc/pgxcnode.h
+++ b/src/include/pgxc/pgxcnode.h
@@ -66,6 +66,9 @@ struct pgxc_node_handle
/* fd of the connection */
int sock;
+ /* pid of the remote backend process */
+ int backend_pid;
+
/* Connection state */
char transaction_status;
DNConnectionState state;
diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h
index e9f5bbd189..2b8ad5dbef 100644
--- a/src/include/pgxc/poolmgr.h
+++ b/src/include/pgxc/poolmgr.h
@@ -117,7 +117,8 @@ extern char *session_options(void);
extern void PoolManagerReconnect(void);
/* Get pooled connections */
-extern int *PoolManagerGetConnections(List *datanodelist, List *coordlist);
+extern int *PoolManagerGetConnections(List *datanodelist, List *coordlist,
+ int **pids);
/* Clean pool connections */
extern void PoolManagerCleanConnection(List *datanodelist, List *coordlist, char *dbname, char *username);