diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/bootstrap/bootstrap.c | 6 | ||||
| -rw-r--r-- | src/backend/commands/prepare.c | 2 | ||||
| -rw-r--r-- | src/backend/nodes/list.c | 4 | ||||
| -rw-r--r-- | src/backend/pgxc/locator/locator.c | 2 | ||||
| -rw-r--r-- | src/backend/pgxc/nodemgr/nodemgr.c | 136 | ||||
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 2 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 1 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 208 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolmgr.c | 260 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolutils.c | 1 | ||||
| -rw-r--r-- | src/backend/rewrite/rewriteHandler.c | 2 | ||||
| -rw-r--r-- | src/backend/storage/ipc/procarray.c | 8 | ||||
| -rw-r--r-- | src/backend/storage/lmgr/proc.c | 9 | ||||
| -rw-r--r-- | src/backend/utils/init/miscinit.c | 10 | ||||
| -rw-r--r-- | src/backend/utils/init/postinit.c | 8 | ||||
| -rw-r--r-- | src/include/miscadmin.h | 1 | ||||
| -rw-r--r-- | src/include/pgxc/nodemgr.h | 8 | ||||
| -rw-r--r-- | src/include/pgxc/poolmgr.h | 15 | ||||
| -rw-r--r-- | src/include/storage/proc.h | 5 |
19 files changed, 317 insertions, 371 deletions
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 40d03acbce..838ec0eebd 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -377,6 +377,12 @@ AuxiliaryProcessMain(int argc, char *argv[]) */ if (IsUnderPostmaster) { +#ifdef PGXC + /* Initialize pooler flag before creating PGPROC structure */ + if (auxType == PoolerProcess) + PGXCPoolerProcessIam(); +#endif + /* * Create a PGPROC so we can use LWLocks. In the EXEC_BACKEND case, * this was already done by SubPostmasterMain(). diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index b6ae576df8..00cfece959 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -36,7 +36,7 @@ #ifdef PGXC #include "pgxc/pgxc.h" #include "nodes/nodes.h" -#include "pgxc/poolmgr.h" +#include "pgxc/nodemgr.h" #include "pgxc/execRemote.h" #include "catalog/pgxc_node.h" #endif diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c index 46f4658aed..6c4d05e36e 100644 --- a/src/backend/nodes/list.c +++ b/src/backend/nodes/list.c @@ -838,8 +838,8 @@ list_intersection_int(List *list1, List *list2) result = NIL; foreach(cell, list1) { - if (list_member_int(list2, lfirst(cell))) - result = lappend_int(result, lfirst(cell)); + if (list_member_int(list2, lfirst_int(cell))) + result = lappend_int(result, lfirst_int(cell)); } check_list_invariants(result); diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c index e7b0e65c2f..5e7ec911fd 100644 --- a/src/backend/pgxc/locator/locator.c +++ b/src/backend/pgxc/locator/locator.c @@ -37,7 +37,7 @@ #include "utils/tqual.h" #include "utils/syscache.h" #include "nodes/nodes.h" -#include "pgxc/poolmgr.h" +#include "pgxc/nodemgr.h" #include "pgxc/locator.h" #include "pgxc/pgxc.h" #include "pgxc/pgxcnode.h" diff --git a/src/backend/pgxc/nodemgr/nodemgr.c b/src/backend/pgxc/nodemgr/nodemgr.c index 3f26c8d421..12e40a63fb 100644 --- a/src/backend/pgxc/nodemgr/nodemgr.c +++ b/src/backend/pgxc/nodemgr/nodemgr.c @@ -22,8 +22,15 @@ #include "utils/rel.h" #include "utils/syscache.h" #include "utils/lsyscache.h" +#include "utils/tqual.h" +#include "pgxc/locator.h" #include "pgxc/nodemgr.h" +/* Global number of nodes */ +int NumDataNodes = 2; +int NumCoords = 1; +int NumCoordSlaves = 0; +int NumDataNodeSlaves = 0; /* * Check list of options and return things filled @@ -96,6 +103,123 @@ check_options(List *options, DefElem **dhost, } } +/* -------------------------------- + * cmp_nodes + * + * Compare the Oids of two XC nodes + * to sort them in ascending order by their names + * -------------------------------- + */ +static int +cmp_nodes(const void *p1, const void *p2) +{ + Oid n1 = *((Oid *)p1); + Oid n2 = *((Oid *)p2); + + if (strcmp(get_pgxc_nodename(n1), get_pgxc_nodename(n2)) < 0) + return -1; + + if (strcmp(get_pgxc_nodename(n1), get_pgxc_nodename(n2)) == 0) + return 0; + + return 1; +} + +/* + * PgxcNodeCount + * + * Count number of PGXC nodes based on catalog information and return + * an ordered list of node Oids for each PGXC node type. + */ +void +PgxcNodeListAndCount(Oid **coOids, Oid **dnOids, Oid **coslaveOids, Oid **dnslaveOids) +{ + Relation rel; + HeapScanDesc scan; + HeapTuple tuple; + + /* Reinitialize counts */ + NumCoords = 0; + NumDataNodes = 0; + NumCoordSlaves = 0; + NumDataNodeSlaves = 0; + + /* + * Node information initialization is made in one scan: + * 1) Scan pgxc_node catalog to find the number of nodes for + * each node type and make proper allocations + * 2) Then extract the node Oid + * 3) Complete primary/preferred node information + */ + rel = heap_open(PgxcNodeRelationId, AccessShareLock); + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + int numnodes; + Oid **nodes; + Form_pgxc_node nodeForm = (Form_pgxc_node) GETSTRUCT(tuple); + + /* Take data for given node type */ + switch (nodeForm->node_type) + { + case PGXC_NODE_COORD_MASTER: + NumCoords++; + numnodes = NumCoords; + nodes = coOids; + break; + case PGXC_NODE_DATANODE_MASTER: + NumDataNodes++; + numnodes = NumDataNodes; + nodes = dnOids; + break; + case PGXC_NODE_COORD_SLAVE: + NumCoordSlaves++; + numnodes = NumCoordSlaves; + nodes = coslaveOids; + break; + case PGXC_NODE_DATANODE_SLAVE: + NumDataNodeSlaves++; + numnodes = NumDataNodeSlaves; + nodes = dnslaveOids; + break; + default: + break; + } + + if (numnodes == 1) + *nodes = (Oid *) palloc(numnodes * sizeof(Oid)); + else + *nodes = (Oid *) repalloc(*nodes, numnodes * sizeof(Oid)); + + (*nodes)[numnodes - 1] = get_pgxc_nodeoid(NameStr(nodeForm->node_name)); + + /* + * Save data related to preferred and primary node + * Preferred and primaries use node Oids + */ + if (nodeForm->nodeis_primary) + primary_data_node = get_pgxc_nodeoid(NameStr(nodeForm->node_name)); + if (nodeForm->nodeis_preferred) + { + preferred_data_node[num_preferred_data_nodes] = + get_pgxc_nodeoid(NameStr(nodeForm->node_name)); + num_preferred_data_nodes++; + } + } + heap_endscan(scan); + heap_close(rel, AccessShareLock); + + /* Finally sort the lists to be sent back */ + if (NumCoords != 0) + qsort(*coOids, NumCoords, sizeof(Oid), cmp_nodes); + if (NumDataNodes != 0) + qsort(*dnOids, NumDataNodes, sizeof(Oid), cmp_nodes); + if (NumDataNodeSlaves != 0) + qsort(*coslaveOids, NumCoordSlaves, sizeof(Oid), cmp_nodes); + if (NumDataNodeSlaves != 0) + qsort(*dnslaveOids, NumDataNodeSlaves, sizeof(Oid), cmp_nodes); +} + /* * PgxcNodeCreate * @@ -478,13 +602,13 @@ PgxcNodeAlter(AlterNodeStmt *stmt) if (node_port > 0) { new_record[Anum_pgxc_node_port - 1] = Int32GetDatum(node_port); - new_record_repl[Anum_pgxc_node_port - 1] = true; + new_record_repl[Anum_pgxc_node_port - 1] = true; } if (node_host) { new_record[Anum_pgxc_node_host - 1] = DirectFunctionCall1(namein, CStringGetDatum(node_host)); - new_record_repl[Anum_pgxc_node_host - 1] = true; + new_record_repl[Anum_pgxc_node_host - 1] = true; } if (drelated || node_type == PGXC_NODE_COORD_MASTER || @@ -492,22 +616,22 @@ PgxcNodeAlter(AlterNodeStmt *stmt) { /* Force update of related node to InvalidOid if node is changed to master */ new_record[Anum_pgxc_node_related - 1] = ObjectIdGetDatum(relatedOid); - new_record_repl[Anum_pgxc_node_related - 1] = true; + new_record_repl[Anum_pgxc_node_related - 1] = true; } if (node_type != PGXC_NODE_NONE) { new_record[Anum_pgxc_node_type - 1] = CharGetDatum(node_type); - new_record_repl[Anum_pgxc_node_type - 1] = true; + new_record_repl[Anum_pgxc_node_type - 1] = true; } if (is_primary) { new_record[Anum_pgxc_node_is_primary - 1] = BoolGetDatum(nodeis_primary); - new_record_repl[Anum_pgxc_node_is_primary - 1] = true; + new_record_repl[Anum_pgxc_node_is_primary - 1] = true; } if (is_preferred) { new_record[Anum_pgxc_node_is_preferred - 1] = BoolGetDatum(nodeis_preferred); - new_record_repl[Anum_pgxc_node_is_preferred - 1] = true; + new_record_repl[Anum_pgxc_node_is_preferred - 1] = true; } /* Update relation */ diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 8a979db369..3e3b6113b7 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -35,9 +35,9 @@ #include "pgxc/execRemote.h" #include "pgxc/pgxc.h" #include "pgxc/locator.h" +#include "pgxc/nodemgr.h" #include "pgxc/planner.h" #include "pgxc/postgresql_fdw.h" -#include "pgxc/poolmgr.h" #include "tcop/pquery.h" #include "utils/acl.h" #include "utils/builtins.h" diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index a6a6a0b61f..b8a34b2110 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -30,6 +30,7 @@ #include "pgxc/execRemote.h" #include "nodes/nodes.h" #include "nodes/nodeFuncs.h" +#include "pgxc/nodemgr.h" #include "pgxc/poolmgr.h" #include "storage/ipc.h" #include "utils/datum.h" diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index 67142bbd20..b90c0ba98a 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -36,14 +36,13 @@ #include "catalog/pgxc_node.h" #include "catalog/pg_collation.h" #include "pgxc/locator.h" +#include "pgxc/nodemgr.h" #include "pgxc/pgxc.h" #include "pgxc/poolmgr.h" #include "tcop/dest.h" #include "utils/builtins.h" #include "utils/elog.h" #include "utils/memutils.h" -#include "utils/snapmgr.h" -#include "utils/tqual.h" #include "utils/fmgroids.h" #include "utils/syscache.h" #include "utils/lsyscache.h" @@ -117,14 +116,8 @@ init_pgxc_handle(PGXCNodeHandle *pgxc_handle) void InitMultinodeExecutor(void) { - Relation rel; - HeapScanDesc scan; - HeapTuple tuple; int count; - int loc_co = 0; - int loc_dn = 0; - int loc_co_slave = 0; - int loc_dn_slave = 0; + Oid *coOids, *dnOids, *coslaveOids, *dnslaveOids; /* This function could get called multiple times because of sigjmp */ if (dn_handles != NULL && @@ -133,46 +126,8 @@ InitMultinodeExecutor(void) co_slave_handles != NULL) return; - /* Reinitialize counts */ - NumCoords = 0; - NumDataNodes = 0; - NumCoordSlaves = 0; - NumDataNodeSlaves = 0; - - /* - * Node information initialization is made in two phases: - * 1) Scan pgxc_node catalog to find the number of nodes for - * each node type and make proper allocations - * 2) Classify node information by alphabetical order - * and save node Oid information properly. - */ - rel = heap_open(PgxcNodeRelationId, AccessShareLock); - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - Form_pgxc_node nodeForm = (Form_pgxc_node) GETSTRUCT(tuple); - - /* Take data for given node type */ - switch (nodeForm->node_type) - { - case PGXC_NODE_COORD_MASTER: - NumCoords++; - break; - case PGXC_NODE_DATANODE_MASTER: - NumDataNodes++; - break; - case PGXC_NODE_COORD_SLAVE: - NumCoordSlaves++; - break; - case PGXC_NODE_DATANODE_SLAVE: - NumDataNodeSlaves++; - break; - default: - continue; - } - } - heap_endscan(scan); - heap_close(rel, AccessShareLock); + /* Get classified list of node Oids */ + PgxcNodeListAndCount(&coOids, &dnOids, &coslaveOids, &dnslaveOids); /* Do proper initialization of handles */ if (NumDataNodes > 0) @@ -198,156 +153,25 @@ InitMultinodeExecutor(void) /* Initialize new empty slots */ for (count = 0; count < NumDataNodes; count++) + { init_pgxc_handle(&dn_handles[count]); + dn_handles[count].nodeoid = dnOids[count]; + } for (count = 0; count < NumCoords; count++) + { init_pgxc_handle(&co_handles[count]); - for (count = 0; count < NumDataNodeSlaves; count++) - init_pgxc_handle(&dn_slave_handles[count]); + co_handles[count].nodeoid = coOids[count]; + } for (count = 0; count < NumCoordSlaves; count++) + { init_pgxc_handle(&co_slave_handles[count]); - - /* Now begin second phase and fill in slots with classified node information */ - rel = heap_open(PgxcNodeRelationId, AccessShareLock); - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + co_slave_handles[count].nodeoid = coslaveOids[count]; + } + for (count = 0; count < NumDataNodeSlaves; count++) { - Form_pgxc_node nodeForm = (Form_pgxc_node) GETSTRUCT(tuple); - PGXCNodeHandle *curr_nodes; - int curr_nodenum, i; - int position = 1; - - /* Take data for given node type */ - switch (nodeForm->node_type) - { - case PGXC_NODE_COORD_MASTER: - curr_nodes = co_handles; - curr_nodenum = loc_co; - break; - case PGXC_NODE_DATANODE_MASTER: - curr_nodes = dn_handles; - curr_nodenum = loc_dn; - break; - case PGXC_NODE_COORD_SLAVE: - curr_nodes = co_slave_handles; - curr_nodenum = loc_co_slave; - break; - case PGXC_NODE_DATANODE_SLAVE: - curr_nodes = dn_slave_handles; - curr_nodenum = loc_dn_slave; - break; - default: - continue; - } - - /* - * Classify by alphabetical order current array. - * Find at which position current node should be placed. - */ - if (curr_nodenum == 1) - { - /* Special case when only one node is present */ - int res = strcmp(NameStr(nodeForm->node_name), - get_pgxc_nodename(curr_nodes[0].nodeoid)); - if (res < 0) - position = 0; - else - position = 1; - } - else if (curr_nodenum > 1) - { - /* Case with more than 2 nodes in current array */ - for (i = 0; i < curr_nodenum - 1; i++) - { - /* New slot is first? */ - if (i == 0 && - strcmp(NameStr(nodeForm->node_name), - get_pgxc_nodename(curr_nodes[i].nodeoid)) < 0) - position = 0; - - /* Intermediate case */ - if (strcmp(NameStr(nodeForm->node_name), - get_pgxc_nodename(curr_nodes[i].nodeoid)) > 0 && - strcmp(NameStr(nodeForm->node_name), - get_pgxc_nodename(curr_nodes[i + 1].nodeoid)) < 0) - { - position = i + 1; - break; - } - - /* New slot is last? */ - if (i == curr_nodenum - 2 && - strcmp(NameStr(nodeForm->node_name), - get_pgxc_nodename(curr_nodes[i + 1].nodeoid)) > 0) - position = i + 2; - } - } - /* Increment node count */ - curr_nodenum++; - - /* Rebuild current array */ - if (curr_nodenum == 1) - { - /* All slots are empty, fill in first one */ - curr_nodes[0].nodeoid = get_pgxc_nodeoid(NameStr(nodeForm->node_name)); - } - else - { - /* - * Move slots at the end of array to the right to let place - * for the new slot entry. - * Nothing should be done if position is the last one. - */ - if (position != curr_nodenum - 1) - { - for (i = curr_nodenum - 2; i > position - 1; i--) - { - /* Move intermediate slot data */ - curr_nodes[i + 1].nodeoid = curr_nodes[i].nodeoid; - } - } - /* Fill in new slot */ - curr_nodes[position].nodeoid = - get_pgxc_nodeoid(NameStr(nodeForm->node_name)); - } - - /* - * Save data related to preferred and primary node - * Preferred and primaries use node Oids - */ - if (nodeForm->nodeis_primary) - primary_data_node = get_pgxc_nodeoid(NameStr(nodeForm->node_name)); - if (nodeForm->nodeis_preferred) - { - preferred_data_node[num_preferred_data_nodes] = - get_pgxc_nodeoid(NameStr(nodeForm->node_name)); - num_preferred_data_nodes++; - } - - /* Save new data */ - switch (nodeForm->node_type) - { - case PGXC_NODE_COORD_MASTER: - co_handles = curr_nodes; - loc_co = curr_nodenum; - break; - case PGXC_NODE_DATANODE_MASTER: - dn_handles = curr_nodes; - loc_dn = curr_nodenum; - break; - case PGXC_NODE_COORD_SLAVE: - co_slave_handles = curr_nodes; - loc_co_slave = curr_nodenum; - break; - case PGXC_NODE_DATANODE_SLAVE: - dn_slave_handles = curr_nodes; - loc_dn_slave = curr_nodenum; - break; - default: - continue; - } + init_pgxc_handle(&dn_slave_handles[count]); + dn_slave_handles[count].nodeoid = dnslaveOids[count]; } - heap_endscan(scan); - heap_close(rel, AccessShareLock); datanode_count = 0; coord_count = 0; diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c index bc28389d48..bd29ece2fc 100644 --- a/src/backend/pgxc/pool/poolmgr.c +++ b/src/backend/pgxc/pool/poolmgr.c @@ -44,10 +44,12 @@ #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/lsyscache.h" +#include "utils/resowner.h" #include "lib/stringinfo.h" #include "libpq/pqformat.h" #include "pgxc/locator.h" #include "pgxc/pgxc.h" +#include "pgxc/nodemgr.h" #include "pgxc/poolutils.h" #include "../interfaces/libpq/libpq-fe.h" #include "../interfaces/libpq/libpq-int.h" @@ -58,22 +60,31 @@ #include <sys/socket.h> /* Configuration options */ -int NumDataNodes = 2; -int NumCoords = 1; -int NumCoordSlaves = 0; -int NumDataNodeSlaves = 0; int MinPoolSize = 1; int MaxPoolSize = 100; int PoolerPort = 6667; bool PersistentConnections = false; +/* Flag to tell if we are Postgres-XC pooler process */ +static bool am_pgxc_pooler = false; + +/* Connection information cached */ +typedef struct +{ + Oid nodeoid; + char *host; + int port; +} PGXCNodeConnectionInfo; + /* The memory context */ static MemoryContext PoolerMemoryContext = NULL; /* PGXC Nodes info list */ static PGXCNodeConnectionInfo *datanode_connInfos; static PGXCNodeConnectionInfo *coord_connInfos; +static PGXCNodeConnectionInfo *datanode_slave_connInfos; +static PGXCNodeConnectionInfo *coord_slave_connInfos; /* Pool to all the databases (linked list) */ static DatabasePool *databasePools = NULL; @@ -87,7 +98,7 @@ static PoolHandle *poolHandle = NULL; static int is_pool_cleaning = false; static int server_fd = -1; -static void node_info_init(StringInfo s); +static void node_info_load(void); static void agent_init(PoolAgent *agent, const char *database, const char *user_name); static void agent_destroy(PoolAgent *agent); static void agent_create(void); @@ -137,6 +148,17 @@ static void pooler_quickdie(SIGNAL_ARGS); */ static volatile sig_atomic_t shutdown_requested = false; +void +PGXCPoolerProcessIam(void) +{ + am_pgxc_pooler = true; +} + +bool +IsPGXCPoolerProcess(void) +{ + return am_pgxc_pooler; +} /* * Initialize internal structures @@ -144,8 +166,6 @@ static volatile sig_atomic_t shutdown_requested = false; int PoolManagerInit() { - MemoryContext old_context; - elog(DEBUG1, "Pooler process is started: %d", getpid()); /* @@ -189,7 +209,7 @@ PoolManagerInit() PG_SETMASK(&UnBlockSig); /* Allocate pooler structures in the Pooler context */ - old_context = MemoryContextSwitchTo(PoolerMemoryContext); + MemoryContextSwitchTo(PoolerMemoryContext); poolAgents = (PoolAgent **) palloc(MaxConnections * sizeof(PoolAgent *)); if (poolAgents == NULL) @@ -199,10 +219,84 @@ PoolManagerInit() errmsg("out of memory"))); } + /* Initialize process ressources */ + CurrentResourceOwner = ResourceOwnerCreate(NULL, "ForPoolerInfo"); + + /* Initialize pooler in Postgres-way */ + InitPostgres(NULL, InvalidOid, NULL, NULL); + + /* Initialize pooler connection info */ + node_info_load(); + PoolerLoop(); return 0; } +/* + * Load node info cached by scanning PGXC node catalog + */ +static void +node_info_load(void) +{ + int count; + Oid *coOids = NULL; + Oid *dnOids = NULL; + Oid *coslaveOids = NULL; + Oid *dnslaveOids = NULL; + + /* Update number of PGXC nodes saved in cache */ + PgxcNodeListAndCount(&coOids, &dnOids, &coslaveOids, &dnslaveOids); + + /* Then initialize the node informations */ + if (NumDataNodes != 0) + datanode_connInfos = (PGXCNodeConnectionInfo *) + palloc(NumDataNodes * sizeof(PGXCNodeConnectionInfo)); + if (NumCoords != 0) + coord_connInfos = (PGXCNodeConnectionInfo *) + palloc(NumCoords * sizeof(PGXCNodeConnectionInfo)); + if (NumCoordSlaves != 0) + coord_slave_connInfos = (PGXCNodeConnectionInfo *) + palloc(NumCoordSlaves * sizeof(PGXCNodeConnectionInfo)); + if (NumDataNodeSlaves != 0) + coord_slave_connInfos = (PGXCNodeConnectionInfo *) + palloc(NumDataNodeSlaves * sizeof(PGXCNodeConnectionInfo)); + + /* Fill in connection info structures */ + for (count = 0; count < NumCoords; count++) + { + coord_connInfos[count].nodeoid = coOids[count]; + coord_connInfos[count].port = get_pgxc_nodeport(coOids[count]); + coord_connInfos[count].host = get_pgxc_nodehost(coOids[count]); + } + for (count = 0; count < NumDataNodes; count++) + { + datanode_connInfos[count].nodeoid = dnOids[count]; + datanode_connInfos[count].port = get_pgxc_nodeport(dnOids[count]); + datanode_connInfos[count].host = get_pgxc_nodehost(dnOids[count]); + } + for (count = 0; count < NumCoordSlaves; count++) + { + coord_slave_connInfos[count].nodeoid = coslaveOids[count]; + coord_slave_connInfos[count].port = get_pgxc_nodeport(coslaveOids[count]); + coord_slave_connInfos[count].host = get_pgxc_nodehost(coslaveOids[count]); + } + for (count = 0; count < NumDataNodeSlaves; count++) + { + datanode_slave_connInfos[count].nodeoid = dnOids[count]; + datanode_slave_connInfos[count].port = get_pgxc_nodeport(dnslaveOids[count]); + datanode_slave_connInfos[count].host = get_pgxc_nodehost(dnslaveOids[count]); + } + + /* Clean up resources */ + if (coOids) + pfree(coOids); + if (dnOids) + pfree(dnOids); + if (coslaveOids) + pfree(coslaveOids); + if (dnslaveOids) + pfree(dnslaveOids); +} /* * Destroy internal structures @@ -340,9 +434,8 @@ agent_create(void) void PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_name) { - int n32, i, j; + int n32; char msgtype = 'c'; - int msg_len; Assert(handle); Assert(database); @@ -355,43 +448,7 @@ PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_na pool_putbytes(&handle->port, &msgtype, 1); /* Message length */ - msg_len = 4 + /* length itself */ - 4 + /* PID number */ - 4 + /* length of database name */ - strlen(database) + 1 + - 4 + /* length of user name */ - strlen(user_name) + 1 + - 4 + /* number of data nodes */ - 4 + /* number of coordinators */ - (NumDataNodes * 4) + /* port for each data node */ - (NumCoords * 4) + /* port for each coordinator */ - (NumDataNodes * 4) + /* host name length for each data node */ - (NumCoords * 4); /* host name length for each coordinator */ - - /* Length of host names needs to be added to message length */ - for (j = 0; j < 2; j++) - { - int nodenum; - char nodetype; - if (j == 0) - { - nodenum = NumCoords; - nodetype = PGXC_NODE_COORD_MASTER; - } - else - { - nodenum = NumDataNodes; - nodetype = PGXC_NODE_DATANODE_MASTER; - } - - for (i = 0; i < nodenum; i++) - { - Oid nodeoid = PGXCNodeGetNodeOid(i + 1, nodetype); - msg_len += strlen(get_pgxc_nodehost(nodeoid)) + 1; - } - } - - n32 = htonl(msg_len); + n32 = htonl(strlen(database) + strlen(user_name) + 18); pool_putbytes(&handle->port, (char *) &n32, 4); /* PID number */ @@ -413,51 +470,6 @@ PoolManagerConnect(PoolHandle *handle, const char *database, const char *user_na /* Send user name followed by \0 terminator */ pool_putbytes(&handle->port, user_name, strlen(user_name) + 1); pool_flush(&handle->port); - - /* Send number of data nodes */ - n32 = htonl(NumDataNodes); - pool_putbytes(&handle->port, (char *) &n32, 4); - - /* Send number of coordinators */ - n32 = htonl(NumCoords); - pool_putbytes(&handle->port, (char *) &n32, 4); - - for (j = 0; j < 2; j++) - { - int nodenum; - char nodetype; - if (j == 0) - { - nodenum = NumCoords; - nodetype = PGXC_NODE_COORD_MASTER; - } - else - { - nodenum = NumDataNodes; - nodetype = PGXC_NODE_DATANODE_MASTER; - } - - /* Send ports and hosts */ - for (i = 0; i < nodenum; i++) - { - Oid nodeoid = PGXCNodeGetNodeOid(i + 1, nodetype); - int port_num = get_pgxc_nodeport(nodeoid); - char *nodehost = get_pgxc_nodehost(nodeoid); - - /* send port */ - port_num = htonl(port_num); - pool_putbytes(&handle->port, (char *) &port_num, 4); - - /* Length of host info */ - n32 = htonl(strlen(nodehost) + 1); - pool_putbytes(&handle->port, (char *) &n32, 4); - - /* Send host info followed by \0 terminator */ - pool_putbytes(&handle->port, nodehost, strlen(nodehost) + 1); - pool_flush(&handle->port); - } - } - pool_flush(&handle->port); } int @@ -507,69 +519,6 @@ PoolManagerSetCommand(PoolCommandType command_type, const char *set_command) return res; } -/* - * Use incoming message to set up node information cached in pooler - */ -static void -node_info_init(StringInfo s) -{ - int i, j, len; - - if (coord_connInfos == NULL) - { - NumDataNodes = pq_getmsgint(s, 4); - NumCoords = pq_getmsgint(s, 4); - - datanode_connInfos = (PGXCNodeConnectionInfo *) - palloc(NumDataNodes * sizeof(PGXCNodeConnectionInfo)); - coord_connInfos = (PGXCNodeConnectionInfo *) - palloc(NumCoords * sizeof(PGXCNodeConnectionInfo)); - if (coord_connInfos == NULL || datanode_connInfos == NULL) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - /* Get Host and port data for Coordinators and Datanodes */ - for (j = 0; j < 2; j++) - { - PGXCNodeConnectionInfo *connectionInfos; - int num_nodes; - - if (j == 0) - { - connectionInfos = coord_connInfos; - num_nodes = NumCoords; - } - else - { - connectionInfos = datanode_connInfos; - num_nodes = NumDataNodes; - } - - for (i = 0; i < num_nodes; i++) - { - connectionInfos[i].port = pq_getmsgint(s, 4); - - len = pq_getmsgint(s, 4); - connectionInfos[i].host = pstrdup(pq_getmsgbytes(s, len)); - if (connectionInfos[i].host == NULL) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - } - } - /* End of Getting for Datanode and Coordinator Data */ - } - else - { - /* waste data*/ - s->cursor = s->len; - } -} /* * Init PoolAgent @@ -919,7 +868,6 @@ agent_handle_input(PoolAgent * agent, StringInfo s) * Coordinator pool is not initialized. * With that it would be impossible to create a Database by default. */ - node_info_init(s); agent_init(agent, database, user_name); pq_getmsgend(s); break; diff --git a/src/backend/pgxc/pool/poolutils.c b/src/backend/pgxc/pool/poolutils.c index 0ee856058f..9ac7131803 100644 --- a/src/backend/pgxc/pool/poolutils.c +++ b/src/backend/pgxc/pool/poolutils.c @@ -21,6 +21,7 @@ #include "nodes/nodes.h" #include "pgxc/poolmgr.h" #include "pgxc/locator.h" +#include "pgxc/nodemgr.h" #include "pgxc/poolutils.h" #include "pgxc/pgxcnode.h" #include "access/gtm.h" diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 18cb20e293..e5e68e5f50 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -31,7 +31,7 @@ #ifdef PGXC #include "pgxc/pgxc.h" #include "nodes/nodes.h" -#include "pgxc/poolmgr.h" +#include "pgxc/nodemgr.h" #include "optimizer/planner.h" #endif diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 8a8c6175da..5be4d2597d 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2227,6 +2227,14 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared) continue; if (proc == MyProc) continue; +#ifdef PGXC + /* + * PGXC pooler just refers to XC-specific catalogs, + * it does not create any consistency issues. + */ + if (proc->isPooler) + continue; +#endif found = true; diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 09e39dc28c..992d1b211a 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -41,6 +41,7 @@ #include "postmaster/autovacuum.h" #ifdef PGXC #include "pgxc/pgxc.h" +#include "pgxc/poolmgr.h" #endif #include "replication/syncrep.h" #include "storage/ipc.h" @@ -324,6 +325,9 @@ InitProcess(void) /* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */ if (IsAutoVacuumWorkerProcess()) MyProc->vacuumFlags |= PROC_IS_AUTOVACUUM; +#ifdef PGXC + MyProc->isPooler = false; +#endif MyProc->lwWaiting = false; MyProc->lwExclusive = false; MyProc->lwWaitLink = NULL; @@ -465,6 +469,11 @@ InitAuxiliaryProcess(void) MyProc->backendId = InvalidBackendId; MyProc->databaseId = InvalidOid; MyProc->roleId = InvalidOid; +#ifdef PGXC + MyProc->isPooler = false; + if (IsPGXCPoolerProcess()) + MyProc->isPooler = true; +#endif MyProc->inCommit = false; MyProc->vacuumFlags = 0; MyProc->lwWaiting = false; diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 0b6fd19abe..d717fb72f1 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -43,6 +43,9 @@ #include "utils/guc.h" #include "utils/memutils.h" #include "utils/syscache.h" +#ifdef PGXC +#include "pgxc/poolmgr.h" +#endif #define DIRECTORY_LOCK_FILE "postmaster.pid" @@ -499,7 +502,14 @@ InitializeSessionUserIdStandalone(void) * This function should only be called in single-user mode and in * autovacuum workers. */ +#ifdef PGXC + /* A pooler process can also go through freely */ + AssertState(!IsUnderPostmaster || + IsAutoVacuumWorkerProcess() || + IsPGXCPoolerProcess()); +#else AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess()); +#endif /* call only once */ AssertState(!OidIsValid(AuthenticatedUserId)); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 8347f52ca8..fe61ee6645 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -464,7 +464,15 @@ void InitPostgres(const char *in_dbname, Oid dboid, const char *username, char *out_dbname) { +#ifdef PGXC + /* + * Postgres-XC pooler behaves more or less like a bootstrap process + * it doesn't do anything to the database and only reads XC-specific catalog data. + */ + bool bootstrap = IsBootstrapProcessingMode() || IsPGXCPoolerProcess(); +#else bool bootstrap = IsBootstrapProcessingMode(); +#endif bool am_superuser; char *fullpath; char dbname[NAMEDATALEN]; diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 9d194171a5..e951cf7a92 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -317,6 +317,7 @@ typedef enum ProcessingMode extern ProcessingMode Mode; + #define IsBootstrapProcessingMode() ((bool)(Mode == BootstrapProcessing)) #define IsInitProcessingMode() ((bool)(Mode == InitProcessing)) #define IsNormalProcessingMode() ((bool)(Mode == NormalProcessing)) diff --git a/src/include/pgxc/nodemgr.h b/src/include/pgxc/nodemgr.h index ea910aee85..ff00b56d1d 100644 --- a/src/include/pgxc/nodemgr.h +++ b/src/include/pgxc/nodemgr.h @@ -19,6 +19,14 @@ #include "nodes/parsenodes.h" +/* Global number of nodes */ +extern int NumDataNodes; +extern int NumCoords; +extern int NumCoordSlaves; +extern int NumDataNodeSlaves; + +extern void PgxcNodeListAndCount(Oid **coOids, Oid **dnOids, + Oid **coslaveOids, Oid **dnslaveOids); extern void PgxcNodeAlter(AlterNodeStmt *stmt); extern void PgxcNodeCreate(CreateNodeStmt *stmt); extern void PgxcNodeRemove(DropNodeStmt *stmt); diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h index ff35dfaa01..cda434d97e 100644 --- a/src/include/pgxc/poolmgr.h +++ b/src/include/pgxc/poolmgr.h @@ -46,13 +46,6 @@ typedef enum POOL_CMD_GLOBAL_SET /* Global SET flag */ } PoolCommandType; -/* TODO move? */ -typedef struct -{ - char *host; - int port; -} PGXCNodeConnectionInfo; - /* Connection pool entry */ typedef struct { @@ -106,16 +99,16 @@ typedef struct PoolPort port; } PoolHandle; -extern int NumDataNodes; -extern int NumCoords; -extern int NumCoordSlaves; -extern int NumDataNodeSlaves; extern int MinPoolSize; extern int MaxPoolSize; extern int PoolerPort; extern bool PersistentConnections; +/* Status inquiry functions */ +extern void PGXCPoolerProcessIam(void); +extern bool IsPGXCPoolerProcess(void); + /* Initialize internal structures */ extern int PoolManagerInit(void); diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index e26647caac..47023b9963 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -105,6 +105,11 @@ struct PGPROC */ bool recoveryConflictPending; +#ifdef PGXC + /* Postgres-XC flags */ + bool isPooler; /* true if process is Postgres-XC pooler */ +#endif + /* Info about LWLock the process is currently waiting for, if any. */ bool lwWaiting; /* true if waiting for an LW lock */ bool lwExclusive; /* true if waiting for exclusive access */ |
