summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/bootstrap/bootstrap.c6
-rw-r--r--src/backend/commands/prepare.c2
-rw-r--r--src/backend/nodes/list.c4
-rw-r--r--src/backend/pgxc/locator/locator.c2
-rw-r--r--src/backend/pgxc/nodemgr/nodemgr.c136
-rw-r--r--src/backend/pgxc/plan/planner.c2
-rw-r--r--src/backend/pgxc/pool/execRemote.c1
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c208
-rw-r--r--src/backend/pgxc/pool/poolmgr.c260
-rw-r--r--src/backend/pgxc/pool/poolutils.c1
-rw-r--r--src/backend/rewrite/rewriteHandler.c2
-rw-r--r--src/backend/storage/ipc/procarray.c8
-rw-r--r--src/backend/storage/lmgr/proc.c9
-rw-r--r--src/backend/utils/init/miscinit.c10
-rw-r--r--src/backend/utils/init/postinit.c8
-rw-r--r--src/include/miscadmin.h1
-rw-r--r--src/include/pgxc/nodemgr.h8
-rw-r--r--src/include/pgxc/poolmgr.h15
-rw-r--r--src/include/storage/proc.h5
19 files changed, 317 insertions, 371 deletions
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 40d03acbce..838ec0eebd 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -377,6 +377,12 @@ AuxiliaryProcessMain(int argc, char *argv[])
*/
if (IsUnderPostmaster)
{
+#ifdef PGXC
+ /* Initialize pooler flag before creating PGPROC structure */
+ if (auxType == PoolerProcess)
+ PGXCPoolerProcessIam();
+#endif
+
/*
* Create a PGPROC so we can use LWLocks. In the EXEC_BACKEND case,
* this was already done by SubPostmasterMain().
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index b6ae576df8..00cfece959 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -36,7 +36,7 @@
#ifdef PGXC
#include "pgxc/pgxc.h"
#include "nodes/nodes.h"
-#include "pgxc/poolmgr.h"
+#include "pgxc/nodemgr.h"
#include "pgxc/execRemote.h"
#include "catalog/pgxc_node.h"
#endif
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index 46f4658aed..6c4d05e36e 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -838,8 +838,8 @@ list_intersection_int(List *list1, List *list2)
result = NIL;
foreach(cell, list1)
{
- if (list_member_int(list2, lfirst(cell)))
- result = lappend_int(result, lfirst(cell));
+ if (list_member_int(list2, lfirst_int(cell)))
+ result = lappend_int(result, lfirst_int(cell));
}
check_list_invariants(result);
diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c
index e7b0e65c2f..5e7ec911fd 100644
--- a/src/backend/pgxc/locator/locator.c
+++ b/src/backend/pgxc/locator/locator.c
@@ -37,7 +37,7 @@
#include "utils/tqual.h"
#include "utils/syscache.h"
#include "nodes/nodes.h"
-#include "pgxc/poolmgr.h"
+#include "pgxc/nodemgr.h"
#include "pgxc/locator.h"
#include "pgxc/pgxc.h"
#include "pgxc/pgxcnode.h"
diff --git a/src/backend/pgxc/nodemgr/nodemgr.c b/src/backend/pgxc/nodemgr/nodemgr.c
index 3f26c8d421..12e40a63fb 100644
--- a/src/backend/pgxc/nodemgr/nodemgr.c
+++ b/src/backend/pgxc/nodemgr/nodemgr.c
@@ -22,8 +22,15 @@
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/lsyscache.h"
+#include "utils/tqual.h"
+#include "pgxc/locator.h"
#include "pgxc/nodemgr.h"
+/* Global number of nodes */
+int NumDataNodes = 2;
+int NumCoords = 1;
+int NumCoordSlaves = 0;
+int NumDataNodeSlaves = 0;
/*
* Check list of options and return things filled
@@ -96,6 +103,123 @@ check_options(List *options, DefElem **dhost,
}
}
+/* --------------------------------
+ * cmp_nodes
+ *
+ * Compare the Oids of two XC nodes
+ * to sort them in ascending order by their names
+ * --------------------------------
+ */
+static int
+cmp_nodes(const void *p1, const void *p2)
+{
+ Oid n1 = *((Oid *)p1);
+ Oid n2 = *((Oid *)p2);
+
+ if (strcmp(get_pgxc_nodename(n1), get_pgxc_nodename(n2)) < 0)
+ return -1;
+
+ if (strcmp(get_pgxc_nodename(n1), get_pgxc_nodename(n2)) == 0)
+ return 0;
+
+ return 1;
+}
+
+/*
+ * PgxcNodeCount
+ *
+ * Count number of PGXC nodes based on catalog information and return
+ * an ordered list of node Oids for each PGXC node type.
+ */
+void
+PgxcNodeListAndCount(Oid **coOids, Oid **dnOids, Oid **coslaveOids, Oid **dnslaveOids)
+{
+ Relation rel;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+
+ /* Reinitialize counts */
+ NumCoords = 0;
+ NumDataNodes = 0;
+ NumCoordSlaves = 0;
+ NumDataNodeSlaves = 0;
+
+ /*
+ * Node information initialization is made in one scan:
+ * 1) Scan pgxc_node catalog to find the number of nodes for
+ * each node type and make proper allocations
+ * 2) Then extract the node Oid
+ * 3) Complete primary/preferred node information
+ */
+ rel = heap_open(PgxcNodeRelationId, AccessShareLock);
+ scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ int numnodes;
+ Oid **nodes;
+ Form_pgxc_node nodeForm = (Form_pgxc_node) GETSTRUCT(tuple);
+
+ /* Take data for given node type */
+ switch (nodeForm->node_type)
+ {
+ case PGXC_NODE_COORD_MASTER:
+ NumCoords++;
+ numnodes = NumCoords;
+ nodes = coOids;
+ break;
+ case PGXC_NODE_DATANODE_MASTER:
+ NumDataNodes++;
+ numnodes = NumDataNodes;
+ nodes = dnOids;
+ break;
+ case PGXC_NODE_COORD_SLAVE:
+ NumCoordSlaves++;
+ numnodes = NumCoordSlaves;
+ nodes = coslaveOids;
+ break;
+ case PGXC_NODE_DATANODE_SLAVE:
+ NumDataNodeSlaves++;
+ numnodes = NumDataNodeSlaves;
+ nodes = dnslaveOids;
+ break;
+ default:
+ break;
+ }
+
+ if (numnodes == 1)
+ *nodes = (Oid *) palloc(numnodes * sizeof(Oid));
+ else
+ *nodes = (Oid *) repalloc(*nodes, numnodes * sizeof(Oid));
+
+ (*nodes)[numnodes - 1] = get_pgxc_nodeoid(NameStr(nodeForm->node_name));
+
+ /*
+ * Save data related to preferred and primary node
+ * Preferred and primaries use node Oids
+ */
+ if (nodeForm->nodeis_primary)
+ primary_data_node = get_pgxc_nodeoid(NameStr(nodeForm->node_name));
+ if (nodeForm->nodeis_preferred)
+ {
+ preferred_data_node[num_preferred_data_nodes] =
+ get_pgxc_nodeoid(NameStr(nodeForm->node_name));
+ num_preferred_data_nodes++;
+ }
+ }
+ heap_endscan(scan);
+ heap_close(rel, AccessShareLock);
+
+ /* Finally sort the lists to be sent back */
+ if (NumCoords != 0)
+ qsort(*coOids, NumCoords, sizeof(Oid), cmp_nodes);
+ if (NumDataNodes != 0)
+ qsort(*dnOids, NumDataNodes, sizeof(Oid), cmp_nodes);
+ if (NumDataNodeSlaves != 0)
+ qsort(*coslaveOids, NumCoordSlaves, sizeof(Oid), cmp_nodes);
+ if (NumDataNodeSlaves != 0)
+ qsort(*dnslaveOids, NumDataNodeSlaves, sizeof(Oid), cmp_nodes);
+}
+
/*
* PgxcNodeCreate
*
@@ -478,13 +602,13 @@ PgxcNodeAlter(AlterNodeStmt *stmt)
if (node_port > 0)
{
new_record[Anum_pgxc_node_port - 1] = Int32GetDatum(node_port);
- new_record_repl[Anum_pgxc_node_port - 1] = true;
+ new_record_repl[Anum_pgxc_node_port - 1] = true;
}
if (node_host)
{
new_record[Anum_pgxc_node_host - 1] =
DirectFunctionCall1(namein, CStringGetDatum(node_host));
- new_record_repl[Anum_pgxc_node_host - 1] = true;
+ new_record_repl[Anum_pgxc_node_host - 1] = true;
}
if (drelated ||
node_type == PGXC_NODE_COORD_MASTER ||
@@ -492,22 +616,22 @@ PgxcNodeAlter(AlterNodeStmt *stmt)
{
/* Force update of related node to InvalidOid if node is changed to master */
new_record[Anum_pgxc_node_related - 1] = ObjectIdGetDatum(relatedOid);
- new_record_repl[Anum_pgxc_node_related - 1] = true;
+ new_record_repl[Anum_pgxc_node_related - 1] = true;
}
if (node_type != PGXC_NODE_NONE)
{
new_record[Anum_pgxc_node_type - 1] = CharGetDatum(node_type);
- new_record_repl[Anum_pgxc_node_type - 1] = true;
+ new_record_repl[Anum_pgxc_node_type - 1] = true;
}
if (is_primary)
{
new_record[Anum_pgxc_node_is_primary - 1] = BoolGetDatum(nodeis_primary);
- new_record_repl[Anum_pgxc_node_is_primary - 1] = true;
+ new_record_repl[Anum_pgxc_node_is_primary - 1] = true;
}
if (is_preferred)
{
new_record[Anum_pgxc_node_is_preferred - 1] = BoolGetDatum(nodeis_preferred);
- new_record_repl[Anum_pgxc_node_is_preferred - 1] = true;
+ new_record_repl[Anum_pgxc_node_is_preferred - 1] = true;
}
/* Update relation */
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 8a979db369..3e3b6113b7 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -35,9 +35,9 @@
#include "pgxc/execRemote.h"
#include "pgxc/pgxc.h"
#include "pgxc/locator.h"
+#include "pgxc/nodemgr.h"
#include "pgxc/planner.h"
#include "pgxc/postgresql_fdw.h"
-#include "pgxc/poolmgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index a6a6a0b61f..b8a34b2110 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -30,6 +30,7 @@
#include "pgxc/execRemote.h"
#include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
+#include "pgxc/nodemgr.h"
#include "pgxc/poolmgr.h"
#include "storage/ipc.h"
#include "utils/datum.h"
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index 67142bbd20..b90c0ba98a 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -36,14 +36,13 @@
#include "catalog/pgxc_node.h"
#include "catalog/pg_collation.h"
#include "pgxc/locator.h"
+#include "pgxc/nodemgr.h"
#include "pgxc/pgxc.h"
#include "pgxc/poolmgr.h"
#include "tcop/dest.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/memutils.h"
-#include "utils/snapmgr.h"
-#include "utils/tqual.h"
#include "utils/fmgroids.h"
#include "utils/syscache.h"
#include "utils/lsyscache.h"
@@ -117,14 +116,8 @@ init_pgxc_handle(PGXCNodeHandle *pgxc_handle)
void
InitMultinodeExecutor(void)
{
- Relation rel;
- HeapScanDesc scan;
- HeapTuple tuple;
int count;
- int loc_co = 0;
- int loc_dn = 0;
- int loc_co_slave = 0;
- int loc_dn_slave = 0;
+ Oid *coOids, *dnOids, *coslaveOids, *dnslaveOids;
/* This function could get called multiple times because of sigjmp */
if (dn_handles != NULL &&
@@ -133,46 +126,8 @@ InitMultinodeExecutor(void)
co_slave_handles != NULL)
return;
- /* Reinitialize counts */
- NumCoords = 0;
- NumDataNodes = 0;
- NumCoordSlaves = 0;
- NumDataNodeSlaves = 0;
-
- /*
- * Node information initialization is made in two phases:
- * 1) Scan pgxc_node catalog to find the number of nodes for
- * each node type and make proper allocations
- * 2) Classify node information by alphabetical order
- * and save node Oid information properly.
- */
- rel = heap_open(PgxcNodeRelationId, AccessShareLock);
- scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
- while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
- {
- Form_pgxc_node nodeForm = (Form_pgxc_node) GETSTRUCT(tuple);
-
- /* Take data for given node type */
- switch (nodeForm->node_type)
- {
- case PGXC_NODE_COORD_MASTER:
- NumCoords++;
- break;
- case PGXC_NODE_DATANODE_MASTER:
- NumDataNodes++;
- break;
- case PGXC_NODE_COORD_SLAVE:
- NumCoordSlaves++;
- break;
- case PGXC_NODE_DATANODE_SLAVE:
- NumDataNodeSlaves++;
- break;
- default:
- continue;
- }
- }
- heap_endscan(scan);
- heap_close(rel, AccessShareLock);
+ /* Get classified list of node Oids */
+ PgxcNodeListAndCount(&coOids, &dnOids, &coslaveOids, &dnslaveOids);
/* Do proper initialization of handles */
if (NumDataNodes > 0)
@@ -198,156 +153,25 @@ InitMultinodeExecutor(void)
/* Initialize new empty slots */
for (count = 0; count < NumDataNodes; count++)
+ {
init_pgxc_handle(&dn_handles[count]);
+ dn_handles[count].nodeoid = dnOids[count];
+ }
for (count = 0; count < NumCoords; count++)
+ {
init_pgxc_handle(&co_handles[count]);
- for (count = 0; count < NumDataNodeSlaves; count++)
- init_pgxc_handle(&dn_slave_handles[count]);
+ co_handles[count].nodeoid = coOids[count];
+ }
for (count = 0; count < NumCoordSlaves; count++)
+ {
init_pgxc_handle(&co_slave_handles[count]);
-
- /* Now begin second phase and fill in slots with classified node information */
- rel = heap_open(PgxcNodeRelationId, AccessShareLock);
- scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
- while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ co_slave_handles[count].nodeoid = coslaveOids[count];
+ }
+ for (count = 0; count < NumDataNodeSlaves; count++)
{
- Form_pgxc_node nodeForm = (Form_pgxc_node) GETSTRUCT(tuple);
- PGXCNodeHandle *curr_nodes;
- int curr_nodenum, i;
- int position = 1;
-
- /* Take data for given node type */
- switch (nodeForm->node_type)
- {
- case PGXC_NODE_COORD_MASTER:
- curr_nodes = co_handles;
- curr_nodenum = loc_co;
- break;
- case PGXC_NODE_DATANODE_MASTER:
- curr_nodes = dn_handles;
- curr_nodenum = loc_dn;
- break;
- case PGXC_NODE_COORD_SLAVE:
- curr_nodes = co_slave_handles;
- curr_nodenum = loc_co_slave;
- break;
- case PGXC_NODE_DATANODE_SLAVE:
- curr_nodes = dn_slave_handles;
- curr_nodenum = loc_dn_slave;
- break;
- default:
- continue;
- }
-
- /*
- * Classify by alphabetical order current array.
- * Find at which position current node should be placed.
- */
- if (curr_nodenum == 1)
- {
- /* Special case when only one node is present */
- int res = strcmp(NameStr(nodeForm->node_name),
- get_pgxc_nodename(curr_nodes[0].nodeoid));
- if (res < 0)
- position = 0;
- else
- position = 1;
- }
- else if (curr_nodenum > 1)
- {
- /* Case with more than 2 nodes in current array */
- for (i = 0; i < curr_nodenum - 1; i++)
- {
- /* New slot is first? */
- if (i == 0 &&
- strcmp(NameStr(nodeForm->node_name),
- get_pgxc_nodename(curr_nodes[i].nodeoid)) < 0)
- position = 0;
-
- /* Intermediate case */
- if (strcmp(NameStr(nodeForm->node_name),
- get_pgxc_nodename(curr_nodes[i].nodeoid)) > 0 &&
- strcmp(NameStr(nodeForm->node_name),
- get_pgxc_nodename(curr_nodes[i + 1].nodeoid)) < 0)
- {
- position = i + 1;
- break;
- }
-
- /* New slot is last? */
- if (i == curr_nodenum - 2 &&
- strcmp(NameStr(nodeForm->node_name),
- get_pgxc_nodename(curr_nodes[i + 1].nodeoid)) > 0)
- position = i + 2;
- }
- }
- /* Increment node count */
- curr_nodenum++;
-
- /* Rebuild current array */
- if (curr_nodenum == 1)
- {
- /* All slots are empty, fill in first one */
- curr_nodes[0].nodeoid = get_pgxc_nodeoid(NameStr(nodeForm->node_name));
- }
- else
- {
- /*
- * Move slots at the end of array to the right to let place
- * for the new slot entry.
- * Nothing should be done if position is the last one.
- */
- if (position != curr_nodenum - 1)
- {
- for (i = curr_nodenum - 2; i > position - 1; i--)
- {
- /* Move intermediate slot data */
- curr_nodes[i + 1].nodeoid = curr_nodes[i].nodeoid;
- }
- }
- /* Fill in new slot */
- curr_nodes[position].nodeoid =
- get_pgxc_nodeoid(NameStr(nodeForm->node_name));
- }
-
- /*
- * Save data related to preferred and primary node
- * Preferred and primaries use node Oids
- */
- if (nodeForm->nodeis_primary)
- primary_data_node = get_pgxc_nodeoid(NameStr(nodeForm->node_name));
- if (nodeForm->nodeis_preferred)
- {
- preferred_data_node[num_preferred_data_nodes] =
- get_pgxc_nodeoid(NameStr(nodeForm->node_name));
- num_preferred_data_nodes++;
- }
-
- /* Save new data */
- switch (nodeForm->node_type)
- {
- case PGXC_NODE_COORD_MASTER:
- co_handles = curr_nodes;
- loc_co = curr_nodenum;
- break;
- case PGXC_NODE_DATANODE_MASTER:
- dn_handles = curr_nodes;
- loc_dn = curr_nodenum;
- break;
- case PGXC_NODE_COORD_SLAVE:
- co_slave_handles = curr_nodes;
- loc_co_slave = curr_nodenum;
- break;
- case PGXC_NODE_DATANODE_SLAVE:
- dn_slave_handles = curr_nodes;
- loc_dn_slave = curr_nodenum;
- break;
- default:
- continue;
- }
+ init_pgxc_handle(&dn_slave_handles[count]);
+ dn_slave_handles[count].nodeoid = dnslaveOids[count];
}
- heap_endscan(scan);
- heap_close(rel, AccessShareLock);
datanode_count = 0;
coord_count = 0;
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index bc28389d48..bd29ece2fc 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -44,10 +44,12 @@
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
+#include "utils/resowner.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "pgxc/locator.h"
#include "pgxc/pgxc.h"
+#include "pgxc/nodemgr.h"
#include "pgxc/poolutils.h"
#include "../interfaces/libpq/libpq-fe.h"
#include "../interfaces/libpq/libpq-int.h"
@@ -58,22 +60,31 @@
#include <sys/socket.h>
/* Configuration options */
-int NumDataNodes = 2;
-int NumCoords = 1;
-int NumCoordSlaves = 0;
-int NumDataNodeSlaves = 0;
int MinPoolSize = 1;
int MaxPoolSize = 100;
int PoolerPort = 6667;
bool PersistentConnections = false;
+/* Flag to tell if we are Postgres-XC pooler process */
+static bool am_pgxc_pooler = false;
+
+/* Connection information cached */
+typedef struct
+{
+ Oid nodeoid;
+ char *host;
+ int port;
+} PGXCNodeConnectionInfo;
+
/* The memory context */
static MemoryContext PoolerMemoryContext = NULL;
/* PGXC Nodes info list */
static PGXCNodeConnectionInfo *datanode_connInfos;
static PGXCNodeConnectionInfo *coord_connInfos;
+static PGXCNodeConnectionInfo *datanode_slave_connInfos;
+static PGXCNodeConnectionInfo *coord_slave_connInfos;
/* Pool to all the databases (linked list) */
static DatabasePool *databasePools = NULL;
@@ -87,7 +98,7 @@ static PoolHandle *poolHandle = NULL;
static int is_pool_cleaning = false;
static int server_fd = -1;
-static void node_info_init(StringInfo s);
+static void node_info_load(void);
static void agent_init(PoolAgent *agent, const char *database, const char *user_name);
static void agent_destroy(PoolAgent *agent);
static void agent_create(void);
@@ -137,6 +148,17 @@ static void pooler_quickdie(SIGNAL_ARGS);
*/
static volatile sig_atomic_t shutdown_requested = false;
+void
+PGXCPoolerProcessIam(void)
+{
+ am_pgxc_pooler = true;
+}
+
+bool
+IsPGXCPoolerProcess(void)
+{
+ return am_pgxc_pooler;
+}
/*
* Initialize internal structures
@@ -144,8 +166,6 @@ static volatile sig_atomic_t shutdown_requested = false;
int
PoolManagerInit()
{
- MemoryContext old_context;
-
elog(DEBUG1, "Pooler process is started: %d", getpid());
/*
@@ -189,7 +209,7 @@ PoolManagerInit()
PG_SETMASK(&UnBlockSig);
/* Allocate pooler structures in the Pooler context */
- old_context = MemoryContextSwitchTo(PoolerMemoryContext);
+ MemoryContextSwitchTo(PoolerMemoryContext);
poolAgents = (PoolAgent **) palloc(MaxConnections * sizeof(PoolAgent *));
if (poolAgents == NULL)
@@ -199,10 +219,84 @@ PoolManagerInit()
errmsg("out of memory")));
}
+ /* Initialize process ressources */
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "ForPoolerInfo");
+
+ /* Initialize pooler in Postgres-way */
+ InitPostgres(NULL, InvalidOid, NULL, NULL);
+
+ /* Initialize pooler connection info */
+ node_info_load();
+
PoolerLoop();
return 0;
}
+/*
+ * Load node info cached by scanning PGXC node catalog
+ */
+static void
+node_info_load(void)
+{
+ int count;
+ Oid *coOids = NULL;
+ Oid *dnOids = NULL;
+ Oid *coslaveOids = NULL;
+ Oid *dnslaveOids = NULL;
+
+ /* Update number of PGXC nodes saved in cache */
+ PgxcNodeListAndCount(&coOids, &dnOids, &coslaveOids, &dnslaveOids);
+
+ /* Then initialize the node informations */
+ if (NumDataNodes != 0)
+ datanode_connInfos = (PGXCNodeConnectionInfo *)
+ palloc(NumDataNodes * sizeof(PGXCNodeConnectionInfo));
+ if (NumCoords != 0)
+ coord_connInfos = (PGXCNodeConnectionInfo *)
+ palloc(NumCoords * sizeof(PGXCNodeConnectionInfo));
+ if (NumCoordSlaves != 0)
+ coord_slave_connInfos = (PGXCNodeConnectionInfo *)
+ palloc(NumCoordSlaves * sizeof(PGXCNodeConnectionInfo));
+ if (NumDataNodeSlaves != 0)
+ coord_slave_connInfos = (PGXCNodeConnectionInfo *)
+ palloc(NumDataNodeSlaves * sizeof(PGXCNodeConnectionInfo));
+
+ /* Fill in connection info structures */
+ for (count = 0; count < NumCoords; count++)
+ {
+ coord_connInfos[count].nodeoid = coOids[count];
+ coord_connInfos[count].port = get_pgxc_nodeport(coOids[count]);
+ coord_connInfos[count].host = get_pgxc_nodehost(coOids[count]);
+ }
+ for (count = 0; count < NumDataNodes; count++)
+ {
+ datanode_connInfos[count].nodeoid = dnOids[count];
+ datanode_connInfos[count].port = get_pgxc_nodeport(dnOids[count]);
+ datanode_connInfos[count].host = get_pgxc_nodehost(dnOids[count]);
+ }
+ for (count = 0; count < NumCoordSlaves; count++)
+ {
+ coord_slave_connInfos[count].nodeoid = coslaveOids[count];
+ coord_slave_connInfos[count].port = get_pgxc_nodeport(coslaveOids[count]);
+ coord_slave_connInfos[count].host = get_pgxc_nodehost(coslaveOids[count]);
+ }
+ for (count = 0; count < NumDataNodeSlaves; count++)
+ {
+ datanode_slave_connInfos[count].nodeoid = dnOids[count];
+ datanode_slave_connInfos[count].port = get_pgxc_nodeport(dnslaveOids[count]);
+ datanode_slave_connInfos[count].host = get_pgxc_nodehost(dnslaveOids[count]);
+ }
+
+ /* Clean up resources */
+ if (coOids)
+ pfree(coOids);
+ if (dnOids)
+ pfree(dnOids);
+ if (coslaveOids)
+ pfree(coslaveOids);
+ if (dnslaveOids)
+ pfree(dnslaveOids);
+}
/*
* Destroy internal structures
@@ -340,9 +434,8 @@ agent_create(void)
void
PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_name)
{
- int n32, i, j;
+ int n32;
char msgtype = 'c';
- int msg_len;
Assert(handle);
Assert(database);
@@ -355,43 +448,7 @@ PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_na
pool_putbytes(&handle->port, &msgtype, 1);
/* Message length */
- msg_len = 4 + /* length itself */
- 4 + /* PID number */
- 4 + /* length of database name */
- strlen(database) + 1 +
- 4 + /* length of user name */
- strlen(user_name) + 1 +
- 4 + /* number of data nodes */
- 4 + /* number of coordinators */
- (NumDataNodes * 4) + /* port for each data node */
- (NumCoords * 4) + /* port for each coordinator */
- (NumDataNodes * 4) + /* host name length for each data node */
- (NumCoords * 4); /* host name length for each coordinator */
-
- /* Length of host names needs to be added to message length */
- for (j = 0; j < 2; j++)
- {
- int nodenum;
- char nodetype;
- if (j == 0)
- {
- nodenum = NumCoords;
- nodetype = PGXC_NODE_COORD_MASTER;
- }
- else
- {
- nodenum = NumDataNodes;
- nodetype = PGXC_NODE_DATANODE_MASTER;
- }
-
- for (i = 0; i < nodenum; i++)
- {
- Oid nodeoid = PGXCNodeGetNodeOid(i + 1, nodetype);
- msg_len += strlen(get_pgxc_nodehost(nodeoid)) + 1;
- }
- }
-
- n32 = htonl(msg_len);
+ n32 = htonl(strlen(database) + strlen(user_name) + 18);
pool_putbytes(&handle->port, (char *) &n32, 4);
/* PID number */
@@ -413,51 +470,6 @@ PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_na
/* Send user name followed by \0 terminator */
pool_putbytes(&handle->port, user_name, strlen(user_name) + 1);
pool_flush(&handle->port);
-
- /* Send number of data nodes */
- n32 = htonl(NumDataNodes);
- pool_putbytes(&handle->port, (char *) &n32, 4);
-
- /* Send number of coordinators */
- n32 = htonl(NumCoords);
- pool_putbytes(&handle->port, (char *) &n32, 4);
-
- for (j = 0; j < 2; j++)
- {
- int nodenum;
- char nodetype;
- if (j == 0)
- {
- nodenum = NumCoords;
- nodetype = PGXC_NODE_COORD_MASTER;
- }
- else
- {
- nodenum = NumDataNodes;
- nodetype = PGXC_NODE_DATANODE_MASTER;
- }
-
- /* Send ports and hosts */
- for (i = 0; i < nodenum; i++)
- {
- Oid nodeoid = PGXCNodeGetNodeOid(i + 1, nodetype);
- int port_num = get_pgxc_nodeport(nodeoid);
- char *nodehost = get_pgxc_nodehost(nodeoid);
-
- /* send port */
- port_num = htonl(port_num);
- pool_putbytes(&handle->port, (char *) &port_num, 4);
-
- /* Length of host info */
- n32 = htonl(strlen(nodehost) + 1);
- pool_putbytes(&handle->port, (char *) &n32, 4);
-
- /* Send host info followed by \0 terminator */
- pool_putbytes(&handle->port, nodehost, strlen(nodehost) + 1);
- pool_flush(&handle->port);
- }
- }
- pool_flush(&handle->port);
}
int
@@ -507,69 +519,6 @@ PoolManagerSetCommand(PoolCommandType command_type, const char *set_command)
return res;
}
-/*
- * Use incoming message to set up node information cached in pooler
- */
-static void
-node_info_init(StringInfo s)
-{
- int i, j, len;
-
- if (coord_connInfos == NULL)
- {
- NumDataNodes = pq_getmsgint(s, 4);
- NumCoords = pq_getmsgint(s, 4);
-
- datanode_connInfos = (PGXCNodeConnectionInfo *)
- palloc(NumDataNodes * sizeof(PGXCNodeConnectionInfo));
- coord_connInfos = (PGXCNodeConnectionInfo *)
- palloc(NumCoords * sizeof(PGXCNodeConnectionInfo));
- if (coord_connInfos == NULL || datanode_connInfos == NULL)
- {
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
-
- /* Get Host and port data for Coordinators and Datanodes */
- for (j = 0; j < 2; j++)
- {
- PGXCNodeConnectionInfo *connectionInfos;
- int num_nodes;
-
- if (j == 0)
- {
- connectionInfos = coord_connInfos;
- num_nodes = NumCoords;
- }
- else
- {
- connectionInfos = datanode_connInfos;
- num_nodes = NumDataNodes;
- }
-
- for (i = 0; i < num_nodes; i++)
- {
- connectionInfos[i].port = pq_getmsgint(s, 4);
-
- len = pq_getmsgint(s, 4);
- connectionInfos[i].host = pstrdup(pq_getmsgbytes(s, len));
- if (connectionInfos[i].host == NULL)
- {
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
- }
- }
- /* End of Getting for Datanode and Coordinator Data */
- }
- else
- {
- /* waste data*/
- s->cursor = s->len;
- }
-}
/*
* Init PoolAgent
@@ -919,7 +868,6 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
* Coordinator pool is not initialized.
* With that it would be impossible to create a Database by default.
*/
- node_info_init(s);
agent_init(agent, database, user_name);
pq_getmsgend(s);
break;
diff --git a/src/backend/pgxc/pool/poolutils.c b/src/backend/pgxc/pool/poolutils.c
index 0ee856058f..9ac7131803 100644
--- a/src/backend/pgxc/pool/poolutils.c
+++ b/src/backend/pgxc/pool/poolutils.c
@@ -21,6 +21,7 @@
#include "nodes/nodes.h"
#include "pgxc/poolmgr.h"
#include "pgxc/locator.h"
+#include "pgxc/nodemgr.h"
#include "pgxc/poolutils.h"
#include "pgxc/pgxcnode.h"
#include "access/gtm.h"
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
index 18cb20e293..e5e68e5f50 100644
--- a/src/backend/rewrite/rewriteHandler.c
+++ b/src/backend/rewrite/rewriteHandler.c
@@ -31,7 +31,7 @@
#ifdef PGXC
#include "pgxc/pgxc.h"
#include "nodes/nodes.h"
-#include "pgxc/poolmgr.h"
+#include "pgxc/nodemgr.h"
#include "optimizer/planner.h"
#endif
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8a8c6175da..5be4d2597d 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2227,6 +2227,14 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
continue;
if (proc == MyProc)
continue;
+#ifdef PGXC
+ /*
+ * PGXC pooler just refers to XC-specific catalogs,
+ * it does not create any consistency issues.
+ */
+ if (proc->isPooler)
+ continue;
+#endif
found = true;
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 09e39dc28c..992d1b211a 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -41,6 +41,7 @@
#include "postmaster/autovacuum.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
+#include "pgxc/poolmgr.h"
#endif
#include "replication/syncrep.h"
#include "storage/ipc.h"
@@ -324,6 +325,9 @@ InitProcess(void)
/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
if (IsAutoVacuumWorkerProcess())
MyProc->vacuumFlags |= PROC_IS_AUTOVACUUM;
+#ifdef PGXC
+ MyProc->isPooler = false;
+#endif
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitLink = NULL;
@@ -465,6 +469,11 @@ InitAuxiliaryProcess(void)
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
+#ifdef PGXC
+ MyProc->isPooler = false;
+ if (IsPGXCPoolerProcess())
+ MyProc->isPooler = true;
+#endif
MyProc->inCommit = false;
MyProc->vacuumFlags = 0;
MyProc->lwWaiting = false;
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 0b6fd19abe..d717fb72f1 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -43,6 +43,9 @@
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
+#ifdef PGXC
+#include "pgxc/poolmgr.h"
+#endif
#define DIRECTORY_LOCK_FILE "postmaster.pid"
@@ -499,7 +502,14 @@ InitializeSessionUserIdStandalone(void)
* This function should only be called in single-user mode and in
* autovacuum workers.
*/
+#ifdef PGXC
+ /* A pooler process can also go through freely */
+ AssertState(!IsUnderPostmaster ||
+ IsAutoVacuumWorkerProcess() ||
+ IsPGXCPoolerProcess());
+#else
AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess());
+#endif
/* call only once */
AssertState(!OidIsValid(AuthenticatedUserId));
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 8347f52ca8..fe61ee6645 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -464,7 +464,15 @@ void
InitPostgres(const char *in_dbname, Oid dboid, const char *username,
char *out_dbname)
{
+#ifdef PGXC
+ /*
+ * Postgres-XC pooler behaves more or less like a bootstrap process
+ * it doesn't do anything to the database and only reads XC-specific catalog data.
+ */
+ bool bootstrap = IsBootstrapProcessingMode() || IsPGXCPoolerProcess();
+#else
bool bootstrap = IsBootstrapProcessingMode();
+#endif
bool am_superuser;
char *fullpath;
char dbname[NAMEDATALEN];
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 9d194171a5..e951cf7a92 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -317,6 +317,7 @@ typedef enum ProcessingMode
extern ProcessingMode Mode;
+
#define IsBootstrapProcessingMode() ((bool)(Mode == BootstrapProcessing))
#define IsInitProcessingMode() ((bool)(Mode == InitProcessing))
#define IsNormalProcessingMode() ((bool)(Mode == NormalProcessing))
diff --git a/src/include/pgxc/nodemgr.h b/src/include/pgxc/nodemgr.h
index ea910aee85..ff00b56d1d 100644
--- a/src/include/pgxc/nodemgr.h
+++ b/src/include/pgxc/nodemgr.h
@@ -19,6 +19,14 @@
#include "nodes/parsenodes.h"
+/* Global number of nodes */
+extern int NumDataNodes;
+extern int NumCoords;
+extern int NumCoordSlaves;
+extern int NumDataNodeSlaves;
+
+extern void PgxcNodeListAndCount(Oid **coOids, Oid **dnOids,
+ Oid **coslaveOids, Oid **dnslaveOids);
extern void PgxcNodeAlter(AlterNodeStmt *stmt);
extern void PgxcNodeCreate(CreateNodeStmt *stmt);
extern void PgxcNodeRemove(DropNodeStmt *stmt);
diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h
index ff35dfaa01..cda434d97e 100644
--- a/src/include/pgxc/poolmgr.h
+++ b/src/include/pgxc/poolmgr.h
@@ -46,13 +46,6 @@ typedef enum
POOL_CMD_GLOBAL_SET /* Global SET flag */
} PoolCommandType;
-/* TODO move? */
-typedef struct
-{
- char *host;
- int port;
-} PGXCNodeConnectionInfo;
-
/* Connection pool entry */
typedef struct
{
@@ -106,16 +99,16 @@ typedef struct
PoolPort port;
} PoolHandle;
-extern int NumDataNodes;
-extern int NumCoords;
-extern int NumCoordSlaves;
-extern int NumDataNodeSlaves;
extern int MinPoolSize;
extern int MaxPoolSize;
extern int PoolerPort;
extern bool PersistentConnections;
+/* Status inquiry functions */
+extern void PGXCPoolerProcessIam(void);
+extern bool IsPGXCPoolerProcess(void);
+
/* Initialize internal structures */
extern int PoolManagerInit(void);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index e26647caac..47023b9963 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -105,6 +105,11 @@ struct PGPROC
*/
bool recoveryConflictPending;
+#ifdef PGXC
+ /* Postgres-XC flags */
+ bool isPooler; /* true if process is Postgres-XC pooler */
+#endif
+
/* Info about LWLock the process is currently waiting for, if any. */
bool lwWaiting; /* true if waiting for an LW lock */
bool lwExclusive; /* true if waiting for exclusive access */