summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael P2010-10-13 01:45:16 +0000
committerPavan Deolasee2011-05-19 16:45:18 +0000
commit1a9cb6f13394e9f5e8648f1cb24fac211956b3ce (patch)
tree7dc7d3f81b75618160bc6888d4854b404c563c06 /src
parent9b94deb70ede6c7860b471e223e125b873289922 (diff)
Added support for two new pieces of functionality.
1) Support for DDL and utility command synchronisation among Coordinators. DDL is now synchronized amongst multiple coordinators. Previously, after DDL it was required to use an extra utility to resync the nodes and restart other Coordinators. This is no longer necessary. DDL support works also with common BEGIN, COMMIT and ROLLBACK instructions in the cluster. DDL may be initiated at any node. Each Coordinator can connect to any other one. Just as Coordinators use pools for connecting to Data Nodes, Coordinators now use pools for connecting to the other Coordinators. 2) Support for PREPARE TRANSACTION and COMMIT TRANSACTION, ROLLBACK PREPARED. When a transaction is prepared or committed, based on the SQL, it will only execute on the involved nodes, including DDL on Coordinators. GTM is used track which xid and nodes are involved in the transaction, identified by the user or application specified transaction identifier, when it is prepared. New GUCs -------- There are some new GUCs for handling Coordinator communication num_coordinators coordinator_hosts coordinator_ports coordinator_users coordinator_passwords In addition, a new GUC replaces coordinator_id: pgxc_node_id Open Issues ----------- Implicit two phase commit (client in autocommit mode, but distributed transaction required because of multiple nodes) does not first prepare on the originating coordinator before committing, if DDL is involved. We really should prepare here before committing on all nodes. We also need to add a bit of special handling for COMMIT PREPARED. If there is an error, and it got committed on some nodes, we still should force it to be committed on the originating coordinator, if involved, and still return an error of some sort that it was partially committed. (When the downed node recovers, in the future it will determine if any other node has committed the transaction, and if so, it, too, must commit.) It is a pretty rare case, but we should handle it. 
With this current configuration, DDL will fail if at least one Coordinator is down. In the future, we will make this more flexible. Written by Michael Paquier
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/gtm.c8
-rw-r--r--src/backend/access/transam/twophase.c24
-rw-r--r--src/backend/access/transam/varsup.c24
-rw-r--r--src/backend/access/transam/xact.c134
-rw-r--r--src/backend/catalog/dependency.c12
-rw-r--r--src/backend/commands/copy.c2
-rw-r--r--src/backend/commands/sequence.c6
-rw-r--r--src/backend/commands/tablecmds.c6
-rw-r--r--src/backend/executor/execMain.c4
-rw-r--r--src/backend/optimizer/plan/planner.c6
-rw-r--r--src/backend/parser/parse_utilcmd.c4
-rw-r--r--src/backend/pgxc/locator/locator.c37
-rw-r--r--src/backend/pgxc/pool/Makefile2
-rw-r--r--src/backend/pgxc/pool/execRemote.c1108
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c (renamed from src/backend/pgxc/pool/datanode.c)609
-rw-r--r--src/backend/pgxc/pool/poolmgr.c988
-rw-r--r--src/backend/postmaster/postmaster.c5
-rw-r--r--src/backend/storage/ipc/procarray.c3
-rw-r--r--src/backend/tcop/postgres.c28
-rw-r--r--src/backend/tcop/utility.c231
-rw-r--r--src/backend/utils/misc/guc.c81
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample16
-rw-r--r--src/backend/utils/sort/tuplesort.c4
-rw-r--r--src/bin/pg_dump/pg_dump.h4
-rw-r--r--src/gtm/client/fe-protocol.c25
-rw-r--r--src/gtm/client/gtm_client.c17
-rw-r--r--src/gtm/main/gtm_txn.c53
-rw-r--r--src/gtm/main/main.c6
-rw-r--r--src/gtm/proxy/proxy_main.c6
-rw-r--r--src/include/access/gtm.h4
-rw-r--r--src/include/access/twophase.h8
-rw-r--r--src/include/access/xact.h4
-rw-r--r--src/include/gtm/gtm_client.h4
-rw-r--r--src/include/gtm/gtm_msg.h4
-rw-r--r--src/include/gtm/gtm_txn.h6
-rw-r--r--src/include/libpq/libpq-be.h1
-rw-r--r--src/include/pgxc/datanode.h110
-rw-r--r--src/include/pgxc/execRemote.h29
-rw-r--r--src/include/pgxc/locator.h3
-rw-r--r--src/include/pgxc/pgxc.h18
-rw-r--r--src/include/pgxc/pgxcnode.h124
-rw-r--r--src/include/pgxc/planner.h24
-rw-r--r--src/include/pgxc/poolmgr.h30
-rw-r--r--src/include/utils/guc_tables.h3
-rw-r--r--src/include/utils/timestamp.h4
45 files changed, 2580 insertions, 1249 deletions
diff --git a/src/backend/access/transam/gtm.c b/src/backend/access/transam/gtm.c
index 08ed2c9299..64437e7210 100644
--- a/src/backend/access/transam/gtm.c
+++ b/src/backend/access/transam/gtm.c
@@ -20,7 +20,7 @@
/* Configuration variables */
char *GtmHost = "localhost";
int GtmPort = 6666;
-int GtmCoordinatorId = 1;
+int PGXCNodeId = 1;
extern bool FirstSnapshotSet;
@@ -42,7 +42,7 @@ InitGTM()
/* 256 bytes should be enough */
char conn_str[256];
- sprintf(conn_str, "host=%s port=%d coordinator_id=%d", GtmHost, GtmPort, GtmCoordinatorId);
+ sprintf(conn_str, "host=%s port=%d coordinator_id=%d", GtmHost, GtmPort, PGXCNodeId);
conn = PQconnectGTM(conn_str);
if (GTMPQstatus(conn) != CONNECTION_OK)
@@ -187,7 +187,7 @@ RollbackTranGTM(GlobalTransactionId gxid)
}
int
-BeingPreparedTranGTM(GlobalTransactionId gxid,
+StartPreparedTranGTM(GlobalTransactionId gxid,
char *gid,
int datanodecnt,
PGXC_NodeId datanodes[],
@@ -200,7 +200,7 @@ BeingPreparedTranGTM(GlobalTransactionId gxid,
return 0;
CheckConnection();
- ret = being_prepared_transaction(conn, gxid, gid, datanodecnt, datanodes, coordcnt, coordinators);
+ ret = start_prepared_transaction(conn, gxid, gid, datanodecnt, datanodes, coordcnt, coordinators);
/*
* If something went wrong (timeout), try and reset GTM connection.
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 905a515212..8821e15b28 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -861,8 +861,13 @@ StartPrepare(GlobalTransaction gxact)
*
* Calculates CRC and writes state file to WAL and in pg_twophase directory.
*/
+#ifdef PGXC
+void
+EndPrepare(GlobalTransaction gxact, bool write_2pc_file)
+#else
void
EndPrepare(GlobalTransaction gxact)
+#endif
{
TransactionId xid = gxact->proc.xid;
TwoPhaseFileHeader *hdr;
@@ -898,9 +903,10 @@ EndPrepare(GlobalTransaction gxact)
* critical section, though, it doesn't matter since any failure causes
* PANIC anyway.
*/
+
#ifdef PGXC
- /* Do not write 2PC state file on Coordinator side */
- if (IS_PGXC_DATANODE)
+ /* Write 2PC state file on Coordinator side if a DDL is involved in transaction */
+ if (write_2pc_file)
{
#endif
TwoPhaseFilePath(path, xid);
@@ -978,6 +984,7 @@ EndPrepare(GlobalTransaction gxact)
#ifdef PGXC
}
#endif
+
START_CRIT_SECTION();
MyProc->inCommit = true;
@@ -989,8 +996,11 @@ EndPrepare(GlobalTransaction gxact)
/* If we crash now, we have prepared: WAL replay will fix things */
#ifdef PGXC
- /* Just write 2PC state file on Datanodes */
- if (IS_PGXC_DATANODE)
+ /*
+ * Just write 2PC state file on Datanodes
+ * or on Coordinators if DDL queries are involved.
+ */
+ if (write_2pc_file)
{
#endif
@@ -1007,6 +1017,7 @@ EndPrepare(GlobalTransaction gxact)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close two-phase state file: %m")));
+
#ifdef PGXC
}
#endif
@@ -1862,15 +1873,16 @@ RecordTransactionAbortPrepared(TransactionId xid,
END_CRIT_SECTION();
}
+
#ifdef PGXC
/*
* Remove a gxact on a Coordinator,
* this is used to be able to prepare a commit transaction on another coordinator than the one
- * who prepared the transaction
+ * who prepared the transaction, for a transaction that does not include DDLs
*/
void
RemoveGXactCoord(GlobalTransaction gxact)
{
- RemoveGXact(gxact);
+ RemoveGXact(gxact);
}
#endif
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 8145fac713..e55c02e0a6 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -22,7 +22,7 @@
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "utils/builtins.h"
-#ifdef PGXC
+#ifdef PGXC
#include "pgxc/pgxc.h"
#include "access/gtm.h"
#endif
@@ -99,25 +99,27 @@ GetNewTransactionId(bool isSubXact)
return BootstrapTransactionId;
}
-#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- /* Get XID from GTM before acquiring the lock.
+ /*
+ * Get XID from GTM before acquiring the lock.
* The rest of the code will handle it if after obtaining XIDs,
* the lock is acquired in a different order.
* This will help with GTM connection issues- we will not
* block all other processes.
+ * GXID can just be obtained from a remote Coordinator
*/
xid = (TransactionId) BeginTranGTM(timestamp);
- *timestamp_received = true;
+ *timestamp_received = true;
}
-
#endif
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
-#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+#ifdef PGXC
+ /* Only remote Coordinator can get a GXID */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
if (TransactionIdIsValid(xid))
{
@@ -140,7 +142,8 @@ GetNewTransactionId(bool isSubXact)
LWLockRelease(XidGenLock);
return xid;
}
- } else if(IS_PGXC_DATANODE)
+ }
+ else if(IS_PGXC_DATANODE || IsConnFromCoord())
{
if (IsAutoVacuumWorkerProcess())
{
@@ -159,7 +162,8 @@ GetNewTransactionId(bool isSubXact)
/* try and get gxid directly from GTM */
next_xid = (TransactionId) BeginTranGTM(NULL);
}
- } else if (GetForceXidFromGTM())
+ }
+ else if (GetForceXidFromGTM())
{
elog (DEBUG1, "Force get XID from GTM");
/* try and get gxid directly from GTM */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9eac6abf5a..9e098146d8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -26,7 +26,7 @@
#include "access/gtm.h"
/* PGXC_COORD */
#include "gtm/gtm_c.h"
-#include "pgxc/datanode.h"
+#include "pgxc/pgxcnode.h"
/* PGXC_DATANODE */
#include "postmaster/autovacuum.h"
#endif
@@ -116,7 +116,10 @@ typedef enum TBlockState
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */
TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */
TBLOCK_PREPARE, /* live xact, PREPARE received */
-
+#ifdef PGXC
+ TBLOCK_PREPARE_NO_2PC_FILE, /* PREPARE received but skip 2PC file creation
+ * and Commit gxact */
+#endif
/* subtransaction states */
TBLOCK_SUBBEGIN, /* starting a subtransaction */
TBLOCK_SUBINPROGRESS, /* live subtransaction */
@@ -334,7 +337,7 @@ static GlobalTransactionId
GetGlobalTransactionId(TransactionState s)
{
GTM_Timestamp gtm_timestamp;
- bool received_tp;
+ bool received_tp = false;
/*
* Here we receive timestamp at the same time as gxid.
@@ -495,7 +498,7 @@ AssignTransactionId(TransactionState s)
* the Xid as "running". See GetNewTransactionId.
*/
#ifdef PGXC /* PGXC_COORD */
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
s->transactionId = (TransactionId) GetGlobalTransactionId(s);
elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s",
@@ -1629,7 +1632,8 @@ StartTransaction(void)
*/
s->state = TRANS_START;
#ifdef PGXC /* PGXC_COORD */
- if (IS_PGXC_COORDINATOR)
+ /* GXID is assigned already by a remote Coordinator */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
s->globalTransactionId = InvalidGlobalTransactionId; /* until assigned */
#endif
s->transactionId = InvalidTransactionId; /* until assigned */
@@ -1797,7 +1801,7 @@ CommitTransaction(void)
* There can be error on the data nodes. So go to data nodes before
* changing transaction state and local clean up
*/
- DataNodeCommit();
+ PGXCNodeCommit();
#endif
/* Prevent cancel/die interrupt while cleaning up */
@@ -1818,14 +1822,15 @@ CommitTransaction(void)
#ifdef PGXC
/*
- * Now we can let GTM know about transaction commit
+ * Now we can let GTM know about transaction commit.
+ * Only a Remote Coordinator is allowed to do that.
*/
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
CommitTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
}
- else if (IS_PGXC_DATANODE)
+ else if (IS_PGXC_DATANODE || IsConnFromCoord())
{
/* If we are autovacuum, commit on GTM */
if ((IsAutoVacuumWorkerProcess() || GetForceXidFromGTM())
@@ -1930,9 +1935,9 @@ CommitTransaction(void)
s->maxChildXids = 0;
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
s->globalTransactionId = InvalidGlobalTransactionId;
- else if (IS_PGXC_DATANODE)
+ else if (IS_PGXC_DATANODE || IsConnFromCoord())
SetNextTransactionId(InvalidTransactionId);
#endif
@@ -1951,8 +1956,17 @@ CommitTransaction(void)
*
* NB: if you change this routine, better look at CommitTransaction too!
*/
+#ifdef PGXC
+/*
+ * Only a Postgres-XC Coordinator that received a PREPARE Command from
+ * an application can use this special prepare.
+ */
+static void
+PrepareTransaction(bool write_2pc_file)
+#else
static void
PrepareTransaction(void)
+#endif
{
TransactionState s = CurrentTransactionState;
TransactionId xid = GetCurrentTransactionId();
@@ -2083,7 +2097,7 @@ PrepareTransaction(void)
* updates, because the transaction manager might get confused if we lose
* a global transaction.
*/
- EndPrepare(gxact);
+ EndPrepare(gxact, write_2pc_file);
/*
* Now we clean up backend-internal state and release internal resources.
@@ -2137,7 +2151,7 @@ PrepareTransaction(void)
* We want to be able to commit a prepared transaction from another coordinator,
* so clean up the gxact in shared memory also.
*/
- if (IS_PGXC_COORDINATOR)
+ if (!write_2pc_file)
{
RemoveGXactCoord(gxact);
}
@@ -2182,7 +2196,7 @@ PrepareTransaction(void)
s->maxChildXids = 0;
#ifdef PGXC /* PGXC_DATANODE */
- if (IS_PGXC_DATANODE)
+ if (IS_PGXC_DATANODE || IsConnFromCoord())
SetNextTransactionId(InvalidTransactionId);
#endif
/*
@@ -2272,16 +2286,18 @@ AbortTransaction(void)
TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+ /* This is done by remote Coordinator */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
- /* Make sure this is rolled back on the DataNodes,
- * if so it will just return
+ /*
+ * Make sure this is rolled back on the DataNodes
+ * if so it will just return
*/
- DataNodeRollback();
+ PGXCNodeRollback();
RollbackTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
}
- else if (IS_PGXC_DATANODE)
+ else if (IS_PGXC_DATANODE || IsConnFromCoord())
{
/* If we are autovacuum, commit on GTM */
if ((IsAutoVacuumWorkerProcess() || GetForceXidFromGTM())
@@ -2374,9 +2390,9 @@ CleanupTransaction(void)
s->maxChildXids = 0;
#ifdef PGXC /* PGXC_DATANODE */
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
s->globalTransactionId = InvalidGlobalTransactionId;
- else if (IS_PGXC_DATANODE)
+ else if (IS_PGXC_DATANODE || IsConnFromCoord())
SetNextTransactionId(InvalidTransactionId);
#endif
@@ -2442,6 +2458,9 @@ StartTransactionCommand(void)
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
elog(ERROR, "StartTransactionCommand: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2548,9 +2567,20 @@ CommitTransactionCommand(void)
* return to the idle state.
*/
case TBLOCK_PREPARE:
- PrepareTransaction();
+ PrepareTransaction(true);
+ s->blockState = TBLOCK_DEFAULT;
+ break;
+
+#ifdef PGXC
+ /*
+ * We are completing a PREPARE TRANSACTION for a pgxc transaction
+ * that involved DDLs on a Coordinator.
+ */
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+ PrepareTransaction(false);
s->blockState = TBLOCK_DEFAULT;
break;
+#endif
/*
* We were just issued a SAVEPOINT inside a transaction block.
@@ -2582,10 +2612,15 @@ CommitTransactionCommand(void)
CommitTransaction();
s->blockState = TBLOCK_DEFAULT;
}
+#ifdef PGXC
+ else if (s->blockState == TBLOCK_PREPARE ||
+ s->blockState == TBLOCK_PREPARE_NO_2PC_FILE)
+#else
else if (s->blockState == TBLOCK_PREPARE)
+#endif
{
Assert(s->parent == NULL);
- PrepareTransaction();
+ PrepareTransaction(true);
s->blockState = TBLOCK_DEFAULT;
}
else
@@ -2785,6 +2820,9 @@ AbortCurrentTransaction(void)
* the transaction).
*/
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
AbortTransaction();
CleanupTransaction();
s->blockState = TBLOCK_DEFAULT;
@@ -3136,6 +3174,9 @@ BeginTransactionBlock(void)
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
elog(FATAL, "BeginTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3154,8 +3195,13 @@ BeginTransactionBlock(void)
* We do it this way because it's not convenient to change memory context,
* resource owner, etc while executing inside a Portal.
*/
+#ifdef PGXC
+bool
+PrepareTransactionBlock(char *gid, bool write_2pc_file)
+#else
bool
PrepareTransactionBlock(char *gid)
+#endif
{
TransactionState s;
bool result;
@@ -3176,6 +3222,16 @@ PrepareTransactionBlock(char *gid)
/* Save GID where PrepareTransaction can find it again */
prepareGID = MemoryContextStrdup(TopTransactionContext, gid);
+#ifdef PGXC
+ /*
+ * For a Postgres-XC Coordinator, prepare is done for a transaction
+ * if and only if a DDL was involved in the transaction.
+ * If not, it is enough to prepare it on Datanodes involved only.
+ */
+ if (!write_2pc_file)
+ s->blockState = TBLOCK_PREPARE_NO_2PC_FILE;
+ else
+#endif
s->blockState = TBLOCK_PREPARE;
}
else
@@ -3304,6 +3360,9 @@ EndTransactionBlock(void)
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
elog(FATAL, "EndTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3396,6 +3455,9 @@ UserAbortTransactionBlock(void)
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
elog(FATAL, "UserAbortTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3443,6 +3505,9 @@ DefineSavepoint(char *name)
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
elog(FATAL, "DefineSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3499,6 +3564,9 @@ ReleaseSavepoint(List *options)
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
elog(FATAL, "ReleaseSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3597,6 +3665,9 @@ RollbackToSavepoint(List *options)
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
elog(FATAL, "RollbackToSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3680,6 +3751,9 @@ BeginInternalSubTransaction(char *name)
case TBLOCK_INPROGRESS:
case TBLOCK_END:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
case TBLOCK_SUBINPROGRESS:
/* Normal subtransaction start */
PushTransaction();
@@ -3772,6 +3846,9 @@ RollbackAndReleaseCurrentSubTransaction(void)
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
elog(FATAL, "RollbackAndReleaseCurrentSubTransaction: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3820,6 +3897,9 @@ AbortOutOfAnyTransaction(void)
case TBLOCK_END:
case TBLOCK_ABORT_PENDING:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
/* In a transaction, so clean up */
AbortTransaction();
CleanupTransaction();
@@ -3911,6 +3991,9 @@ TransactionBlockStatusCode(void)
case TBLOCK_END:
case TBLOCK_SUBEND:
case TBLOCK_PREPARE:
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
return 'T'; /* in transaction */
case TBLOCK_ABORT:
case TBLOCK_SUBABORT:
@@ -4269,7 +4352,7 @@ PushTransaction(void)
* failure.
*/
#ifdef PGXC /* PGXC_COORD */
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
s->globalTransactionId = InvalidGlobalTransactionId;
#endif
s->transactionId = InvalidTransactionId; /* until assigned */
@@ -4406,6 +4489,9 @@ BlockStateAsString(TBlockState blockState)
return "ABORT END";
case TBLOCK_ABORT_PENDING:
return "ABORT PEND";
+#ifdef PGXC
+ case TBLOCK_PREPARE_NO_2PC_FILE:
+#endif
case TBLOCK_PREPARE:
return "PREPARE";
case TBLOCK_SUBBEGIN:
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 7fdef1c6ac..5a71c9166a 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -356,8 +356,11 @@ doRename(const ObjectAddress *object, const char *oldname, const char *newname)
* If we are here, a schema is being renamed, a sequence depends on it.
* as sequences' global name use the schema name, this sequence
* has also to be renamed on GTM.
+ * An operation with GTM can just be done from a remote Coordinator.
*/
- if (relKind == RELKIND_SEQUENCE && IS_PGXC_COORDINATOR)
+ if (relKind == RELKIND_SEQUENCE
+ && IS_PGXC_COORDINATOR
+ && !IsConnFromCoord())
{
Relation relseq = relation_open(object->objectId, AccessShareLock);
char *seqname = GetGlobalSeqName(relseq, NULL, oldname);
@@ -1132,8 +1135,11 @@ doDeletion(const ObjectAddress *object)
}
#ifdef PGXC
- /* Drop the sequence on GTM */
- if (relKind == RELKIND_SEQUENCE && IS_PGXC_COORDINATOR)
+ /*
+ * Drop the sequence on GTM.
+ * Sequence is dropped on GTM by a remote Coordinator only.
+ */
+ if (relKind == RELKIND_SEQUENCE && IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
/*
* The sequence has already been removed from coordinator,
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 39b1e6d7ed..b9df2a9cde 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -180,7 +180,7 @@ typedef struct CopyStateData
RelationLocInfo *rel_loc; /* the locator key */
int hash_idx; /* index of the hash column */
- DataNodeHandle **connections; /* Involved data node connections */
+ PGXCNodeHandle **connections; /* Involved data node connections */
#endif
} CopyStateData;
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index cd2fd97078..d67cd4e408 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -350,7 +350,8 @@ DefineSequence(CreateSeqStmt *seq)
heap_close(rel, NoLock);
#ifdef PGXC /* PGXC_COORD */
- if (IS_PGXC_COORDINATOR)
+ /* Remote Coordinator is in charge of creating sequence in GTM */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
char *seqname = GetGlobalSeqName(rel, NULL, NULL);
@@ -492,7 +493,8 @@ AlterSequenceInternal(Oid relid, List *options)
relation_close(seqrel, NoLock);
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+ /* Remote Coordinator is in charge of creating a sequence in GTM */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
char *seqname = GetGlobalSeqName(seqrel, NULL, NULL);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b7df834f28..7970c80730 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -2084,8 +2084,10 @@ RenameRelation(Oid myrelid, const char *newrelname, ObjectType reltype)
/* Do the work */
RenameRelationInternal(myrelid, newrelname, namespaceId);
#ifdef PGXC
- if (IS_PGXC_COORDINATOR &&
- (reltype == OBJECT_SEQUENCE || relkind == RELKIND_SEQUENCE)) /* It is possible to rename a sequence with ALTER TABLE */
+ /* Operation with GTM can only be done with a Remote Coordinator */
+ if (IS_PGXC_COORDINATOR
+ && !IsConnFromCoord()
+ && (reltype == OBJECT_SEQUENCE || relkind == RELKIND_SEQUENCE)) /* It is possible to rename a sequence with ALTER TABLE */
{
char *seqname = GetGlobalSeqName(targetrelation, NULL, NULL);
char *newseqname = GetGlobalSeqName(targetrelation, newrelname, NULL);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 482ca6f772..e07760356c 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -59,7 +59,9 @@
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
-
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
/* Hooks for plugins to get control in ExecutorStart/Run/End() */
ExecutorStart_hook_type ExecutorStart_hook = NULL;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 7d41461096..62893e0eef 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -124,7 +124,11 @@ planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
result = (*planner_hook) (parse, cursorOptions, boundParams);
else
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+ /*
+ * A coordinator receiving a query from another Coordinator
+ * is not allowed to go into PGXC planner.
+ */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
result = pgxc_planner(parse, cursorOptions, boundParams);
else
#endif
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index d7a932e781..0e0f186d7e 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -277,6 +277,8 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
RemoteQuery *step = makeNode(RemoteQuery);
step->combine_type = COMBINE_TYPE_SAME;
step->sql_statement = queryString;
+ /* This query is a DDL, Launch it on both Datanodes and Coordinators. */
+ step->exec_type = EXEC_ON_ALL_NODES;
result = lappend(result, step);
}
#endif
@@ -1970,6 +1972,8 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
RemoteQuery *step = makeNode(RemoteQuery);
step->combine_type = COMBINE_TYPE_SAME;
step->sql_statement = queryString;
+ /* This query is a DDL, it is launched on both Coordinators and Datanodes. */
+ step->exec_type = EXEC_ON_ALL_NODES;
result = lappend(result, step);
}
#endif
diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c
index debbc77188..098e254531 100644
--- a/src/backend/pgxc/locator/locator.c
+++ b/src/backend/pgxc/locator/locator.c
@@ -24,6 +24,7 @@
#include "postgres.h"
#include "access/skey.h"
+#include "access/gtm.h"
#include "access/relscan.h"
#include "catalog/indexing.h"
#include "catalog/pg_type.h"
@@ -440,12 +441,12 @@ GetLocatorType(Oid relid)
/*
- * Return a list of all nodes.
+ * Return a list of all Datanodes.
* We assume all tables use all nodes in the prototype, so just return a list
* from first one.
*/
List *
-GetAllNodes(void)
+GetAllDataNodes(void)
{
int i;
@@ -463,10 +464,38 @@ GetAllNodes(void)
return nodeList;
}
+/*
+ * Return a list of all Coordinators
+ * This is used to send DDL to all nodes
+ * Do not put in the list the local Coordinator where this function is launched
+ */
+List *
+GetAllCoordNodes(void)
+{
+ int i;
+
+ /*
+ * PGXCTODO - add support for having nodes on a subset of nodes
+ * For now, assume on all nodes
+ */
+ List *nodeList = NIL;
+
+ for (i = 1; i < NumCoords + 1; i++)
+ {
+ /*
+ * Do not put in list the Coordinator we are on,
+ * it doesn't make sense to connect to the local coordinator.
+ */
+ if (i != PGXCNodeId)
+ nodeList = lappend_int(nodeList, i);
+ }
+
+ return nodeList;
+}
+
/*
* Build locator information associated with the specified relation.
- *
*/
void
RelationBuildLocator(Relation rel)
@@ -528,7 +557,7 @@ RelationBuildLocator(Relation rel)
/** PGXCTODO - add support for having nodes on a subset of nodes
* For now, assume on all nodes
*/
- relationLocInfo->nodeList = GetAllNodes();
+ relationLocInfo->nodeList = GetAllDataNodes();
relationLocInfo->nodeCount = relationLocInfo->nodeList->length;
/*
diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile
index c7e950aaa4..f8679eb10c 100644
--- a/src/backend/pgxc/pool/Makefile
+++ b/src/backend/pgxc/pool/Makefile
@@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o postgresql_fdw.o
+OBJS = pgxcnode.o execRemote.o poolmgr.o poolcomm.o postgresql_fdw.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 16d2f6b815..14dce33cee 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -30,6 +30,8 @@
#include "utils/memutils.h"
#include "utils/tuplesort.h"
#include "utils/snapmgr.h"
+#include "pgxc/locator.h"
+#include "pgxc/pgxc.h"
#define END_QUERY_TIMEOUT 20
#define CLEAR_TIMEOUT 5
@@ -45,26 +47,30 @@ extern char *deparseSql(RemoteQueryState *scanstate);
#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024
static bool autocommit = true;
-static DataNodeHandle **write_node_list = NULL;
+static PGXCNodeHandle **write_node_list = NULL;
static int write_node_count = 0;
-static int data_node_begin(int conn_count, DataNodeHandle ** connections,
+static int pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections,
GlobalTransactionId gxid);
-static int data_node_commit(int conn_count, DataNodeHandle ** connections);
-static int data_node_rollback(int conn_count, DataNodeHandle ** connections);
-static int data_node_prepare(int conn_count, DataNodeHandle ** connections,
- char *gid);
-static int data_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
- int conn_count, DataNodeHandle ** connections,
- char *gid);
-static int data_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
- int conn_count, DataNodeHandle ** connections,
- char *gid);
-
-static void clear_write_node_list();
-
-static int handle_response_clear(DataNodeHandle * conn);
-
+static int pgxc_node_commit(PGXCNodeAllHandles * pgxc_handles);
+static int pgxc_node_rollback(PGXCNodeAllHandles * pgxc_handles);
+static int pgxc_node_prepare(PGXCNodeAllHandles * pgxc_handles, char *gid);
+static int pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
+ PGXCNodeAllHandles * pgxc_handles, char *gid);
+static int pgxc_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
+ PGXCNodeAllHandles * pgxc_handles, char *gid);
+static PGXCNodeAllHandles * get_exec_connections(ExecNodes *exec_nodes,
+ RemoteQueryExecType exec_type);
+static int pgxc_node_receive_and_validate(const int conn_count,
+ PGXCNodeHandle ** connections,
+ bool reset_combiner);
+static void clear_write_node_list(void);
+
+static void pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles);
+
+static int handle_response_clear(PGXCNodeHandle * conn);
+
+static PGXCNodeAllHandles *pgxc_get_all_transaction_nodes(void);
#define MAX_STATEMENTS_PER_TRAN 10
@@ -922,14 +928,14 @@ FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot)
* Handle responses from the Data node connections
*/
static int
-data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
+pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections,
struct timeval * timeout, RemoteQueryState *combiner)
{
int count = conn_count;
- DataNodeHandle *to_receive[conn_count];
+ PGXCNodeHandle *to_receive[conn_count];
/* make a copy of the pointers to the connections */
- memcpy(to_receive, connections, conn_count * sizeof(DataNodeHandle *));
+ memcpy(to_receive, connections, conn_count * sizeof(PGXCNodeHandle *));
/*
* Read results.
@@ -941,7 +947,7 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
{
int i = 0;
- if (data_node_receive(count, to_receive, timeout))
+ if (pgxc_node_receive(count, to_receive, timeout))
return EOF;
while (i < count)
{
@@ -986,7 +992,7 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
* 2 - got copy response
*/
int
-handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
+handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
{
char *msg;
int msg_len;
@@ -1094,7 +1100,7 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
* RESPONSE_COMPLETE - done with the connection, or done trying (error)
*/
static int
-handle_response_clear(DataNodeHandle * conn)
+handle_response_clear(PGXCNodeHandle * conn)
{
char *msg;
int msg_len;
@@ -1156,10 +1162,10 @@ handle_response_clear(DataNodeHandle * conn)
/*
- * Send BEGIN command to the Data nodes and receive responses
+ * Send BEGIN command to the Datanodes or Coordinators and receive responses
*/
static int
-data_node_begin(int conn_count, DataNodeHandle ** connections,
+pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections,
GlobalTransactionId gxid)
{
int i;
@@ -1170,20 +1176,20 @@ data_node_begin(int conn_count, DataNodeHandle ** connections,
/* Send BEGIN */
for (i = 0; i < conn_count; i++)
{
- if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid))
+ if (GlobalTransactionIdIsValid(gxid) && pgxc_node_send_gxid(connections[i], gxid))
return EOF;
- if (GlobalTimestampIsValid(timestamp) && data_node_send_timestamp(connections[i], timestamp))
+ if (GlobalTimestampIsValid(timestamp) && pgxc_node_send_timestamp(connections[i], timestamp))
return EOF;
- if (data_node_send_query(connections[i], "BEGIN"))
+ if (pgxc_node_send_query(connections[i], "BEGIN"))
return EOF;
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
/* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ if (pgxc_node_receive_responses(conn_count, connections, timeout, combiner))
return EOF;
/* Verify status */
@@ -1197,17 +1203,17 @@ clear_write_node_list()
/* we just malloc once and use counter */
if (write_node_list == NULL)
{
- write_node_list = (DataNodeHandle **) malloc(NumDataNodes * sizeof(DataNodeHandle *));
+ write_node_list = (PGXCNodeHandle **) malloc(NumDataNodes * sizeof(PGXCNodeHandle *));
}
write_node_count = 0;
}
/*
- * Switch autocommmit mode off, so all subsequent statements will be in the same transaction
+ * Switch autocommit mode off, so all subsequent statements will be in the same transaction
*/
void
-DataNodeBegin(void)
+PGXCNodeBegin(void)
{
autocommit = false;
clear_write_node_list();
@@ -1215,18 +1221,30 @@ DataNodeBegin(void)
/*
- * Prepare transaction on Datanodes involved in current transaction.
+ * Prepare transaction on Datanodes and Coordinators involved in current transaction.
* GXID associated to current transaction has to be committed on GTM.
*/
-int
-DataNodePrepare(char *gid)
+bool
+PGXCNodePrepare(char *gid)
{
int res = 0;
int tran_count;
- DataNodeHandle *connections[NumDataNodes];
+ PGXCNodeAllHandles *pgxc_connections;
+ bool local_operation = false;
+
+ pgxc_connections = pgxc_get_all_transaction_nodes();
- /* gather connections to prepare */
- tran_count = get_transaction_nodes(connections);
+ /* DDL involved in transaction, so make a local prepare too */
+ if (pgxc_connections->co_conn_count != 0)
+ local_operation = true;
+
+ /*
+ * If no connections have been gathered for Coordinators,
+ * it means that no DDL has been involved in this transaction.
+ * And so this transaction is not prepared on Coordinators.
+ * It is only on Datanodes that data is involved.
+ */
+ tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count;
/*
* If we do not have open transactions we have nothing to prepare just
@@ -1234,12 +1252,11 @@ DataNodePrepare(char *gid)
*/
if (tran_count == 0)
{
- elog(WARNING, "Nothing to PREPARE on Datanodes, gid is not used");
+ elog(WARNING, "Nothing to PREPARE on Datanodes and Coordinators, gid is not used");
goto finish;
}
- /* TODO: data_node_prepare */
- res = data_node_prepare(tran_count, connections, gid);
+ res = pgxc_node_prepare(pgxc_connections, gid);
finish:
/*
@@ -1249,12 +1266,16 @@ finish:
* Release the connections for the moment.
*/
if (!autocommit)
- stat_transaction(tran_count);
+ stat_transaction(pgxc_connections->dn_conn_count);
if (!PersistentConnections)
release_handles(false);
autocommit = true;
clear_write_node_list();
- return res;
+
+ /* Clean up connections */
+ pfree_pgxc_all_handles(pgxc_connections);
+
+ return local_operation;
}
@@ -1262,47 +1283,64 @@ finish:
* Prepare transaction on dedicated nodes with gid received from application
*/
static int
-data_node_prepare(int conn_count, DataNodeHandle ** connections, char *gid)
+pgxc_node_prepare(PGXCNodeAllHandles *pgxc_handles, char *gid)
{
- int i;
+ int real_co_conn_count;
int result = 0;
- struct timeval *timeout = NULL;
+ int co_conn_count = pgxc_handles->co_conn_count;
+ int dn_conn_count = pgxc_handles->dn_conn_count;
char *buffer = (char *) palloc0(22 + strlen(gid) + 1);
- RemoteQueryState *combiner = NULL;
GlobalTransactionId gxid = InvalidGlobalTransactionId;
PGXC_NodeId *datanodes = NULL;
+ PGXC_NodeId *coordinators = NULL;
gxid = GetCurrentGlobalTransactionId();
/*
* Now that the transaction has been prepared on the nodes,
- * Initialize to make the business on GTM
+ * Initialize to make the business on GTM.
+ * We also add the Coordinator we are on in the prepared state.
+ */
+ if (dn_conn_count != 0)
+ datanodes = collect_pgxcnode_numbers(dn_conn_count,
+ pgxc_handles->datanode_handles, REMOTE_CONN_DATANODE);
+
+ /*
+ * Local Coordinator is saved in the list sent to GTM
+ * only when a DDL is involved in the transaction.
+ * So we don't need to complete the list of Coordinators sent to GTM
+ * when number of connections to Coordinator is zero (no DDL).
*/
- datanodes = collect_datanode_numbers(conn_count, connections);
+ if (co_conn_count != 0)
+ coordinators = collect_pgxcnode_numbers(co_conn_count,
+ pgxc_handles->coord_handles, REMOTE_CONN_COORD);
/*
- * Send a Prepare in Progress message to GTM.
- * At the same time node list is saved on GTM.
+ * Tell to GTM that the transaction is being prepared first.
+ * Don't forget to add in the list of Coordinators the coordinator we are on
+ * if a DDL is involved in the transaction.
+ * This one is also being prepared!
*/
- result = BeingPreparedTranGTM(gxid, gid, conn_count, datanodes, 0, NULL);
+ if (co_conn_count == 0)
+ real_co_conn_count = co_conn_count;
+ else
+ real_co_conn_count = co_conn_count + 1;
+
+ result = StartPreparedTranGTM(gxid, gid, dn_conn_count,
+ datanodes, real_co_conn_count, coordinators);
if (result < 0)
return EOF;
sprintf(buffer, "PREPARE TRANSACTION '%s'", gid);
- /* Send PREPARE */
- for (i = 0; i < conn_count; i++)
- if (data_node_send_query(connections[i], buffer))
- return EOF;
+ /* Continue even after an error here, to consume the messages */
+ result = pgxc_all_handles_send_query(pgxc_handles, buffer, true);
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+ /* Receive and Combine results from Datanodes and Coordinators */
+ result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
- /* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
- return EOF;
-
- result = ValidateAndCloseCombiner(combiner) ? result : EOF;
if (result)
goto finish;
@@ -1324,31 +1362,27 @@ finish:
if (result)
{
GlobalTransactionId rollback_xid = InvalidGlobalTransactionId;
- buffer = (char *) repalloc(buffer, 20 + strlen(gid) + 1);
+ result = 0;
+ buffer = (char *) repalloc(buffer, 20 + strlen(gid) + 1);
sprintf(buffer, "ROLLBACK PREPARED '%s'", gid);
- rollback_xid = BeginTranGTM(NULL);
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_gxid(connections[i], rollback_xid))
- {
- add_error_message(connections[i], "Can not send request");
- return EOF;
- }
- if (data_node_send_query(connections[i], buffer))
- {
- add_error_message(connections[i], "Can not send request");
- return EOF;
- }
- }
+ /* Consume any messages on the Datanodes and Coordinators first if necessary */
+ PGXCNodeConsumeMessages();
- if (!combiner)
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+ rollback_xid = BeginTranGTM(NULL);
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ /*
+ * Send xid and rollback prepared down to Datanodes and Coordinators
+ * Even if we get an error on one, we try and send to the others
+ */
+ if (pgxc_all_handles_send_gxid(pgxc_handles, rollback_xid, false))
result = EOF;
- result = ValidateAndCloseCombiner(combiner) ? result : EOF;
+ if (pgxc_all_handles_send_query(pgxc_handles, buffer, false))
+ result = EOF;
+
+ result = pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
/*
* Don't forget to rollback also on GTM
@@ -1364,26 +1398,30 @@ finish:
/*
- * Commit prepared transaction on Datanodes where it has been prepared.
+ * Commit prepared transaction on Datanodes and Coordinators (as necessary)
+ * where it has been prepared.
* Connection to backends has been cut when transaction has been prepared,
* So it is necessary to send the COMMIT PREPARE message to all the nodes.
* We are not sure if the transaction prepared has involved all the datanodes
* or not but send the message to all of them.
* This avoid to have any additional interaction with GTM when making a 2PC transaction.
*/
-void
-DataNodeCommitPrepared(char *gid)
+bool
+PGXCNodeCommitPrepared(char *gid)
{
int res = 0;
int res_gtm = 0;
- DataNodeHandle **connections;
- List *nodelist = NIL;
+ PGXCNodeAllHandles *pgxc_handles;
+ List *datanodelist = NIL;
+ List *coordlist = NIL;
int i, tran_count;
PGXC_NodeId *datanodes = NULL;
PGXC_NodeId *coordinators = NULL;
int coordcnt = 0;
int datanodecnt = 0;
GlobalTransactionId gxid, prepared_gxid;
+ /* This flag tracks if the transaction has to be committed locally */
+ bool operation_local = false;
res_gtm = GetGIDDataGTM(gid, &gxid, &prepared_gxid,
&datanodecnt, &datanodes, &coordcnt, &coordinators);
@@ -1394,17 +1432,33 @@ DataNodeCommitPrepared(char *gid)
autocommit = false;
- /* Build the list of nodes based on data received from GTM */
+ /*
+ * Build the list of nodes based on data received from GTM.
+ * For Sequence DDL this list is NULL.
+ */
for (i = 0; i < datanodecnt; i++)
+ datanodelist = lappend_int(datanodelist,datanodes[i]);
+
+ for (i = 0; i < coordcnt; i++)
{
- nodelist = lappend_int(nodelist,datanodes[i]);
+ /* Local Coordinator number found, has to commit locally also */
+ if (coordinators[i] == PGXCNodeId)
+ operation_local = true;
+ else
+ coordlist = lappend_int(coordlist,coordinators[i]);
}
/* Get connections */
- connections = get_handles(nodelist);
+ if (coordcnt > 0 && datanodecnt == 0)
+ pgxc_handles = get_handles(datanodelist, coordlist, true);
+ else
+ pgxc_handles = get_handles(datanodelist, coordlist, false);
- /* Commit here the prepared transaction to all Datanodes */
- res = data_node_commit_prepared(gxid, prepared_gxid, datanodecnt, connections, gid);
+ /*
+ * Commit here the prepared transaction to all Datanodes and Coordinators
+ * If necessary, the local Coordinator commit is performed after this PGXCNodeCommitPrepared.
+ */
+ res = pgxc_node_commit_prepared(gxid, prepared_gxid, pgxc_handles, gid);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1416,11 +1470,13 @@ finish:
clear_write_node_list();
/* Free node list taken from GTM */
- if (datanodes)
+ if (datanodes && datanodecnt != 0)
free(datanodes);
- if (coordinators)
+
+ if (coordinators && coordcnt != 0)
free(coordinators);
+ pfree_pgxc_all_handles(pgxc_handles);
if (res_gtm < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -1429,6 +1485,8 @@ finish:
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could not commit prepared transaction on data nodes")));
+
+ return operation_local;
}
/*
@@ -1440,42 +1498,29 @@ finish:
* This permits to avoid interactions with GTM.
*/
static int
-data_node_commit_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid, int conn_count, DataNodeHandle ** connections, char *gid)
+pgxc_node_commit_prepared(GlobalTransactionId gxid,
+ GlobalTransactionId prepared_gxid,
+ PGXCNodeAllHandles *pgxc_handles,
+ char *gid)
{
int result = 0;
- int i;
- RemoteQueryState *combiner = NULL;
- struct timeval *timeout = NULL;
+ int co_conn_count = pgxc_handles->co_conn_count;
+ int dn_conn_count = pgxc_handles->dn_conn_count;
char *buffer = (char *) palloc0(18 + strlen(gid) + 1);
/* GXID has been piggybacked when gid data has been received from GTM */
sprintf(buffer, "COMMIT PREPARED '%s'", gid);
/* Send gxid and COMMIT PREPARED message to all the Datanodes */
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_gxid(connections[i], gxid))
- {
- add_error_message(connections[i], "Can not send request");
- result = EOF;
- goto finish;
- }
- if (data_node_send_query(connections[i], buffer))
- {
- add_error_message(connections[i], "Can not send request");
- result = EOF;
- goto finish;
- }
- }
-
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+ if (pgxc_all_handles_send_gxid(pgxc_handles, gxid, true))
+ goto finish;
- /* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ /* Continue and receive responses even if there is an error */
+ if (pgxc_all_handles_send_query(pgxc_handles, buffer, false))
result = EOF;
- /* Validate and close combiner */
- result = ValidateAndCloseCombiner(combiner) ? result : EOF;
+ result = pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
finish:
/* Both GXIDs used for PREPARE and COMMIT PREPARED are discarded from GTM snapshot here */
@@ -1486,21 +1531,25 @@ finish:
/*
* Rollback prepared transaction on Datanodes involved in the current transaction
+ *
+ * Return whether or not a local operation is required.
*/
-void
-DataNodeRollbackPrepared(char *gid)
+bool
+PGXCNodeRollbackPrepared(char *gid)
{
int res = 0;
int res_gtm = 0;
- DataNodeHandle **connections;
- List *nodelist = NIL;
+ PGXCNodeAllHandles *pgxc_handles;
+ List *datanodelist = NIL;
+ List *coordlist = NIL;
int i, tran_count;
-
PGXC_NodeId *datanodes = NULL;
PGXC_NodeId *coordinators = NULL;
int coordcnt = 0;
int datanodecnt = 0;
GlobalTransactionId gxid, prepared_gxid;
+ /* This flag tracks if the transaction has to be rolled back locally */
+ bool operation_local = false;
res_gtm = GetGIDDataGTM(gid, &gxid, &prepared_gxid,
&datanodecnt, &datanodes, &coordcnt, &coordinators);
@@ -1513,15 +1562,25 @@ DataNodeRollbackPrepared(char *gid)
/* Build the node list based on the result got from GTM */
for (i = 0; i < datanodecnt; i++)
+ datanodelist = lappend_int(datanodelist,datanodes[i]);
+
+ for (i = 0; i < coordcnt; i++)
{
- nodelist = lappend_int(nodelist,datanodes[i]);
+ /* Local Coordinator number found, has to rollback locally also */
+ if (coordinators[i] == PGXCNodeId)
+ operation_local = true;
+ else
+ coordlist = lappend_int(coordlist,coordinators[i]);
}
/* Get connections */
- connections = get_handles(nodelist);
+ if (coordcnt > 0 && datanodecnt == 0)
+ pgxc_handles = get_handles(datanodelist, coordlist, true);
+ else
+ pgxc_handles = get_handles(datanodelist, coordlist, false);
- /* Here do the real rollback to Datanodes */
- res = data_node_rollback_prepared(gxid, prepared_gxid, datanodecnt, connections, gid);
+ /* Here do the real rollback to Datanodes and Coordinators */
+ res = pgxc_node_rollback_prepared(gxid, prepared_gxid, pgxc_handles, gid);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1530,7 +1589,16 @@ finish:
if (!PersistentConnections)
release_handles(true);
autocommit = true;
- clear_write_node_list(true);
+ clear_write_node_list();
+
+ /* Free node list taken from GTM */
+ if (datanodes)
+ free(datanodes);
+
+ if (coordinators)
+ free(coordinators);
+
+ pfree_pgxc_all_handles(pgxc_handles);
if (res_gtm < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -1539,6 +1607,8 @@ finish:
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could not rollback prepared transaction on Datanodes")));
+
+ return operation_local;
}
@@ -1548,13 +1618,12 @@ finish:
* At the end both prepared GXID and GXID are committed.
*/
static int
-data_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
- int conn_count, DataNodeHandle ** connections, char *gid)
+pgxc_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
+ PGXCNodeAllHandles *pgxc_handles, char *gid)
{
int result = 0;
- int i;
- RemoteQueryState *combiner = NULL;
- struct timeval *timeout = NULL;
+ int dn_conn_count = pgxc_handles->dn_conn_count;
+ int co_conn_count = pgxc_handles->co_conn_count;
char *buffer = (char *) palloc0(20 + strlen(gid) + 1);
/* Datanodes have reset after prepared state, so get a new gxid */
@@ -1562,34 +1631,15 @@ data_node_rollback_prepared(GlobalTransactionId gxid, GlobalTransactionId prepar
sprintf(buffer, "ROLLBACK PREPARED '%s'", gid);
- /* Send gxid and COMMIT PREPARED message to all the Datanodes */
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_gxid(connections[i], gxid))
- {
- add_error_message(connections[i], "Can not send request");
- result = EOF;
- goto finish;
- }
-
- if (data_node_send_query(connections[i], buffer))
- {
- add_error_message(connections[i], "Can not send request");
- result = EOF;
- goto finish;
- }
- }
-
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
-
- /* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ /* Send gxid and ROLLBACK PREPARED message to all the Datanodes */
+ if (pgxc_all_handles_send_gxid(pgxc_handles, gxid, false))
+ result = EOF;
+ if (pgxc_all_handles_send_query(pgxc_handles, buffer, false))
result = EOF;
- /* Validate and close combiner */
- result = ValidateAndCloseCombiner(combiner) ? result : EOF;
+ result = pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
-finish:
/* Both GXIDs used for PREPARE and COMMIT PREPARED are discarded from GTM snapshot here */
CommitPreparedTranGTM(gxid, prepared_gxid);
@@ -1601,14 +1651,15 @@ finish:
* Commit current transaction on data nodes where it has been started
*/
void
-DataNodeCommit(void)
+PGXCNodeCommit(void)
{
int res = 0;
int tran_count;
- DataNodeHandle *connections[NumDataNodes];
+ PGXCNodeAllHandles *pgxc_connections;
- /* gather connections to commit */
- tran_count = get_transaction_nodes(connections);
+ pgxc_connections = pgxc_get_all_transaction_nodes();
+
+ tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count;
/*
* If we do not have open transactions we have nothing to commit, just
@@ -1617,7 +1668,7 @@ DataNodeCommit(void)
if (tran_count == 0)
goto finish;
- res = data_node_commit(tran_count, connections);
+ res = pgxc_node_commit(pgxc_connections);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1627,6 +1678,9 @@ finish:
release_handles(false);
autocommit = true;
clear_write_node_list();
+
+ /* Clear up connection */
+ pfree_pgxc_all_handles(pgxc_connections);
if (res != 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
@@ -1639,15 +1693,13 @@ finish:
* if more then on one node data have been modified during the transactioon.
*/
static int
-data_node_commit(int conn_count, DataNodeHandle ** connections)
+pgxc_node_commit(PGXCNodeAllHandles *pgxc_handles)
{
- int i;
- struct timeval *timeout = NULL;
char buffer[256];
GlobalTransactionId gxid = InvalidGlobalTransactionId;
int result = 0;
- RemoteQueryState *combiner = NULL;
-
+ int co_conn_count = pgxc_handles->co_conn_count;
+ int dn_conn_count = pgxc_handles->dn_conn_count;
/* can set this to false to disable temporarily */
/* bool do2PC = conn_count > 1; */
@@ -1674,21 +1726,13 @@ data_node_commit(int conn_count, DataNodeHandle ** connections)
gxid = GetCurrentGlobalTransactionId();
sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid);
- /* Send PREPARE */
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_query(connections[i], buffer))
- return EOF;
- }
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- /* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ if (pgxc_all_handles_send_query(pgxc_handles, buffer, false))
result = EOF;
- /* Reset combiner */
- if (!ValidateAndResetCombiner(combiner))
- result = EOF;
+ /* Receive and Combine results from Datanodes and Coordinators */
+ result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, true);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, true);
}
if (!do2PC)
@@ -1696,7 +1740,11 @@ data_node_commit(int conn_count, DataNodeHandle ** connections)
else
{
if (result)
+ {
sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid);
+ /* Consume any messages on the Datanodes and Coordinators first if necessary */
+ PGXCNodeConsumeMessages();
+ }
else
sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid);
@@ -1707,33 +1755,20 @@ data_node_commit(int conn_count, DataNodeHandle ** connections)
*/
two_phase_xid = BeginTranGTM(NULL);
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_gxid(connections[i], two_phase_xid))
- {
- add_error_message(connections[i], "Can not send request");
- result = EOF;
- goto finish;
- }
- }
- }
-
- /* Send COMMIT */
- for (i = 0; i < conn_count; i++)
- {
- if (data_node_send_query(connections[i], buffer))
+ if (pgxc_all_handles_send_gxid(pgxc_handles, two_phase_xid, true))
{
result = EOF;
goto finish;
}
}
- if (!combiner)
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- /* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ /* Send COMMIT to all handles */
+ if (pgxc_all_handles_send_query(pgxc_handles, buffer, false))
result = EOF;
- result = ValidateAndCloseCombiner(combiner) ? result : EOF;
+
+ /* Receive and Combine results from Datanodes and Coordinators */
+ result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
finish:
if (do2PC)
@@ -1748,18 +1783,18 @@ finish:
* This will happen
*/
int
-DataNodeRollback(void)
+PGXCNodeRollback(void)
{
int res = 0;
int tran_count;
- DataNodeHandle *connections[NumDataNodes];
+ PGXCNodeAllHandles *pgxc_connections;
+ pgxc_connections = pgxc_get_all_transaction_nodes();
- /* Consume any messages on the data nodes first if necessary */
- DataNodeConsumeMessages();
+ tran_count = pgxc_connections->dn_conn_count + pgxc_connections->co_conn_count;
- /* gather connections to rollback */
- tran_count = get_transaction_nodes(connections);
+ /* Consume any messages on the Datanodes and Coordinators first if necessary */
+ PGXCNodeConsumeMessages();
/*
* If we do not have open transactions we have nothing to rollback just
@@ -1768,7 +1803,7 @@ DataNodeRollback(void)
if (tran_count == 0)
goto finish;
- res = data_node_rollback(tran_count, connections);
+ res = pgxc_node_rollback(pgxc_connections);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1778,20 +1813,23 @@ finish:
release_handles(true);
autocommit = true;
clear_write_node_list();
+
+ /* Clean up connections */
+ pfree_pgxc_all_handles(pgxc_connections);
return res;
}
/*
- * Send ROLLBACK command down to the Data nodes and handle responses
+ * Send ROLLBACK command down to Datanodes and Coordinators and handle responses
*/
static int
-data_node_rollback(int conn_count, DataNodeHandle ** connections)
+pgxc_node_rollback(PGXCNodeAllHandles *pgxc_handles)
{
int i;
- struct timeval *timeout = NULL;
- RemoteQueryState *combiner;
-
+ int result = 0;
+ int co_conn_count = pgxc_handles->co_conn_count;
+ int dn_conn_count = pgxc_handles->dn_conn_count;
/*
* Rollback is a special case, being issued because of an error.
@@ -1799,20 +1837,21 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections)
* issuing our rollbacks so that we did not read the results of the
* previous command.
*/
- for (i = 0; i < conn_count; i++)
- clear_socket_data(connections[i]);
+ for (i = 0; i < dn_conn_count; i++)
+ clear_socket_data(pgxc_handles->datanode_handles[i]);
- /* Send ROLLBACK - */
- for (i = 0; i < conn_count; i++)
- data_node_send_query(connections[i], "ROLLBACK");
+ for (i = 0; i < co_conn_count; i++)
+ clear_socket_data(pgxc_handles->coord_handles[i]);
- combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
- /* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
- return EOF;
+ /* Send ROLLBACK to all handles */
+ if (pgxc_all_handles_send_query(pgxc_handles, "ROLLBACK", false))
+ result = EOF;
- /* Verify status */
- return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
+ /* Receive and Combine results from Datanodes and Coordinators */
+ result |= pgxc_node_receive_and_validate(dn_conn_count, pgxc_handles->datanode_handles, false);
+ result |= pgxc_node_receive_and_validate(co_conn_count, pgxc_handles->coord_handles, false);
+
+ return result;
}
@@ -1820,15 +1859,16 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections)
* Begin COPY command
* The copy_connections array must have room for NumDataNodes items
*/
-DataNodeHandle**
+PGXCNodeHandle**
DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from)
{
int i, j;
int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist);
struct timeval *timeout = NULL;
- DataNodeHandle **connections;
- DataNodeHandle **copy_connections;
- DataNodeHandle *newConnections[conn_count];
+ PGXCNodeAllHandles *pgxc_handles;
+ PGXCNodeHandle **connections;
+ PGXCNodeHandle **copy_connections;
+ PGXCNodeHandle *newConnections[conn_count];
int new_count = 0;
ListCell *nodeitem;
bool need_tran;
@@ -1840,7 +1880,9 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
return NULL;
/* Get needed datanode connections */
- connections = get_handles(nodelist);
+ pgxc_handles = get_handles(nodelist, NULL, false);
+ connections = pgxc_handles->datanode_handles;
+
if (!connections)
return NULL;
@@ -1853,7 +1895,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
* So store connections in an array where index is node-1.
* Unused items in the array should be NULL
*/
- copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *));
+ copy_connections = (PGXCNodeHandle **) palloc0(NumDataNodes * sizeof(PGXCNodeHandle *));
i = 0;
foreach(nodeitem, nodelist)
copy_connections[lfirst_int(nodeitem) - 1] = connections[i++];
@@ -1910,7 +1952,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
if (new_count > 0 && need_tran)
{
/* Start transaction on connections where it is not started */
- if (data_node_begin(new_count, newConnections, gxid))
+ if (pgxc_node_begin(new_count, newConnections, gxid))
{
pfree(connections);
pfree(copy_connections);
@@ -1922,18 +1964,18 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
for (i = 0; i < conn_count; i++)
{
/* If explicit transaction is needed gxid is already sent */
- if (!need_tran && data_node_send_gxid(connections[i], gxid))
+ if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
{
add_error_message(connections[i], "Can not send request");
pfree(connections);
pfree(copy_connections);
return NULL;
}
- if (conn_count == 1 && data_node_send_timestamp(connections[i], timestamp))
+ if (conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp))
{
/*
* If a transaction involves multiple connections timestamp, is
- * always sent down to Datanodes with data_node_begin.
+ * always sent down to Datanodes with pgxc_node_begin.
* An autocommit transaction needs the global timestamp also,
* so handle this case here.
*/
@@ -1942,14 +1984,14 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
pfree(copy_connections);
return NULL;
}
- if (snapshot && data_node_send_snapshot(connections[i], snapshot))
+ if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot))
{
add_error_message(connections[i], "Can not send request");
pfree(connections);
pfree(copy_connections);
return NULL;
}
- if (data_node_send_query(connections[i], query) != 0)
+ if (pgxc_node_send_query(connections[i], query) != 0)
{
add_error_message(connections[i], "Can not send request");
pfree(connections);
@@ -1966,7 +2008,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
/* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner)
+ if (pgxc_node_receive_responses(conn_count, connections, timeout, combiner)
|| !ValidateAndCloseCombiner(combiner))
{
if (autocommit)
@@ -1990,9 +2032,9 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
* Send a data row to the specified nodes
*/
int
-DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, DataNodeHandle** copy_connections)
+DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections)
{
- DataNodeHandle *primary_handle = NULL;
+ PGXCNodeHandle *primary_handle = NULL;
ListCell *nodeitem;
/* size + data row + \n */
int msgLen = 4 + len + 1;
@@ -2014,7 +2056,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, DataNodeHandle**
if (bytes_needed > COPY_BUFFER_SIZE)
{
/* First look if data node has sent a error message */
- int read_status = data_node_read_data(primary_handle);
+ int read_status = pgxc_node_read_data(primary_handle);
if (read_status == EOF || read_status < 0)
{
add_error_message(primary_handle, "failed to read data from data node");
@@ -2062,7 +2104,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, DataNodeHandle**
foreach(nodeitem, exec_nodes->nodelist)
{
- DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1];
+ PGXCNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1];
if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN)
{
/* precalculate to speed up access */
@@ -2075,7 +2117,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, DataNodeHandle**
int to_send = handle->outEnd;
/* First look if data node has sent a error message */
- int read_status = data_node_read_data(handle);
+ int read_status = pgxc_node_read_data(handle);
if (read_status == EOF || read_status < 0)
{
add_error_message(handle, "failed to read data from data node");
@@ -2143,7 +2185,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, DataNodeHandle**
}
uint64
-DataNodeCopyOut(ExecNodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file)
+DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE* copy_file)
{
RemoteQueryState *combiner;
int conn_count = list_length(exec_nodes->nodelist) == 0 ? NumDataNodes : list_length(exec_nodes->nodelist);
@@ -2164,7 +2206,7 @@ DataNodeCopyOut(ExecNodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
foreach(nodeitem, exec_nodes->nodelist)
{
- DataNodeHandle *handle = copy_connections[count];
+ PGXCNodeHandle *handle = copy_connections[count];
count++;
if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT)
@@ -2176,7 +2218,7 @@ DataNodeCopyOut(ExecNodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
if (handle_response(handle,combiner) == RESPONSE_EOF)
{
/* read some extra-data */
- read_status = data_node_read_data(handle);
+ read_status = pgxc_node_read_data(handle);
if (read_status < 0)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
@@ -2212,7 +2254,7 @@ DataNodeCopyOut(ExecNodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
* Finish copy process on all connections
*/
void
-DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
+DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node,
CombineType combine_type)
{
int i;
@@ -2221,13 +2263,13 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
bool need_tran;
bool error = false;
struct timeval *timeout = NULL; /* wait forever */
- DataNodeHandle *connections[NumDataNodes];
- DataNodeHandle *primary_handle = NULL;
+ PGXCNodeHandle *connections[NumDataNodes];
+ PGXCNodeHandle *primary_handle = NULL;
int conn_count = 0;
for (i = 0; i < NumDataNodes; i++)
{
- DataNodeHandle *handle = copy_connections[i];
+ PGXCNodeHandle *handle = copy_connections[i];
if (!handle)
continue;
@@ -2255,7 +2297,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
primary_handle->outEnd += 4;
/* We need response right away, so send immediately */
- if (data_node_flush(primary_handle) < 0)
+ if (pgxc_node_flush(primary_handle) < 0)
{
error = true;
}
@@ -2266,12 +2308,12 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
}
combiner = CreateResponseCombiner(conn_count + 1, combine_type);
- error = (data_node_receive_responses(1, &primary_handle, timeout, combiner) != 0) || error;
+ error = (pgxc_node_receive_responses(1, &primary_handle, timeout, combiner) != 0) || error;
}
for (i = 0; i < conn_count; i++)
{
- DataNodeHandle *handle = connections[i];
+ PGXCNodeHandle *handle = connections[i];
if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT)
{
@@ -2288,7 +2330,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
handle->outEnd += 4;
/* We need response right away, so send immediately */
- if (data_node_flush(handle) < 0)
+ if (pgxc_node_flush(handle) < 0)
{
error = true;
}
@@ -2303,7 +2345,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
if (!combiner)
combiner = CreateResponseCombiner(conn_count, combine_type);
- error = (data_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error;
+ error = (pgxc_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error;
if (!ValidateAndCloseCombiner(combiner) || error)
ereport(ERROR,
@@ -2435,15 +2477,28 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
}
}
-static void
+/*
+ * Get Node connections depending on the connection type:
+ * Datanodes Only, Coordinators only or both types
+ */
+static PGXCNodeAllHandles *
get_exec_connections(ExecNodes *exec_nodes,
- int *regular_conn_count,
- int *total_conn_count,
- DataNodeHandle ***connections,
- DataNodeHandle ***primaryconnection)
+ RemoteQueryExecType exec_type)
{
List *nodelist = NIL;
List *primarynode = NIL;
+ List *coordlist = NIL;
+ PGXCNodeHandle *primaryconnection;
+ int co_conn_count, dn_conn_count;
+ bool is_query_coord_only = false;
+ PGXCNodeAllHandles *pgxc_handles = NULL;
+
+ /*
+ * If query is launched only on Coordinators, we have to inform get_handles
+ * not to ask for Datanode connections even if list of Datanodes is NIL.
+ */
+ if (exec_type == EXEC_ON_COORDS)
+ is_query_coord_only = true;
if (exec_nodes)
{
@@ -2451,38 +2506,61 @@ get_exec_connections(ExecNodes *exec_nodes,
primarynode = exec_nodes->primarynodelist;
}
- if (list_length(nodelist) == 0)
+ if (list_length(nodelist) == 0 &&
+ (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_DATANODES))
+ {
+ /* Primary connection is included in this number of connections if it exists */
+ dn_conn_count = NumDataNodes;
+ }
+ else
{
if (primarynode)
- *regular_conn_count = NumDataNodes - 1;
+ dn_conn_count = list_length(nodelist) + 1;
else
- *regular_conn_count = NumDataNodes;
+ dn_conn_count = list_length(nodelist);
}
- else
+
+ if (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_COORDS)
{
- *regular_conn_count = list_length(nodelist);
+ co_conn_count = NumCoords;
+ coordlist = GetAllCoordNodes();
}
+ else
+ co_conn_count = 0;
- *total_conn_count = *regular_conn_count;
+ /* Get other connections (non-primary) */
+ pgxc_handles = get_handles(nodelist, coordlist, is_query_coord_only);
+ if (!pgxc_handles)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not obtain connection from pool")));
/* Get connection for primary node, if used */
if (primarynode)
{
- *primaryconnection = get_handles(primarynode);
- if (!*primaryconnection)
+ /* Let's assume primary connection is always a datanode connection for the moment */
+ PGXCNodeAllHandles *pgxc_conn_res;
+ pgxc_conn_res = get_handles(primarynode, NULL, false);
+
+ /* primary connection is unique */
+ primaryconnection = pgxc_conn_res->datanode_handles[0];
+
+ pfree(pgxc_conn_res);
+
+ if (!primaryconnection)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could not obtain connection from pool")));
- (*total_conn_count)++;
+ pgxc_handles->primary_handle = primaryconnection;
}
- /* Get other connections (non-primary) */
- *connections = get_handles(nodelist);
- if (!*connections)
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not obtain connection from pool")));
+ /* Depending on the execution type, we still need to save the initial node counts */
+ pgxc_handles->dn_conn_count = dn_conn_count;
+ pgxc_handles->co_conn_count = co_conn_count;
+ return pgxc_handles;
}
/*
@@ -2491,7 +2569,7 @@ get_exec_connections(ExecNodes *exec_nodes,
* already have more then one write nodes.
*/
static void
-register_write_nodes(int conn_count, DataNodeHandle **connections)
+register_write_nodes(int conn_count, PGXCNodeHandle **connections)
{
int i, j;
@@ -2539,12 +2617,13 @@ ExecRemoteQuery(RemoteQueryState *node)
GlobalTransactionId gxid = InvalidGlobalTransactionId;
Snapshot snapshot = GetActiveSnapshot();
TimestampTz timestamp = GetCurrentGTMStartTimestamp();
- DataNodeHandle **connections = NULL;
- DataNodeHandle **primaryconnection = NULL;
+ PGXCNodeHandle **connections = NULL;
+ PGXCNodeHandle *primaryconnection = NULL;
int i;
int regular_conn_count;
int total_conn_count;
bool need_tran;
+ PGXCNodeAllHandles *pgxc_connections;
/*
* If coordinator plan is specified execute it first.
@@ -2561,11 +2640,26 @@ ExecRemoteQuery(RemoteQueryState *node)
return slot;
}
- get_exec_connections(step->exec_nodes,
- &regular_conn_count,
- &total_conn_count,
- &connections,
- &primaryconnection);
+ /*
+ * Get connections for Datanodes only, utilities and DDLs
+ * are launched in ExecRemoteUtility
+ */
+ pgxc_connections = get_exec_connections(step->exec_nodes,
+ EXEC_ON_DATANODES);
+
+ connections = pgxc_connections->datanode_handles;
+ primaryconnection = pgxc_connections->primary_handle;
+ total_conn_count = regular_conn_count = pgxc_connections->dn_conn_count;
+
+ /*
+ * Primary connection is counted separately but is included in total_conn_count if used.
+ */
+ if (primaryconnection)
+ {
+ regular_conn_count--;
+ }
+
+ pfree(pgxc_connections);
/*
* We save only regular connections, at the time we exit the function
@@ -2577,7 +2671,7 @@ ExecRemoteQuery(RemoteQueryState *node)
if (force_autocommit)
need_tran = false;
else
- need_tran = !autocommit || !is_read_only && total_conn_count > 1;
+ need_tran = !autocommit || (!is_read_only && total_conn_count > 1);
elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primaryconnection ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
@@ -2592,7 +2686,7 @@ ExecRemoteQuery(RemoteQueryState *node)
if (!is_read_only)
{
if (primaryconnection)
- register_write_nodes(1, primaryconnection);
+ register_write_nodes(1, &primaryconnection);
register_write_nodes(regular_conn_count, connections);
}
@@ -2614,16 +2708,16 @@ ExecRemoteQuery(RemoteQueryState *node)
* Check if data node connections are in transaction and start
* transactions on nodes where it is not started
*/
- DataNodeHandle *new_connections[total_conn_count];
+ PGXCNodeHandle *new_connections[total_conn_count];
int new_count = 0;
- if (primaryconnection && primaryconnection[0]->transaction_status != 'T')
- new_connections[new_count++] = primaryconnection[0];
+ if (primaryconnection && primaryconnection->transaction_status != 'T')
+ new_connections[new_count++] = primaryconnection;
for (i = 0; i < regular_conn_count; i++)
if (connections[i]->transaction_status != 'T')
new_connections[new_count++] = connections[i];
- if (new_count && data_node_begin(new_count, new_connections, gxid))
+ if (new_count && pgxc_node_begin(new_count, new_connections, gxid))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could not begin transaction on data nodes.")));
@@ -2634,11 +2728,11 @@ ExecRemoteQuery(RemoteQueryState *node)
if (!step->is_single_step)
step->sql_statement = deparseSql(node);
- /* See if we have a primary nodes, execute on it first before the others */
+ /* See if we have a primary node, execute on it first before the others */
if (primaryconnection)
{
/* If explicit transaction is needed gxid is already sent */
- if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid))
+ if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
{
pfree(connections);
pfree(primaryconnection);
@@ -2646,11 +2740,11 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (total_conn_count == 1 && data_node_send_timestamp(primaryconnection[0], timestamp))
+ if (total_conn_count == 1 && pgxc_node_send_timestamp(primaryconnection, timestamp))
{
/*
* If a transaction involves multiple connections timestamp is
- * always sent down to Datanodes with data_node_begin.
+ * always sent down to Datanodes with pgxc_node_begin.
* An autocommit transaction needs the global timestamp also,
* so handle this case here.
*/
@@ -2660,7 +2754,7 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot))
+ if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot))
{
pfree(connections);
pfree(primaryconnection);
@@ -2668,7 +2762,7 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (data_node_send_query(primaryconnection[0], step->sql_statement) != 0)
+ if (pgxc_node_send_query(primaryconnection, step->sql_statement) != 0)
{
pfree(connections);
pfree(primaryconnection);
@@ -2681,12 +2775,12 @@ ExecRemoteQuery(RemoteQueryState *node)
while (node->command_complete_count < 1)
{
- if (data_node_receive(1, primaryconnection, NULL))
+ if (pgxc_node_receive(1, &primaryconnection, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to read response from data nodes")));
- while (handle_response(primaryconnection[0], node) == RESPONSE_EOF)
- if (data_node_receive(1, primaryconnection, NULL))
+ while (handle_response(primaryconnection, node) == RESPONSE_EOF)
+ if (pgxc_node_receive(1, &primaryconnection, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to read response from data nodes")));
@@ -2698,43 +2792,49 @@ ExecRemoteQuery(RemoteQueryState *node)
errmsg("%s", node->errorMessage)));
}
}
- pfree(primaryconnection);
}
for (i = 0; i < regular_conn_count; i++)
{
/* If explicit transaction is needed gxid is already sent */
- if (!need_tran && data_node_send_gxid(connections[i], gxid))
+ if (!need_tran && pgxc_node_send_gxid(connections[i], gxid))
{
pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (total_conn_count == 1 && data_node_send_timestamp(connections[i], timestamp))
+ if (total_conn_count == 1 && pgxc_node_send_timestamp(connections[i], timestamp))
{
/*
* If a transaction involves multiple connections timestamp is
- * always sent down to Datanodes with data_node_begin.
+ * always sent down to Datanodes with pgxc_node_begin.
* An autocommit transaction needs the global timestamp also,
* so handle this case here.
*/
pfree(connections);
- pfree(primaryconnection);
+ if (primaryconnection)
+ pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (snapshot && data_node_send_snapshot(connections[i], snapshot))
+ if (snapshot && pgxc_node_send_snapshot(connections[i], snapshot))
{
pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (data_node_send_query(connections[i], step->sql_statement) != 0)
+ if (pgxc_node_send_query(connections[i], step->sql_statement) != 0)
{
pfree(connections);
+ if (primaryconnection)
+ pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
@@ -2749,7 +2849,7 @@ ExecRemoteQuery(RemoteQueryState *node)
{
int i = 0;
- if (data_node_receive(regular_conn_count, connections, NULL))
+ if (pgxc_node_receive(regular_conn_count, connections, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to read response from data nodes")));
@@ -2931,7 +3031,7 @@ ExecRemoteQuery(RemoteQueryState *node)
i = 0;
/* if we cycled over all connections we need to receive more */
if (i == node->current_conn)
- if (data_node_receive(node->conn_count, node->connections, NULL))
+ if (pgxc_node_receive(node->conn_count, node->connections, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to read response from data nodes")));
@@ -2967,7 +3067,6 @@ ExecRemoteQuery(RemoteQueryState *node)
if (!have_tuple) /* report end of scan */
ExecClearTuple(resultslot);
-
}
if (node->errorMessage)
@@ -2991,7 +3090,7 @@ ExecEndRemoteQuery(RemoteQueryState *node)
/*
* If processing was interrupted, (ex: client did not consume all the data,
* or a subquery with LIMIT) we may still have data on the nodes. Try and consume.
- * We do not simply call DataNodeConsumeMessages, because the same
+ * We do not simply call PGXCNodeConsumeMessages, because the same
* connection could be used for multiple RemoteQuery steps.
*
* It seems most stable checking command_complete_count
@@ -3051,7 +3150,7 @@ ExecEndRemoteQuery(RemoteQueryState *node)
timeout.tv_sec = END_QUERY_TIMEOUT;
timeout.tv_usec = 0;
- if (data_node_receive(node->conn_count, node->connections, &timeout))
+ if (pgxc_node_receive(node->conn_count, node->connections, &timeout))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to read response from data nodes when ending query")));
@@ -3090,17 +3189,17 @@ ExecEndRemoteQuery(RemoteQueryState *node)
* This is useful for calling after ereport()
*/
void
-DataNodeConsumeMessages(void)
+PGXCNodeConsumeMessages(void)
{
int i;
int active_count = 0;
int res;
struct timeval timeout;
- DataNodeHandle *connection = NULL;
- DataNodeHandle **connections = NULL;
- DataNodeHandle *active_connections[NumDataNodes];
-
+ PGXCNodeHandle *connection = NULL;
+ PGXCNodeHandle **connections = NULL;
+ PGXCNodeHandle *active_connections[NumDataNodes+NumCoords];
+ /* Get all active Coordinators and Datanodes */
active_count = get_active_nodes(active_connections);
/* Iterate through handles in use and try and clean */
@@ -3119,14 +3218,14 @@ DataNodeConsumeMessages(void)
if (res == RESPONSE_EOF)
{
if (!connections)
- connections = (DataNodeHandle **) palloc(sizeof(DataNodeHandle*));
+ connections = (PGXCNodeHandle **) palloc(sizeof(PGXCNodeHandle*));
connections[0] = connection;
/* Use a timeout so we do not wait forever */
timeout.tv_sec = CLEAR_TIMEOUT;
timeout.tv_usec = 0;
- if (data_node_receive(1, connections, &timeout))
+ if (pgxc_node_receive(1, connections, &timeout))
{
/* Mark this as bad, move on to next one */
connection->state = DN_CONNECTION_STATE_ERROR_FATAL;
@@ -3173,6 +3272,9 @@ ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt)
*
* But does not need an Estate instance and does not do some unnecessary work,
* like allocating tuple slots.
+ *
+ * When an error occurs during Transaction Abort, handles are freed; before that,
+ * it is first necessary to consume all the messages remaining on the connections.
*/
void
ExecRemoteUtility(RemoteQuery *node)
@@ -3180,41 +3282,59 @@ ExecRemoteUtility(RemoteQuery *node)
RemoteQueryState *remotestate;
bool force_autocommit = node->force_autocommit;
bool is_read_only = node->read_only;
+ RemoteQueryExecType exec_type = node->exec_type;
GlobalTransactionId gxid = InvalidGlobalTransactionId;
Snapshot snapshot = GetActiveSnapshot();
- DataNodeHandle **connections = NULL;
- DataNodeHandle **primaryconnection = NULL;
+ PGXCNodeAllHandles *pgxc_connections;
+ PGXCNodeHandle *primaryconnection = NULL;/* For the moment only Datanode has a primary */
int regular_conn_count;
int total_conn_count;
+ int co_conn_count;
bool need_tran;
int i;
remotestate = CreateResponseCombiner(0, node->combine_type);
- get_exec_connections(node->exec_nodes,
- &regular_conn_count,
- &total_conn_count,
- &connections,
- &primaryconnection);
+ pgxc_connections = get_exec_connections(node->exec_nodes,
+ exec_type);
+
+ primaryconnection = pgxc_connections->primary_handle;
+
+ /* Registering new connections needs the sum of Connections to Datanodes AND to Coordinators */
+ total_conn_count = regular_conn_count = pgxc_connections->dn_conn_count
+ + pgxc_connections->co_conn_count;
+
+ regular_conn_count = pgxc_connections->dn_conn_count;
+ co_conn_count = pgxc_connections->co_conn_count;
+
+ /*
+ * Primary connection is counted separately in regular connection count
+ * but is included in total connection count if used.
+ */
+ if (primaryconnection)
+ {
+ regular_conn_count--;
+ }
if (force_autocommit)
need_tran = false;
+ else if (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_COORDS)
+ need_tran = true;
else
need_tran = !autocommit || total_conn_count > 1;
+
if (!is_read_only)
{
if (primaryconnection)
- register_write_nodes(1, primaryconnection);
- register_write_nodes(regular_conn_count, connections);
+ register_write_nodes(1, &primaryconnection);
+ register_write_nodes(regular_conn_count, pgxc_connections->datanode_handles);
}
gxid = GetCurrentGlobalTransactionId();
if (!GlobalTransactionIdIsValid(gxid))
{
- if (primaryconnection)
- pfree(primaryconnection);
- pfree(connections);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to get next transaction ID")));
@@ -3226,46 +3346,61 @@ ExecRemoteUtility(RemoteQuery *node)
* Check if data node connections are in transaction and start
* transactions on nodes where it is not started
*/
- DataNodeHandle *new_connections[total_conn_count];
+ PGXCNodeHandle *new_connections[total_conn_count];
int new_count = 0;
- if (primaryconnection && primaryconnection[0]->transaction_status != 'T')
- new_connections[new_count++] = primaryconnection[0];
+ if (primaryconnection && primaryconnection->transaction_status != 'T')
+ new_connections[new_count++] = primaryconnection;
+
+ /* Check for Datanodes */
for (i = 0; i < regular_conn_count; i++)
- if (connections[i]->transaction_status != 'T')
- new_connections[new_count++] = connections[i];
+ if (pgxc_connections->datanode_handles[i]->transaction_status != 'T')
+ new_connections[new_count++] = pgxc_connections->datanode_handles[i];
- if (new_count && data_node_begin(new_count, new_connections, gxid))
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Could not begin transaction on data nodes")));
+ if (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_DATANODES)
+ {
+ if (new_count && pgxc_node_begin(new_count, new_connections, gxid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not begin transaction on data nodes")));
+ }
+ /* Check Coordinators also and begin there if necessary */
+ new_count = 0;
+ if (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_COORDS)
+ {
+ /* Important not to count the connection of local coordinator! */
+ for (i = 0; i < co_conn_count - 1; i++)
+ if (pgxc_connections->coord_handles[i]->transaction_status != 'T')
+ new_connections[new_count++] = pgxc_connections->coord_handles[i];
+
+ if (new_count && pgxc_node_begin(new_count, new_connections, gxid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not begin transaction on Coordinators")));
+ }
}
/* See if we have a primary nodes, execute on it first before the others */
if (primaryconnection)
{
/* If explicit transaction is needed gxid is already sent */
- if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid))
+ if (!need_tran && pgxc_node_send_gxid(primaryconnection, gxid))
{
- pfree(connections);
- pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot))
+ if (snapshot && pgxc_node_send_snapshot(primaryconnection, snapshot))
{
- pfree(connections);
- pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
- if (data_node_send_query(primaryconnection[0], node->sql_statement) != 0)
+ if (pgxc_node_send_query(primaryconnection, node->sql_statement) != 0)
{
- pfree(connections);
- pfree(primaryconnection);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
@@ -3277,9 +3412,9 @@ ExecRemoteUtility(RemoteQuery *node)
{
PG_TRY();
{
- data_node_receive(1, primaryconnection, NULL);
- while (handle_response(primaryconnection[0], remotestate) == RESPONSE_EOF)
- data_node_receive(1, primaryconnection, NULL);
+ pgxc_node_receive(1, &primaryconnection, NULL);
+ while (handle_response(primaryconnection, remotestate) == RESPONSE_EOF)
+ pgxc_node_receive(1, &primaryconnection, NULL);
if (remotestate->errorMessage)
{
char *code = remotestate->errorCode;
@@ -3291,85 +3426,160 @@ ExecRemoteUtility(RemoteQuery *node)
/* If we got an error response return immediately */
PG_CATCH();
{
- pfree(primaryconnection);
- pfree(connections);
+ pfree_pgxc_all_handles(pgxc_connections);
+
PG_RE_THROW();
}
PG_END_TRY();
}
- pfree(primaryconnection);
}
- for (i = 0; i < regular_conn_count; i++)
+ /* Send query down to Datanodes */
+ if (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_DATANODES)
{
- /* If explicit transaction is needed gxid is already sent */
- if (!need_tran && data_node_send_gxid(connections[i], gxid))
- {
- pfree(connections);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- if (snapshot && data_node_send_snapshot(connections[i], snapshot))
- {
- pfree(connections);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
- }
- if (data_node_send_query(connections[i], node->sql_statement) != 0)
+ for (i = 0; i < regular_conn_count; i++)
{
- pfree(connections);
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to send command to data nodes")));
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && pgxc_node_send_gxid(pgxc_connections->datanode_handles[i], gxid))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (snapshot && pgxc_node_send_snapshot(pgxc_connections->datanode_handles[i], snapshot))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
+ if (pgxc_node_send_query(pgxc_connections->datanode_handles[i], node->sql_statement) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
}
}
- /*
- * Stop if all commands are completed or we got a data row and
- * initialized state node for subsequent invocations
- */
- while (regular_conn_count > 0)
+ if (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_COORDS)
{
- int i = 0;
-
- data_node_receive(regular_conn_count, connections, NULL);
- /*
- * Handle input from the data nodes.
- * If we got a RESPONSE_DATAROW we can break handling to wrap
- * it into a tuple and return. Handling will be continued upon
- * subsequent invocations.
- * If we got 0, we exclude connection from the list. We do not
- * expect more input from it. In case of non-SELECT query we quit
- * the loop when all nodes finish their work and send ReadyForQuery
- * with empty connections array.
- * If we got EOF, move to the next connection, will receive more
- * data on the next iteration.
- */
- while (i < regular_conn_count)
+ /* Now send it to Coordinators if necessary */
+ for (i = 0; i < co_conn_count - 1; i++)
{
- int res = handle_response(connections[i], remotestate);
- if (res == RESPONSE_EOF)
- {
- i++;
- }
- else if (res == RESPONSE_COMPLETE)
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && pgxc_node_send_gxid(pgxc_connections->coord_handles[i], gxid))
{
- if (i < --regular_conn_count)
- connections[i] = connections[regular_conn_count];
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
}
- else if (res == RESPONSE_TUPDESC)
+ if (snapshot && pgxc_node_send_snapshot(pgxc_connections->coord_handles[i], snapshot))
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Unexpected response from data node")));
+ errmsg("Failed to send command to data nodes")));
}
- else if (res == RESPONSE_DATAROW)
+ if (pgxc_node_send_query(pgxc_connections->coord_handles[i], node->sql_statement) != 0)
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Unexpected response from data node")));
+ errmsg("Failed to send command to data nodes")));
+ }
+ }
+ }
+
+
+ /*
+ * Stop if all commands are completed or we got a data row and
+ * initialized state node for subsequent invocations
+ */
+ if (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_DATANODES)
+ {
+ while (regular_conn_count > 0)
+ {
+ int i = 0;
+
+ pgxc_node_receive(regular_conn_count, pgxc_connections->datanode_handles, NULL);
+ /*
+ * Handle input from the data nodes.
+ * If we got a RESPONSE_DATAROW we can break handling to wrap
+ * it into a tuple and return. Handling will be continued upon
+ * subsequent invocations.
+ * If we got 0, we exclude connection from the list. We do not
+ * expect more input from it. In case of non-SELECT query we quit
+ * the loop when all nodes finish their work and send ReadyForQuery
+ * with empty connections array.
+ * If we got EOF, move to the next connection, will receive more
+ * data on the next iteration.
+ */
+ while (i < regular_conn_count)
+ {
+ int res = handle_response(pgxc_connections->datanode_handles[i], remotestate);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (i < --regular_conn_count)
+ pgxc_connections->datanode_handles[i] =
+ pgxc_connections->datanode_handles[regular_conn_count];
+ }
+ else if (res == RESPONSE_TUPDESC)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from data node")));
+ }
+ else if (res == RESPONSE_DATAROW)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from data node")));
+ }
+ }
+ }
+ }
+
+ /* Make the same for Coordinators */
+ if (exec_type == EXEC_ON_ALL_NODES ||
+ exec_type == EXEC_ON_COORDS)
+ {
+ /* For local Coordinator */
+ co_conn_count--;
+ while (co_conn_count > 0)
+ {
+ int i = 0;
+
+ pgxc_node_receive(co_conn_count, pgxc_connections->coord_handles, NULL);
+ while (i < co_conn_count)
+ {
+ int res = handle_response(pgxc_connections->coord_handles[i], remotestate);
+ if (res == RESPONSE_EOF)
+ {
+ i++;
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (i < --co_conn_count)
+ pgxc_connections->coord_handles[i] =
+ pgxc_connections->coord_handles[co_conn_count];
+ }
+ else if (res == RESPONSE_TUPDESC)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from coordinator")));
+ }
+ else if (res == RESPONSE_DATAROW)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Unexpected response from coordinator")));
+ }
}
}
}
@@ -3380,12 +3590,12 @@ ExecRemoteUtility(RemoteQuery *node)
* Called when the backend is ending.
*/
void
-DataNodeCleanAndRelease(int code, Datum arg)
+PGXCNodeCleanAndRelease(int code, Datum arg)
{
/* Rollback on Data Nodes */
if (IsTransactionState())
{
- DataNodeRollback();
+ PGXCNodeRollback();
/* Rollback on GTM if transaction id opened. */
RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny());
@@ -3402,3 +3612,87 @@ DataNodeCleanAndRelease(int code, Datum arg)
stat_log();
}
+
+/*
+ * Create combiner, receive results from connections and validate combiner.
+ * Works for Coordinator or Datanodes.
+ */
+static int
+pgxc_node_receive_and_validate(const int conn_count, PGXCNodeHandle ** handles, bool reset_combiner)
+{
+ struct timeval *timeout = NULL;
+ int result = 0;
+ RemoteQueryState *combiner = NULL;
+
+ if (conn_count == 0)
+ return result;
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
+
+ /* Receive responses */
+ result = pgxc_node_receive_responses(conn_count, handles, timeout, combiner);
+ if (result)
+ goto finish;
+
+ if (reset_combiner)
+ result = ValidateAndResetCombiner(combiner) ? result : EOF;
+ else
+ result = ValidateAndCloseCombiner(combiner) ? result : EOF;
+
+finish:
+ return result;
+}
+
+/*
+ * Get all connections for which we have an open transaction,
+ * for both data nodes and coordinators
+ */
+static PGXCNodeAllHandles *
+pgxc_get_all_transaction_nodes()
+{
+ PGXCNodeAllHandles *pgxc_connections;
+
+ pgxc_connections = (PGXCNodeAllHandles *) palloc0(sizeof(PGXCNodeAllHandles));
+ if (!pgxc_connections)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ pgxc_connections->datanode_handles = (PGXCNodeHandle **)
+ palloc(NumDataNodes * sizeof(PGXCNodeHandle *));
+ pgxc_connections->coord_handles = (PGXCNodeHandle **)
+ palloc(NumCoords * sizeof(PGXCNodeHandle *));
+ if (!pgxc_connections->datanode_handles || !pgxc_connections->coord_handles)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ /* gather needed connections */
+ pgxc_connections->dn_conn_count = get_transaction_nodes(
+ pgxc_connections->datanode_handles, REMOTE_CONN_DATANODE);
+ pgxc_connections->co_conn_count = get_transaction_nodes(
+ pgxc_connections->coord_handles, REMOTE_CONN_COORD);
+
+ return pgxc_connections;
+}
+
+/* Free PGXCNodeAllHandles structure */
+static void
+pfree_pgxc_all_handles(PGXCNodeAllHandles *pgxc_handles)
+{
+ if (!pgxc_handles)
+ return;
+
+ if (pgxc_handles->primary_handle)
+ pfree(pgxc_handles->primary_handle);
+ if (pgxc_handles->datanode_handles && pgxc_handles->dn_conn_count != 0)
+ pfree(pgxc_handles->datanode_handles);
+ if (pgxc_handles->coord_handles && pgxc_handles->co_conn_count != 0)
+ pfree(pgxc_handles->coord_handles);
+
+ pfree(pgxc_handles);
+}
diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/pgxcnode.c
index 2e8ec40c4c..5340a9397d 100644
--- a/src/backend/pgxc/pool/datanode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -1,8 +1,9 @@
/*-------------------------------------------------------------------------
*
- * datanode.c
+ * pgxcnode.c
*
- * Functions for the coordinator communicating with the data nodes
+ * Functions for the Coordinator communicating with the PGXC nodes:
+ * Datanodes and Coordinators
*
*
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
@@ -27,7 +28,7 @@
#include "access/transam.h"
#include "access/xact.h"
#include "gtm/gtm_c.h"
-#include "pgxc/datanode.h"
+#include "pgxc/pgxcnode.h"
#include "pgxc/locator.h"
#include "pgxc/pgxc.h"
#include "pgxc/poolmgr.h"
@@ -38,18 +39,57 @@
#include "../interfaces/libpq/libpq-fe.h"
-static int node_count = 0;
-static DataNodeHandle *handles = NULL;
+static int datanode_count = 0;
+static int coord_count = 0;
+/*
+ * Datanode handles, saved in Transaction memory context when PostgresMain is launched
+ * Those handles are used inside a transaction by a coordinator to Datanodes
+ */
+static PGXCNodeHandle *dn_handles = NULL;
+/*
+ * Coordinator handles, saved in Transaction memory context
+ * when PostgresMain is launched.
+ * Those handles are used inside a transaction by a coordinator to other coordinators.
+ */
+static PGXCNodeHandle *co_handles = NULL;
-static void data_node_init(DataNodeHandle *handle, int sock, int nodenum);
-static void data_node_free(DataNodeHandle *handle);
+static void pgxc_node_init(PGXCNodeHandle *handle, int sock, int nodenum);
+static void pgxc_node_free(PGXCNodeHandle *handle);
-static int get_int(DataNodeHandle * conn, size_t len, int *out);
-static int get_char(DataNodeHandle * conn, char *out);
+static int get_int(PGXCNodeHandle * conn, size_t len, int *out);
+static int get_char(PGXCNodeHandle * conn, char *out);
/*
- * Allocate and initialize memory to store DataNode handles.
+ * Initialize PGXCNodeHandle struct
+ */
+static void
+init_pgxc_handle(PGXCNodeHandle *pgxc_handle)
+{
+ /*
+ * Socket descriptor is small non-negative integer,
+ * Indicate the handle is not initialized yet
+ */
+ pgxc_handle->sock = NO_SOCKET;
+
+ /* Initialise buffers */
+ pgxc_handle->error = NULL;
+ pgxc_handle->outSize = 16 * 1024;
+ pgxc_handle->outBuffer = (char *) palloc(pgxc_handle->outSize);
+ pgxc_handle->inSize = 16 * 1024;
+ pgxc_handle->inBuffer = (char *) palloc(pgxc_handle->inSize);
+
+ if (pgxc_handle->outBuffer == NULL || pgxc_handle->inBuffer == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+}
+
+
+/*
+ * Allocate and initialize memory to store Datanode and Coordinator handles.
*/
void
InitMultinodeExecutor(void)
@@ -57,63 +97,64 @@ InitMultinodeExecutor(void)
int i;
/* This function could get called multiple times because of sigjmp */
- if (handles != NULL)
+ if (dn_handles != NULL && co_handles != NULL)
return;
/*
* Should be in TopMemoryContext.
* Assume the caller takes care of context switching
+ * Initialize Datanode handles.
*/
- handles = (DataNodeHandle *) palloc(NumDataNodes * sizeof(DataNodeHandle));
- if (!handles)
+ if (dn_handles == NULL)
{
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ dn_handles = (PGXCNodeHandle *) palloc(NumDataNodes * sizeof(PGXCNodeHandle));
+
+ if (!dn_handles)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+
+ /* initialize storage then */
+ for (i = 0; i < NumDataNodes; i++)
+ init_pgxc_handle(&dn_handles[i]);
}
- /* initialize storage then */
- for (i = 0; i < NumDataNodes; i++)
+ /* Same but for Coordinators */
+ if (co_handles == NULL)
{
- /*
- * Socket descriptor is small non-negative integer,
- * Indicate the handle is not initialized yet
- */
- handles[i].sock = NO_SOCKET;
+ co_handles = (PGXCNodeHandle *) palloc(NumCoords * sizeof(PGXCNodeHandle));
- /* Initialise buffers */
- handles[i].error = NULL;
- handles[i].outSize = 16 * 1024;
- handles[i].outBuffer = (char *) palloc(handles[i].outSize);
- handles[i].inSize = 16 * 1024;
- handles[i].inBuffer = (char *) palloc(handles[i].inSize);
-
- if (handles[i].outBuffer == NULL || handles[i].inBuffer == NULL)
- {
+ if (!co_handles)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
- }
+
+ for (i = 0; i < NumCoords; i++)
+ init_pgxc_handle(&co_handles[i]);
}
- node_count = 0;
+ datanode_count = 0;
+ coord_count = 0;
}
/*
* Builds up a connection string
*/
char *
-DataNodeConnStr(char *host, char *port, char *dbname,
- char *user, char *password)
+PGXCNodeConnStr(char *host, char *port, char *dbname,
+ char *user, char *password, char *remote_type)
{
char *out,
connstr[256];
int num;
- /* Build up connection string */
+ /*
+ * Build up connection string
+ * remote type can be coordinator, datanode or application.
+ */
num = snprintf(connstr, sizeof(connstr),
- "host=%s port=%s dbname=%s user=%s password=%s",
- host, port, dbname, user, password);
+ "host=%s port=%s dbname=%s user=%s password=%s options='-c remotetype=%s'",
+ host, port, dbname, user, password, remote_type);
/* Check for overflow */
if (num > 0 && num < sizeof(connstr))
@@ -133,7 +174,7 @@ DataNodeConnStr(char *host, char *port, char *dbname,
* Connect to a Data Node using a connection string
*/
NODE_CONNECTION *
-DataNodeConnect(char *connstr)
+PGXCNodeConnect(char *connstr)
{
PGconn *conn;
@@ -147,7 +188,7 @@ DataNodeConnect(char *connstr)
* Close specified connection
*/
void
-DataNodeClose(NODE_CONNECTION *conn)
+PGXCNodeClose(NODE_CONNECTION *conn)
{
/* Delegate call to the pglib */
PQfinish((PGconn *) conn);
@@ -158,7 +199,7 @@ DataNodeClose(NODE_CONNECTION *conn)
* Checks if connection active
*/
int
-DataNodeConnected(NODE_CONNECTION *conn)
+PGXCNodeConnected(NODE_CONNECTION *conn)
{
/* Delegate call to the pglib */
PGconn *pgconn = (PGconn *) conn;
@@ -179,7 +220,7 @@ DataNodeConnected(NODE_CONNECTION *conn)
* is destroyed in xact.c.
*/
static void
-data_node_free(DataNodeHandle *handle)
+pgxc_node_free(PGXCNodeHandle *handle)
{
close(handle->sock);
handle->sock = NO_SOCKET;
@@ -192,7 +233,7 @@ data_node_free(DataNodeHandle *handle)
* Structure stores state info and I/O buffers
*/
static void
-data_node_init(DataNodeHandle *handle, int sock, int nodenum)
+pgxc_node_init(PGXCNodeHandle *handle, int sock, int nodenum)
{
handle->nodenum = nodenum;
handle->sock = sock;
@@ -214,8 +255,8 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum)
* the data into the buffer
*/
int
-data_node_receive(const int conn_count,
- DataNodeHandle ** connections, struct timeval * timeout)
+pgxc_node_receive(const int conn_count,
+ PGXCNodeHandle ** connections, struct timeval * timeout)
{
int i,
res_select,
@@ -269,11 +310,11 @@ retry:
/* read data */
for (i = 0; i < conn_count; i++)
{
- DataNodeHandle *conn = connections[i];
+ PGXCNodeHandle *conn = connections[i];
if (FD_ISSET(conn->sock, &readfds))
{
- int read_status = data_node_read_data(conn);
+ int read_status = pgxc_node_read_data(conn);
if (read_status == EOF || read_status < 0)
{
@@ -293,10 +334,10 @@ retry:
/*
- * Read up incoming messages from the Data ndoe connection
+ * Read up incoming messages from the PGXC node connection
*/
int
-data_node_read_data(DataNodeHandle *conn)
+pgxc_node_read_data(PGXCNodeHandle *conn)
{
int someread = 0;
int nread;
@@ -434,18 +475,18 @@ retry:
* Throw away any data.
*/
void
-clear_socket_data (DataNodeHandle *conn)
+clear_socket_data (PGXCNodeHandle *conn)
{
do {
conn->inStart = conn->inCursor = conn->inEnd = 0;
- } while (data_node_read_data(conn) > 0);
+ } while (pgxc_node_read_data(conn) > 0);
}
/*
* Get one character from the connection buffer and advance cursor
*/
static int
-get_char(DataNodeHandle * conn, char *out)
+get_char(PGXCNodeHandle * conn, char *out)
{
if (conn->inCursor < conn->inEnd)
{
@@ -459,7 +500,7 @@ get_char(DataNodeHandle * conn, char *out)
* Read an integer from the connection buffer and advance cursor
*/
static int
-get_int(DataNodeHandle *conn, size_t len, int *out)
+get_int(PGXCNodeHandle *conn, size_t len, int *out)
{
unsigned short tmp2;
unsigned int tmp4;
@@ -503,7 +544,7 @@ get_int(DataNodeHandle *conn, size_t len, int *out)
* it should make a copy.
*/
char
-get_message(DataNodeHandle *conn, int *len, char **msg)
+get_message(PGXCNodeHandle *conn, int *len, char **msg)
{
char msgtype;
@@ -542,7 +583,8 @@ get_message(DataNodeHandle *conn, int *len, char **msg)
/*
- * Release all data node connections back to pool and release occupied memory
+ * Release all data node connections and coordinator connections
+ * back to pool and release occupied memory
*
* If force_drop is true, we force dropping all of the connections, such as after
* a rollback, which was likely issued due to an error.
@@ -551,31 +593,55 @@ void
release_handles(bool force_drop)
{
int i;
- int discard[NumDataNodes];
- int ndisc = 0;
+ int dn_discard[NumDataNodes];
+ int co_discard[NumCoords];
+ int dn_ndisc = 0;
+ int co_ndisc = 0;
-
- if (node_count == 0)
+ if (datanode_count == 0 && coord_count == 0)
return;
+ /* Collect Data Nodes handles */
for (i = 0; i < NumDataNodes; i++)
{
- DataNodeHandle *handle = &handles[i];
+ PGXCNodeHandle *handle = &dn_handles[i];
if (handle->sock != NO_SOCKET)
{
if (force_drop)
- discard[ndisc++] = handle->nodenum;
+ dn_discard[dn_ndisc++] = handle->nodenum;
else if (handle->state != DN_CONNECTION_STATE_IDLE)
{
- elog(WARNING, "Connection to data node %d has unexpected state %d and will be dropped", handle->nodenum, handle->state);
- discard[ndisc++] = handle->nodenum;
+ elog(WARNING, "Connection to Datanode %d has unexpected state %d and will be dropped", handle->nodenum, handle->state);
+ dn_discard[dn_ndisc++] = handle->nodenum;
}
- data_node_free(handle);
+ pgxc_node_free(handle);
}
}
- PoolManagerReleaseConnections(ndisc, discard);
- node_count = 0;
+
+ /* Collect Coordinator handles */
+ for (i = 0; i < NumCoords; i++)
+ {
+ PGXCNodeHandle *handle = &co_handles[i];
+
+ if (handle->sock != NO_SOCKET)
+ {
+ if (force_drop)
+ co_discard[co_ndisc++] = handle->nodenum;
+ else if (handle->state != DN_CONNECTION_STATE_IDLE)
+ {
+ elog(WARNING, "Connection to Coordinator %d has unexpected state %d and will be dropped", handle->nodenum, handle->state);
+ co_discard[co_ndisc++] = handle->nodenum;
+ }
+ pgxc_node_free(handle);
+ }
+ }
+
+ /* Here We have to add also the list of Coordinator Connections we want to drop at the same time */
+ PoolManagerReleaseConnections(dn_ndisc, dn_discard, co_ndisc, co_discard);
+
+ datanode_count = 0;
+ coord_count = 0;
}
@@ -584,7 +650,7 @@ release_handles(bool force_drop)
* increase it if necessary
*/
int
-ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle)
+ensure_in_buffer_capacity(size_t bytes_needed, PGXCNodeHandle *handle)
{
int newsize = handle->inSize;
char *newbuf;
@@ -636,7 +702,7 @@ ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle)
* increase it if necessary
*/
int
-ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle)
+ensure_out_buffer_capacity(size_t bytes_needed, PGXCNodeHandle *handle)
{
int newsize = handle->outSize;
char *newbuf;
@@ -687,7 +753,7 @@ ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle)
* Send specified amount of data from the outgoing buffer over the connection
*/
int
-send_some(DataNodeHandle *handle, int len)
+send_some(PGXCNodeHandle *handle, int len)
{
char *ptr = handle->outBuffer;
int remaining = handle->outEnd;
@@ -787,7 +853,7 @@ send_some(DataNodeHandle *handle, int len)
* To ensure all data are on the wire before waiting for response
*/
int
-data_node_flush(DataNodeHandle *handle)
+pgxc_node_flush(PGXCNodeHandle *handle)
{
while (handle->outEnd)
{
@@ -801,10 +867,10 @@ data_node_flush(DataNodeHandle *handle)
}
/*
- * Send specified statement down to the Data node
+ * Send specified statement down to the PGXC node
*/
int
-data_node_send_query(DataNodeHandle * handle, const char *query)
+pgxc_node_send_query(PGXCNodeHandle * handle, const char *query)
{
int strLen = strlen(query) + 1;
@@ -827,15 +893,15 @@ data_node_send_query(DataNodeHandle * handle, const char *query)
handle->state = DN_CONNECTION_STATE_QUERY;
- return data_node_flush(handle);
+ return pgxc_node_flush(handle);
}
/*
- * Send the GXID down to the Data node
+ * Send the GXID down to the PGXC node
*/
int
-data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid)
+pgxc_node_send_gxid(PGXCNodeHandle *handle, GlobalTransactionId gxid)
{
int msglen = 8;
int i32;
@@ -860,10 +926,10 @@ data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid)
/*
- * Send the snapshot down to the Data node
+ * Send the snapshot down to the PGXC node
*/
int
-data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot)
+pgxc_node_send_snapshot(PGXCNodeHandle *handle, Snapshot snapshot)
{
int msglen;
int nval;
@@ -913,10 +979,10 @@ data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot)
}
/*
- * Send the timestamp down to the Datanode
+ * Send the timestamp down to the PGXC node
*/
int
-data_node_send_timestamp(DataNodeHandle *handle, TimestampTz timestamp)
+pgxc_node_send_timestamp(PGXCNodeHandle *handle, TimestampTz timestamp)
{
int msglen = 12; /* 4 bytes for msglen and 8 bytes for timestamp (int64) */
uint32 n32;
@@ -959,7 +1025,7 @@ data_node_send_timestamp(DataNodeHandle *handle, TimestampTz timestamp)
* at the convenient time
*/
void
-add_error_message(DataNodeHandle *handle, const char *message)
+add_error_message(PGXCNodeHandle *handle, const char *message)
{
handle->transaction_status = 'E';
handle->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
@@ -972,18 +1038,21 @@ add_error_message(DataNodeHandle *handle, const char *message)
}
/*
- * for specified list return array of DataNodeHandles
+ * for specified list return array of PGXCNodeHandles
* acquire from pool if needed.
* the lenth of returned array is the same as of nodelist
- * Special case is empty or NIL nodeList, in this case return all the nodes.
+ * For Datanodes, Special case is empty or NIL nodeList, in this case return all the nodes.
* The returned list should be pfree'd when no longer needed.
+ * For Coordinator, do not get a connection if Coordinator list is NIL,
+ * Coordinator fds are returned only if the transaction uses DDL
*/
-DataNodeHandle **
-get_handles(List *nodelist)
+PGXCNodeAllHandles *
+get_handles(List *datanodelist, List *coordlist, bool is_coord_only_query)
{
- DataNodeHandle **result;
+ PGXCNodeAllHandles *result;
ListCell *node_list_item;
- List *allocate = NIL;
+ List *dn_allocate = NIL;
+ List *co_allocate = NIL;
MemoryContext old_context;
/* index of the result array */
@@ -991,76 +1060,194 @@ get_handles(List *nodelist)
/* Handles should be there while transaction lasts */
old_context = MemoryContextSwitchTo(TopTransactionContext);
- /* If node list is empty execute request on current nodes */
- if (list_length(nodelist) == 0)
+
+ result = (PGXCNodeAllHandles *) palloc(sizeof(PGXCNodeAllHandles));
+ if (!result)
{
- /*
- * We do not have to zero the array - on success all items will be set
- * to correct pointers, on error the array will be freed
- */
- result = (DataNodeHandle **) palloc(NumDataNodes * sizeof(DataNodeHandle *));
- if (!result)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ result->primary_handle = NULL;
+ result->datanode_handles = NULL;
+ result->coord_handles = NULL;
+ result->co_conn_count = list_length(coordlist);
+ result->dn_conn_count = list_length(datanodelist);
+
+ /*
+ * Get Handles for Datanodes
+ * If node list is empty execute request on current nodes.
+ * It is also possible that the query has to be launched only on Coordinators.
+ */
+ if (!is_coord_only_query)
+ {
+ if (list_length(datanodelist) == 0)
{
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
+ /*
+ * We do not have to zero the array - on success all items will be set
+ * to correct pointers, on error the array will be freed
+ */
+ result->datanode_handles = (PGXCNodeHandle **)
+ palloc(NumDataNodes * sizeof(PGXCNodeHandle *));
+ if (!result->datanode_handles)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
- for (i = 0; i < NumDataNodes; i++)
+ for (i = 0; i < NumDataNodes; i++)
+ {
+ result->datanode_handles[i] = &dn_handles[i];
+ if (dn_handles[i].sock == NO_SOCKET)
+ dn_allocate = lappend_int(dn_allocate, i + 1);
+ }
+ }
+ else
{
- result[i] = &handles[i];
- if (handles[i].sock == NO_SOCKET)
- allocate = lappend_int(allocate, i + 1);
+ /*
+ * We do not have to zero the array - on success all items will be set
+ * to correct pointers, on error the array will be freed
+ */
+ result->datanode_handles = (PGXCNodeHandle **)
+ palloc(list_length(datanodelist) * sizeof(PGXCNodeHandle *));
+ if (!result->datanode_handles)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ i = 0;
+ foreach(node_list_item, datanodelist)
+ {
+ int node = lfirst_int(node_list_item);
+
+ Assert(node > 0 && node <= NumDataNodes);
+
+ result->datanode_handles[i++] = &dn_handles[node - 1];
+ if (dn_handles[node - 1].sock == NO_SOCKET)
+ dn_allocate = lappend_int(dn_allocate, node);
+ }
}
}
- else
+
+ /*
+ * Get Handles for Coordinators
+ * If node list is empty execute request on current nodes
+ * There are transactions where the coordinator list is NULL Ex:COPY
+ */
+ if (coordlist)
{
- /*
- * We do not have to zero the array - on success all items will be set
- * to correct pointers, on error the array will be freed
- */
- result = (DataNodeHandle **) palloc(list_length(nodelist) * sizeof(DataNodeHandle *));
- if (!result)
+ if (list_length(coordlist) == 0)
{
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
+ /*
+ * We do not have to zero the array - on success all items will be set
+ * to correct pointers, on error the array will be freed
+ */
+ result->coord_handles = (PGXCNodeHandle **)
+ palloc(NumCoords * sizeof(PGXCNodeHandle *));
+ if (!result->coord_handles)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
- i = 0;
- foreach(node_list_item, nodelist)
+ for (i = 0; i < NumCoords; i++)
+ {
+ result->coord_handles[i] = &co_handles[i];
+ if (co_handles[i].sock == NO_SOCKET)
+ co_allocate = lappend_int(co_allocate, i + 1);
+ }
+ }
+ else
{
- int node = lfirst_int(node_list_item);
+ /*
+ * We do not have to zero the array - on success all items will be set
+ * to correct pointers, on error the array will be freed
+ */
+ result->coord_handles = (PGXCNodeHandle **)
+ palloc(list_length(coordlist) * sizeof(PGXCNodeHandle *));
+ if (!result->coord_handles)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ i = 0;
+ /* Some transactions do not need Coordinators, ex: COPY */
+ foreach(node_list_item, coordlist)
+ {
+ int node = lfirst_int(node_list_item);
- Assert(node > 0 && node <= NumDataNodes);
+ Assert(node > 0 && node <= NumCoords);
- result[i++] = &handles[node - 1];
- if (handles[node - 1].sock == NO_SOCKET)
- allocate = lappend_int(allocate, node);
+ result->coord_handles[i++] = &co_handles[node - 1];
+ if (co_handles[node - 1].sock == NO_SOCKET)
+ co_allocate = lappend_int(co_allocate, node);
+ }
}
}
- if (allocate)
+ /*
+ * Pooler can get activated even if list of Coordinator or Datanode is NULL
+ * If both lists are NIL, we don't need to call Pooler.
+ */
+ if (dn_allocate || co_allocate)
{
int j = 0;
- int *fds = PoolManagerGetConnections(allocate);
+ int *fds = PoolManagerGetConnections(dn_allocate, co_allocate);
if (!fds)
{
+ if (coordlist)
+ if (result->coord_handles)
+ pfree(result->coord_handles);
+ if (datanodelist)
+ if (result->datanode_handles)
+ pfree(result->datanode_handles);
+
pfree(result);
- list_free(allocate);
+ if (dn_allocate)
+ list_free(dn_allocate);
+ if (co_allocate)
+ list_free(co_allocate);
return NULL;
}
- foreach(node_list_item, allocate)
+ /* Initialisation for Datanodes */
+ if (dn_allocate)
{
- int node = lfirst_int(node_list_item);
- int fdsock = fds[j++];
+ foreach(node_list_item, dn_allocate)
+ {
+ int node = lfirst_int(node_list_item);
+ int fdsock = fds[j++];
- data_node_init(&handles[node - 1], fdsock, node);
- node_count++;
+ pgxc_node_init(&dn_handles[node - 1], fdsock, node);
+ datanode_count++;
+ }
+ }
+ /* Initialisation for Coordinators */
+ if (co_allocate)
+ {
+ foreach(node_list_item, co_allocate)
+ {
+ int node = lfirst_int(node_list_item);
+ int fdsock = fds[j++];
+
+ pgxc_node_init(&co_handles[node - 1], fdsock, node);
+ coord_count++;
+ }
}
+
pfree(fds);
- list_free(allocate);
+
+ if (co_allocate)
+ list_free(co_allocate);
+ if (dn_allocate)
+ list_free(dn_allocate);
}
/* restore context */
@@ -1073,20 +1260,22 @@ get_handles(List *nodelist)
/*
* Return handles involved into current transaction, to run commit or rollback
* on them, as requested.
+ * Depending on the connection type, Coordinator or Datanode connections are returned.
+ *
* Transaction is not started on nodes when read-only statement is executed
* on it, so we do not have to commit or rollback on those nodes.
- * Parameter should point to array able to store at least node_count pointers
- * to a DataNodeHandle structure.
+ * Parameter should point to array able to store at least datanode_count pointers
+ * to a PGXCNodeHandle structure.
* The function returns number of pointers written to the connections array.
* Remaining items in the array, if any, will be kept unchanged
*/
int
-get_transaction_nodes(DataNodeHandle **connections)
+get_transaction_nodes(PGXCNodeHandle **connections, char client_conn_type)
{
int tran_count = 0;
int i;
- if (node_count)
+ if (datanode_count && client_conn_type == REMOTE_CONN_DATANODE)
{
for (i = 0; i < NumDataNodes; i++)
{
@@ -1096,8 +1285,16 @@ get_transaction_nodes(DataNodeHandle **connections)
* DN_CONNECTION_STATE_ERROR_FATAL.
* ERROR_NOT_READY can happen if the data node abruptly disconnects.
*/
- if (handles[i].sock != NO_SOCKET && handles[i].transaction_status != 'I')
- connections[tran_count++] = &handles[i];
+ if (dn_handles[i].sock != NO_SOCKET && dn_handles[i].transaction_status != 'I')
+ connections[tran_count++] = &dn_handles[i];
+ }
+ }
+ if (coord_count && client_conn_type == REMOTE_CONN_COORD)
+ {
+ for (i = 0; i < NumCoords; i++)
+ {
+ if (co_handles[i].sock != NO_SOCKET && co_handles[i].transaction_status != 'I')
+ connections[tran_count++] = &co_handles[i];
}
}
@@ -1105,22 +1302,48 @@ get_transaction_nodes(DataNodeHandle **connections)
}
/*
- * Collect node numbers for the given Datanode connections
+ * Collect node numbers for the given Datanode and Coordinator connections
* and return it for prepared transactions
*/
PGXC_NodeId*
-collect_datanode_numbers(int conn_count, DataNodeHandle **connections)
+collect_pgxcnode_numbers(int conn_count, PGXCNodeHandle **connections, char client_conn_type)
{
- PGXC_NodeId *datanodes = NULL;
+ PGXC_NodeId *pgxcnodes = NULL;
int i;
- datanodes = (PGXC_NodeId *) palloc(conn_count * sizeof(PGXC_NodeId));
+
+ /* It is also necessary to save in GTM the local Coordinator that is being prepared */
+ if (client_conn_type == REMOTE_CONN_COORD)
+ pgxcnodes = (PGXC_NodeId *) palloc((conn_count + 1) * sizeof(PGXC_NodeId));
+ else
+ pgxcnodes = (PGXC_NodeId *) palloc(conn_count * sizeof(PGXC_NodeId));
+
+ if (!pgxcnodes)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
for (i = 0; i < conn_count; i++)
{
- datanodes[i] = connections[i]->nodenum;
+ pgxcnodes[i] = connections[i]->nodenum;
}
- return datanodes;
+ /* Save here the Coordinator number where we are */
+ if (client_conn_type == REMOTE_CONN_COORD)
+		pgxcnodes[conn_count] = PGXCNodeId;
+
+ return pgxcnodes;
+}
+
+/* Determine if the connection is active */
+static bool
+is_active_connection(PGXCNodeHandle *handle)
+{
+ return handle->sock != NO_SOCKET &&
+ handle->state != DN_CONNECTION_STATE_IDLE &&
+ handle->state != DN_CONNECTION_STATE_ERROR_NOT_READY &&
+ handle->state != DN_CONNECTION_STATE_ERROR_FATAL;
}
/*
@@ -1128,23 +1351,111 @@ collect_datanode_numbers(int conn_count, DataNodeHandle **connections)
* have data to consume on them.
*/
int
-get_active_nodes (DataNodeHandle **connections)
+get_active_nodes(PGXCNodeHandle **connections)
{
int active_count = 0;
int i;
- if (node_count)
+ if (datanode_count)
{
for (i = 0; i < NumDataNodes; i++)
{
- if (handles[i].sock != NO_SOCKET &&
- handles[i].state != DN_CONNECTION_STATE_IDLE &&
- handles[i].state != DN_CONNECTION_STATE_ERROR_NOT_READY &&
- handles[i].state != DN_CONNECTION_STATE_ERROR_FATAL)
- connections[active_count++] = &handles[i];
+ if (is_active_connection(&dn_handles[i]))
+ connections[active_count++] = &dn_handles[i];
}
}
+ if (coord_count)
+ {
+ for (i = 0; i < NumCoords; i++)
+ {
+ if (is_active_connection(&co_handles[i]))
+ connections[active_count++] = &co_handles[i];
+ }
+ }
+
return active_count;
}
+/*
+ * Send gxid to all the handles except primary connection (treated separatly)
+ */
+int
+pgxc_all_handles_send_gxid(PGXCNodeAllHandles *pgxc_handles, GlobalTransactionId gxid, bool stop_at_error)
+{
+ int dn_conn_count = pgxc_handles->dn_conn_count;
+ int co_conn_count = pgxc_handles->co_conn_count;
+ int i;
+ int result = 0;
+
+ if (pgxc_handles->primary_handle)
+ dn_conn_count--;
+
+ /* Send GXID to Datanodes */
+ for (i = 0; i < dn_conn_count; i++)
+ {
+ if (pgxc_node_send_gxid(pgxc_handles->datanode_handles[i], gxid))
+ {
+ add_error_message(pgxc_handles->datanode_handles[i], "Can not send request");
+ result = EOF;
+ if (stop_at_error)
+ goto finish;
+ }
+ }
+
+ /* Send GXID to Coordinators handles */
+ for (i = 0; i < co_conn_count; i++)
+ {
+ if (pgxc_node_send_gxid(pgxc_handles->coord_handles[i], gxid))
+ {
+ add_error_message(pgxc_handles->coord_handles[i], "Can not send request");
+ result = EOF;
+ if (stop_at_error)
+ goto finish;
+ }
+ }
+
+finish:
+ return result;
+}
+
+/*
+ * Send query to all the handles except the primary connection (treated separately)
+ */
+int
+pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer, bool stop_at_error)
+{
+ int dn_conn_count = pgxc_handles->dn_conn_count;
+ int co_conn_count = pgxc_handles->co_conn_count;
+ int i;
+ int result = 0;
+
+ if (pgxc_handles->primary_handle)
+ dn_conn_count--;
+
+ /* Send to Datanodes */
+ for (i = 0; i < dn_conn_count; i++)
+ {
+ if (pgxc_node_send_query(pgxc_handles->datanode_handles[i], buffer))
+ {
+ add_error_message(pgxc_handles->datanode_handles[i], "Can not send request");
+ result = EOF;
+ if (stop_at_error)
+ goto finish;
+ }
+ }
+ /* Send to Coordinators */
+ for (i = 0; i < co_conn_count; i++)
+ {
+ if (pgxc_node_send_query(pgxc_handles->coord_handles[i], buffer))
+ {
+ add_error_message(pgxc_handles->coord_handles[i], "Can not send request");
+ result = EOF;
+ if (stop_at_error)
+ goto finish;
+ }
+ }
+
+finish:
+ return result;
+}
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index dbb8aed3d4..7ebced89f9 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -44,6 +44,7 @@
#include "lib/stringinfo.h"
#include "libpq/pqformat.h"
#include "pgxc/locator.h"
+#include "pgxc/pgxc.h"
#include "../interfaces/libpq/libpq-fe.h"
#include "postmaster/postmaster.h" /* For UnixSocketDir */
#include <stdlib.h>
@@ -53,6 +54,7 @@
/* Configuration options */
int NumDataNodes = 2;
+int NumCoords = 1;
int MinPoolSize = 1;
int MaxPoolSize = 100;
int PoolerPort = 6667;
@@ -62,14 +64,21 @@ bool PersistentConnections = false;
/* The memory context */
static MemoryContext PoolerMemoryContext = NULL;
-/* Connection info */
+/* Connection info of Datanodes */
char *DataNodeHosts = NULL;
char *DataNodePorts = NULL;
char *DataNodeUsers = NULL;
char *DataNodePwds = NULL;
-/* Connection info list */
-static DataNodeConnectionInfo *connectionInfos;
+/* Connection info of Coordinators */
+char *CoordinatorHosts = NULL;
+char *CoordinatorPorts = NULL;
+char *CoordinatorUsers = NULL;
+char *CoordinatorPwds = NULL;
+
+/* PGXC Nodes info list */
+static PGXCNodeConnectionInfo *datanode_connInfos;
+static PGXCNodeConnectionInfo *coord_connInfos;
/* Pool to all the databases (linked list) */
static DatabasePool *databasePools = NULL;
@@ -82,22 +91,23 @@ static PoolHandle *Handle = NULL;
static int server_fd = -1;
-static void agent_init(PoolAgent *agent, const char *database, List *nodes);
+static void agent_init(PoolAgent *agent, const char *database);
static void agent_destroy(PoolAgent *agent);
static void agent_create(void);
static void agent_handle_input(PoolAgent *agent, StringInfo s);
-static DatabasePool *create_database_pool(const char *database, List *nodes);
+static DatabasePool *create_database_pool(const char *database);
static void insert_database_pool(DatabasePool *pool);
static int destroy_database_pool(const char *database);
static DatabasePool *find_database_pool(const char *database);
static DatabasePool *remove_database_pool(const char *database);
-static int *agent_acquire_connections(PoolAgent *agent, List *nodelist);
-static DataNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node);
-static void agent_release_connections(PoolAgent *agent, List *discard);
-static void release_connection(DatabasePool *dbPool, DataNodePoolSlot *slot, int index, bool clean);
-static void destroy_slot(DataNodePoolSlot *slot);
-static void grow_pool(DatabasePool *dbPool, int index);
-static void destroy_node_pool(DataNodePool *node_pool);
+static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist);
+static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node, char client_conn_type);
+static void agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard);
+static void release_connection(DatabasePool *dbPool, PGXCNodePoolSlot *slot, int index, bool clean,
+ char client_conn_type);
+static void destroy_slot(PGXCNodePoolSlot *slot);
+static void grow_pool(DatabasePool *dbPool, int index, char client_conn_type);
+static void destroy_node_pool(PGXCNodePool *node_pool);
static void PoolerLoop(void);
/* Signal handlers */
@@ -122,7 +132,7 @@ PoolManagerInit()
char *rawstring;
List *elemlist;
ListCell *l;
- int i;
+ int i, count;
MemoryContext old_context;
elog(DEBUG1, "Pooler process is started: %d", getpid());
@@ -178,255 +188,325 @@ PoolManagerInit()
errmsg("out of memory")));
}
- connectionInfos = (DataNodeConnectionInfo *) palloc(NumDataNodes * sizeof(DataNodeConnectionInfo));
- if (connectionInfos == NULL)
+ datanode_connInfos = (PGXCNodeConnectionInfo *)
+ palloc(NumDataNodes * sizeof(PGXCNodeConnectionInfo));
+ coord_connInfos = (PGXCNodeConnectionInfo *)
+ palloc(NumCoords * sizeof(PGXCNodeConnectionInfo));
+ if (coord_connInfos == NULL
+ || datanode_connInfos == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
- /* Need a modifiable copy */
- rawstring = pstrdup(DataNodeHosts);
-
- /* Parse string into list of identifiers */
- if (!SplitIdentifierString(rawstring, ',', &elemlist))
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_hosts\"")));
- }
-
- i = 0;
- foreach(l, elemlist)
+ /* Parse Host/Port/Password/User data for Coordinators and Datanodes */
+ for (count = 0; count < 2; count++)
{
- char *curhost = (char *) lfirst(l);
+ PGXCNodeConnectionInfo *connectionInfos;
+ int num_nodes;
+ if (count == 0)
+ {
+ /* Need a modifiable copy */
+ rawstring = pstrdup(DataNodeHosts);
+ connectionInfos = datanode_connInfos;
+ num_nodes = NumDataNodes;
+ }
+ else
+ {
+ /* Need a modifiable copy */
+ rawstring = pstrdup(CoordinatorHosts);
+ connectionInfos = coord_connInfos;
+ num_nodes = NumCoords;
+ }
- connectionInfos[i].host = pstrdup(curhost);
- if (connectionInfos[i].host == NULL)
+ /* Do that for Coordinator and Datanode strings */
+ /* Parse string into list of identifiers */
+ if (!SplitIdentifierString(rawstring, ',', &elemlist))
{
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_hosts\"")));
}
- /* Ignore extra entries, if any */
- if (++i == NumDataNodes)
- break;
- }
- list_free(elemlist);
- pfree(rawstring);
- /* Validate */
- if (i == 0)
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_hosts\"")));
- }
- else if (i == 1)
- {
- /* Copy all values from first */
- for (; i < NumDataNodes; i++)
+ i = 0;
+ foreach(l, elemlist)
{
- connectionInfos[i].host = pstrdup(connectionInfos[0].host);
+ char *curhost = (char *) lfirst(l);
+
+ connectionInfos[i].host = pstrdup(curhost);
if (connectionInfos[i].host == NULL)
{
ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
}
+ /* Ignore extra entries, if any */
+ if (++i == num_nodes)
+ break;
}
- }
- else if (i < NumDataNodes)
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_hosts\"")));
- }
+ list_free(elemlist);
+ pfree(rawstring);
- /* Need a modifiable copy */
- rawstring = pstrdup(DataNodePorts);
-
- /* Parse string into list of identifiers */
- if (!SplitIdentifierString(rawstring, ',', &elemlist))
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_ports\"")));
- }
+ /* Validate */
+ if (i == 0)
+ {
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_hosts\"")));
+ }
+ else if (i == 1)
+ {
+ /* Copy all values from first */
+ for (; i < num_nodes; i++)
+ {
+ connectionInfos[i].host = pstrdup(connectionInfos[0].host);
+ if (connectionInfos[i].host == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+ }
+ }
+ else if (i < num_nodes)
+ {
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_hosts\"")));
+ }
- i = 0;
- foreach(l, elemlist)
- {
- char *curport = (char *) lfirst(l);
+ /* Parse port data for Coordinators and Datanodes */
+ /* Need a modifiable copy */
+ if (count == 0)
+ rawstring = pstrdup(DataNodePorts);
+ if (count == 1)
+ rawstring = pstrdup(CoordinatorPorts);
- connectionInfos[i].port = pstrdup(curport);
- if (connectionInfos[i].port == NULL)
+ /* Parse string into list of identifiers */
+ if (!SplitIdentifierString(rawstring, ',', &elemlist))
{
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_ports\"")));
}
- /* Ignore extra entries, if any */
- if (++i == NumDataNodes)
- break;
- }
- list_free(elemlist);
- pfree(rawstring);
- /* Validate */
- if (i == 0)
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_ports\"")));
- }
- else if (i == 1)
- {
- /* Copy all values from first */
- for (; i < NumDataNodes; i++)
+ i = 0;
+ foreach(l, elemlist)
{
- connectionInfos[i].port = pstrdup(connectionInfos[0].port);
+ char *curport = (char *) lfirst(l);
+
+ connectionInfos[i].port = pstrdup(curport);
if (connectionInfos[i].port == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
+ /* Ignore extra entries, if any */
+ if (++i == num_nodes)
+ break;
}
- }
- else if (i < NumDataNodes)
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_ports\"")));
- }
-
- rawstring = pstrdup(DataNodeUsers);
+ list_free(elemlist);
+ pfree(rawstring);
- /* Parse string into list of identifiers */
- if (!SplitIdentifierString(rawstring, ',', &elemlist))
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_users\"")));
- }
+ /* Validate */
+ if (i == 0)
+ {
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_ports\"")));
+ }
+ else if (i == 1)
+ {
+ /* Copy all values from first */
+ for (; i < num_nodes; i++)
+ {
+ connectionInfos[i].port = pstrdup(connectionInfos[0].port);
+ if (connectionInfos[i].port == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+ }
+ }
+ else if (i < num_nodes)
+ {
+ if (count == 0)
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_ports\"")));
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"coordinator_ports\"")));
+ }
- i = 0;
- foreach(l, elemlist)
- {
- char *curuser = (char *) lfirst(l);
+ if (count == 0)
+ rawstring = pstrdup(DataNodeUsers);
+ if (count == 1)
+ rawstring = pstrdup(CoordinatorUsers);
- connectionInfos[i].uname = pstrdup(curuser);
- if (connectionInfos[i].uname == NULL)
+ /* Parse string into list of identifiers */
+ if (!SplitIdentifierString(rawstring, ',', &elemlist))
{
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ if (count == 0)
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_users\"")));
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"coordinator_users\"")));
}
- /* Ignore extra entries, if any */
- if (++i == NumDataNodes)
- break;
- }
- list_free(elemlist);
- pfree(rawstring);
- /* Validate */
- if (i == 0)
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_users\"")));
- }
- else if (i == 1)
- {
- /* Copy all values from first */
- for (; i < NumDataNodes; i++)
+ i = 0;
+ foreach(l, elemlist)
{
- connectionInfos[i].uname = pstrdup(connectionInfos[0].uname);
+ char *curuser = (char *) lfirst(l);
+
+ connectionInfos[i].uname = pstrdup(curuser);
if (connectionInfos[i].uname == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
+ /* Ignore extra entries, if any */
+ if (++i == num_nodes)
+ break;
}
- }
- else if (i < NumDataNodes)
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_users\"")));
- }
+ list_free(elemlist);
+ pfree(rawstring);
- rawstring = pstrdup(DataNodePwds);
-
- /* Parse string into list of identifiers */
- if (!SplitIdentifierString(rawstring, ',', &elemlist))
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_passwords\"")));
- }
+ /* Validate */
+ if (i == 0)
+ {
+ if (count == 0)
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_users\"")));
+ else
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"coordinator_users\"")));
+ }
+ else if (i == 1)
+ {
+ /* Copy all values from first */
+ for (; i < num_nodes; i++)
+ {
+ connectionInfos[i].uname = pstrdup(connectionInfos[0].uname);
+ if (connectionInfos[i].uname == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+ }
+ }
+ else if (i < num_nodes)
+ {
+ if (count == 0)
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_users\"")));
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"coordinator_users\"")));
+ }
- i = 0;
- foreach(l, elemlist)
- {
- char *curpassword = (char *) lfirst(l);
+ if (count == 0)
+ rawstring = pstrdup(DataNodePwds);
+ if (count == 1)
+ rawstring = pstrdup(CoordinatorPwds);
- connectionInfos[i].password = pstrdup(curpassword);
- if (connectionInfos[i].password == NULL)
+ /* Parse string into list of identifiers */
+ if (!SplitIdentifierString(rawstring, ',', &elemlist))
{
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ if (count == 0)
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_passwords\"")));
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"coordinator_passwords\"")));
}
- /* Ignore extra entries, if any */
- if (++i == NumDataNodes)
- break;
- }
- list_free(elemlist);
- pfree(rawstring);
- /* Validate */
- if (i == 0)
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_passwords\"")));
- }
- else if (i == 1)
- {
- /* Copy all values from first */
- for (; i < NumDataNodes; i++)
+ i = 0;
+ foreach(l, elemlist)
{
- connectionInfos[i].password = pstrdup(connectionInfos[0].password);
+ char *curpassword = (char *) lfirst(l);
+
+ connectionInfos[i].password = pstrdup(curpassword);
if (connectionInfos[i].password == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
+ /* Ignore extra entries, if any */
+ if (++i == num_nodes)
+ break;
+ }
+ list_free(elemlist);
+ pfree(rawstring);
+
+ /* Validate */
+ if (i == 0)
+ {
+ if (count == 0)
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_passwords\"")));
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"coordinator_passwords\"")));
+ }
+ else if (i == 1)
+ {
+ /* Copy all values from first */
+ for (; i < num_nodes; i++)
+ {
+ connectionInfos[i].password = pstrdup(connectionInfos[0].password);
+ if (connectionInfos[i].password == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+ }
+ }
+ else if (i < num_nodes)
+ {
+ if (count == 0)
+ /* syntax error in list */
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"data_node_passwords\"")));
+ else
+ ereport(FATAL,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid list syntax for \"coordinator_passwords\"")));
}
}
- else if (i < NumDataNodes)
- {
- /* syntax error in list */
- ereport(FATAL,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid list syntax for \"data_node_passwords\"")));
- }
+ /* End of Parsing for Datanode and Coordinator Data */
PoolerLoop();
return 0;
@@ -549,7 +629,8 @@ agent_create(void)
agent->port.RecvPointer = 0;
agent->port.SendPointer = 0;
agent->pool = NULL;
- agent->connections = NULL;
+ agent->dn_connections = NULL;
+ agent->coord_connections = NULL;
/* Append new agent to the list */
poolAgents[agentCount++] = agent;
@@ -579,22 +660,21 @@ PoolManagerConnect(PoolHandle *handle, const char *database)
* Init PoolAgent
*/
static void
-agent_init(PoolAgent *agent, const char *database, List *nodes)
+agent_init(PoolAgent *agent, const char *database)
{
Assert(agent);
Assert(database);
- Assert(list_length(nodes) > 0);
- /* disconnect if we still connected */
+ /* disconnect if we are still connected */
if (agent->pool)
- agent_release_connections(agent, NULL);
+ agent_release_connections(agent, NULL, NULL);
/* find database */
agent->pool = find_database_pool(database);
/* create if not found */
if (agent->pool == NULL)
- agent->pool = create_database_pool(database, nodes);
+ agent->pool = create_database_pool(database);
}
@@ -612,7 +692,7 @@ agent_destroy(PoolAgent *agent)
/* Discard connections if any remaining */
if (agent->pool)
- agent_release_connections(agent, NULL);
+ agent_release_connections(agent, NULL, NULL);
/* find agent in the list */
for (i = 0; i < agentCount; i++)
@@ -620,10 +700,15 @@ agent_destroy(PoolAgent *agent)
if (poolAgents[i] == agent)
{
/* free memory */
- if (agent->connections)
+ if (agent->dn_connections)
+ {
+ pfree(agent->dn_connections);
+ agent->dn_connections = NULL;
+ }
+ if (agent->coord_connections)
{
- pfree(agent->connections);
- agent->connections = NULL;
+ pfree(agent->coord_connections);
+ agent->coord_connections = NULL;
}
pfree(agent);
/* shrink the list and move last agent into the freed slot */
@@ -657,34 +742,52 @@ PoolManagerDisconnect(PoolHandle *handle)
* Get pooled connections
*/
int *
-PoolManagerGetConnections(List *nodelist)
+PoolManagerGetConnections(List *datanodelist, List *coordlist)
{
int i;
ListCell *nodelist_item;
int *fds;
- int nodes[list_length(nodelist) + 1];
+ int totlen = list_length(datanodelist) + list_length(coordlist);
+ int nodes[totlen + 2];
Assert(Handle);
- Assert(list_length(nodelist) > 0);
- /* Prepare end send message to pool manager */
- nodes[0] = htonl(list_length(nodelist));
+ /*
+ * Prepare and send message to pool manager.
+ * First with Datanode list.
+ * This list can be NULL for a query that does not need
+ * Datanode Connections (Sequence DDLs)
+ */
+ nodes[0] = htonl(list_length(datanodelist));
i = 1;
- foreach(nodelist_item, nodelist)
+ if (list_length(datanodelist) != 0)
{
- nodes[i++] = htonl(lfirst_int(nodelist_item));
+ foreach(nodelist_item, datanodelist)
+ {
+ nodes[i++] = htonl(lfirst_int(nodelist_item));
+ }
}
- pool_putmessage(&Handle->port, 'g', (char *) nodes, sizeof(int) * (list_length(nodelist) + 1));
+ /* Then with Coordinator list (can be null) */
+ nodes[i++] = htonl(list_length(coordlist));
+ if (list_length(coordlist) != 0)
+ {
+ foreach(nodelist_item, coordlist)
+ {
+ nodes[i++] = htonl(lfirst_int(nodelist_item));
+ }
+ }
+
+ pool_putmessage(&Handle->port, 'g', (char *) nodes, sizeof(int) * (totlen + 2));
pool_flush(&Handle->port);
/* Receive response */
- fds = (int *) palloc(sizeof(int) * list_length(nodelist));
+ fds = (int *) palloc(sizeof(int) * totlen);
if (fds == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
- if (pool_recvfds(&Handle->port, fds, list_length(nodelist)))
+ if (pool_recvfds(&Handle->port, fds, totlen))
{
pfree(fds);
return NULL;
@@ -708,8 +811,10 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
for (;;)
{
const char *database;
- int nodecount;
- List *nodelist = NIL;
+ int datanodecount;
+ int coordcount;
+ List *datanodelist = NIL;
+ List *coordlist = NIL;
int *fds;
int i;
@@ -718,7 +823,11 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
case 'c': /* CONNECT */
pool_getmessage(&agent->port, s, 0);
database = pq_getmsgstring(s);
- agent_init(agent, database, GetAllNodes());
+ /*
+ * Coordinator pool is not initialized.
+ * With that it would be impossible to create a Database by default.
+ */
+ agent_init(agent, database);
pq_getmsgend(s);
break;
case 'd': /* DISCONNECT */
@@ -727,29 +836,52 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
pq_getmsgend(s);
break;
case 'g': /* GET CONNECTIONS */
- pool_getmessage(&agent->port, s, 4 * NumDataNodes + 8);
- nodecount = pq_getmsgint(s, 4);
- for (i = 0; i < nodecount; i++)
- nodelist = lappend_int(nodelist, pq_getmsgint(s, 4));
+ /*
+ * Length of message is caused by:
+ * - Message header = 4bytes
+ * - List of datanodes = NumDataNodes * 4bytes (max)
+ * - List of coordinators = NumCoords * 4bytes (max)
+ * - Number of Datanodes sent = 4bytes
+ * - Number of Coordinators sent = 4bytes
+ * It is better to send the lists of Coordinators and Datanodes in the same
+ * message, as this reduces interactions between postmaster and pooler
+ */
+ pool_getmessage(&agent->port, s, 4 * NumDataNodes + 4 * NumCoords + 12);
+ datanodecount = pq_getmsgint(s, 4);
+ for (i = 0; i < datanodecount; i++)
+ datanodelist = lappend_int(datanodelist, pq_getmsgint(s, 4));
+ coordcount = pq_getmsgint(s, 4);
+ /* It is possible that no Coordinators are involved in the transaction */
+ if (coordcount != 0)
+ for (i = 0; i < coordcount; i++)
+ coordlist = lappend_int(coordlist, pq_getmsgint(s, 4));
pq_getmsgend(s);
/*
* In case of error agent_acquire_connections will log
* the error and return NULL
*/
- fds = agent_acquire_connections(agent, nodelist);
- list_free(nodelist);
- pool_sendfds(&agent->port, fds, fds ? nodecount : 0);
+ fds = agent_acquire_connections(agent, datanodelist, coordlist);
+ list_free(datanodelist);
+ list_free(coordlist);
+
+ pool_sendfds(&agent->port, fds, fds ? datanodecount + coordcount : 0);
if (fds)
pfree(fds);
break;
case 'r': /* RELEASE CONNECTIONS */
- pool_getmessage(&agent->port, s, 4 * NumDataNodes + 8);
- nodecount = pq_getmsgint(s, 4);
- for (i = 0; i < nodecount; i++)
- nodelist = lappend_int(nodelist, pq_getmsgint(s, 4));
+ pool_getmessage(&agent->port, s, 4 * NumDataNodes + 4 * NumCoords + 12);
+ datanodecount = pq_getmsgint(s, 4);
+ for (i = 0; i < datanodecount; i++)
+ datanodelist = lappend_int(datanodelist, pq_getmsgint(s, 4));
+ coordcount = pq_getmsgint(s, 4);
+ /* It is possible that no Coordinators are involved in the transaction */
+ if (coordcount != 0)
+ for (i = 0; i < coordcount; i++)
+ coordlist = lappend_int(coordlist, pq_getmsgint(s, 4));
pq_getmsgend(s);
- agent_release_connections(agent, nodelist);
- list_free(nodelist);
+ agent_release_connections(agent, datanodelist, coordlist);
+ list_free(datanodelist);
+ list_free(coordlist);
break;
default: /* EOF or protocol violation */
agent_destroy(agent);
@@ -766,17 +898,23 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
* acquire connection
*/
static int *
-agent_acquire_connections(PoolAgent *agent, List *nodelist)
+agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
{
int i;
int *result;
ListCell *nodelist_item;
Assert(agent);
- Assert(nodelist);
- /* Allocate memory */
- result = (int *) palloc(list_length(nodelist) * sizeof(int));
+ /*
+ * Allocate memory
+ * File descriptors of Datanodes and Coordinators are saved in the same array,
+ * This array will be sent back to the postmaster.
+ * It has a length equal to the length of the datanode list
+ * plus the length of the coordinator list.
+ * Datanode fds are saved first, then Coordinator fds are saved.
+ */
+ result = (int *) palloc((list_length(datanodelist) + list_length(coordlist)) * sizeof(int));
if (result == NULL)
{
ereport(ERROR,
@@ -784,11 +922,15 @@ agent_acquire_connections(PoolAgent *agent, List *nodelist)
errmsg("out of memory")));
}
- /* initialize connection if it is not initialized yet */
- if (!agent->connections)
+ /*
+ * Initialize connection if it is not initialized yet
+ * First for the Datanodes
+ */
+ if (!agent->dn_connections)
{
- agent->connections = (DataNodePoolSlot **) palloc(NumDataNodes * sizeof(DataNodePoolSlot *));
- if (!agent->connections)
+ agent->dn_connections = (PGXCNodePoolSlot **)
+ palloc(NumDataNodes * sizeof(PGXCNodePoolSlot *));
+ if (!agent->dn_connections)
{
pfree(result);
ereport(ERROR,
@@ -798,19 +940,63 @@ agent_acquire_connections(PoolAgent *agent, List *nodelist)
}
for (i = 0; i < NumDataNodes; i++)
- agent->connections[i] = NULL;
+ agent->dn_connections[i] = NULL;
}
+ /* Then for the Coordinators */
+ if (!agent->coord_connections)
+ {
+ agent->coord_connections = (PGXCNodePoolSlot **)
+ palloc(NumCoords * sizeof(PGXCNodePoolSlot *));
+ if (!agent->coord_connections)
+ {
+ pfree(result);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ return NULL;
+ }
+
+ for (i = 0; i < NumCoords; i++)
+ agent->coord_connections[i] = NULL;
+ }
+
+
/* Initialize result */
i = 0;
- foreach(nodelist_item, nodelist)
+ /* Save in array fds of Datanodes first */
+ foreach(nodelist_item, datanodelist)
+ {
+ int node = lfirst_int(nodelist_item);
+
+ /* Acquire from the pool if none */
+ if (agent->dn_connections[node - 1] == NULL)
+ {
+ PGXCNodePoolSlot *slot = acquire_connection(agent->pool, node, REMOTE_CONN_DATANODE);
+
+ /* Handle failure */
+ if (slot == NULL)
+ {
+ pfree(result);
+ return NULL;
+ }
+
+ /* Store in the descriptor */
+ agent->dn_connections[node - 1] = slot;
+ }
+
+ result[i++] = PQsocket((PGconn *) agent->dn_connections[node - 1]->conn);
+ }
+
+ /* Save then in the array fds for Coordinators */
+ foreach(nodelist_item, coordlist)
{
int node = lfirst_int(nodelist_item);
/* Acquire from the pool if none */
- if (agent->connections[node - 1] == NULL)
+ if (agent->coord_connections[node - 1] == NULL)
{
- DataNodePoolSlot *slot = acquire_connection(agent->pool, node);
+ PGXCNodePoolSlot *slot = acquire_connection(agent->pool, node, REMOTE_CONN_COORD);
/* Handle failure */
if (slot == NULL)
@@ -820,10 +1006,10 @@ agent_acquire_connections(PoolAgent *agent, List *nodelist)
}
/* Store in the descriptor */
- agent->connections[node - 1] = slot;
+ agent->coord_connections[node - 1] = slot;
}
- result[i++] = PQsocket((PGconn *) agent->connections[node - 1]->conn);
+ result[i++] = PQsocket((PGconn *) agent->coord_connections[node - 1]->conn);
}
return result;
@@ -831,87 +1017,137 @@ agent_acquire_connections(PoolAgent *agent, List *nodelist)
/*
- * Retun connections back to the pool
+ * Return connections back to the pool
*/
void
-PoolManagerReleaseConnections(int ndisc, int* discard)
+PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* co_discard)
{
uint32 n32;
- uint32 buf[1 + ndisc];
+ /*
+ * Buffer contains the list of both Coordinator and Datanodes, as well
+ * as the number of connections
+ */
+ uint32 buf[2 + dn_ndisc + co_ndisc];
int i;
Assert(Handle);
- n32 = htonl((uint32) ndisc);
+ /* Insert the list of Datanodes in buffer */
+ n32 = htonl((uint32) dn_ndisc);
buf[0] = n32;
- for (i = 0; i < ndisc;)
+ for (i = 0; i < dn_ndisc;)
{
- n32 = htonl((uint32) discard[i++]);
+ n32 = htonl((uint32) dn_discard[i++]);
buf[i] = n32;
}
+
+ /* Insert the list of Coordinators in buffer */
+ n32 = htonl((uint32) co_ndisc);
+ buf[dn_ndisc + 1] = n32;
+
+ /* No need to send the pooler a request if there is no Coordinator */
+ if (co_ndisc != 0)
+ {
+ for (i = dn_ndisc + 1; i < (dn_ndisc + co_ndisc + 1);)
+ {
+ n32 = htonl((uint32) co_discard[i - (dn_ndisc + 1)]);
+ buf[++i] = n32;
+ }
+ }
pool_putmessage(&Handle->port, 'r', (char *) buf,
- (1 + ndisc) * sizeof(uint32));
+ (2 + dn_ndisc + co_ndisc) * sizeof(uint32));
pool_flush(&Handle->port);
}
/*
- * Release connections
+ * Release connections for Datanodes and Coordinators
*/
static void
-agent_release_connections(PoolAgent *agent, List *discard)
+agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard)
{
int i;
- DataNodePoolSlot *slot;
-
+ PGXCNodePoolSlot *slot;
- if (!agent->connections)
+ if (!agent->dn_connections && !agent->coord_connections)
return;
- if (discard)
+ /* Discard first for Datanodes */
+ if (dn_discard)
{
ListCell *lc;
- foreach(lc, discard)
+ foreach(lc, dn_discard)
{
int node = lfirst_int(lc);
Assert(node > 0 && node <= NumDataNodes);
- slot = agent->connections[node - 1];
+ slot = agent->dn_connections[node - 1];
/* Discard connection */
if (slot)
- release_connection(agent->pool, slot, node - 1, false);
- agent->connections[node - 1] = NULL;
+ release_connection(agent->pool, slot, node - 1, false, REMOTE_CONN_DATANODE);
+ agent->dn_connections[node - 1] = NULL;
}
}
- /* Remaining connections are assumed to be clean */
+ /* Then discard for Coordinators */
+ if (co_discard)
+ {
+ ListCell *lc;
+
+ foreach(lc, co_discard)
+ {
+ int node = lfirst_int(lc);
+ Assert(node > 0 && node <= NumCoords);
+ slot = agent->coord_connections[node - 1];
+
+ /* Discard connection */
+ if (slot)
+ release_connection(agent->pool, slot, node - 1, false, REMOTE_CONN_COORD);
+ agent->coord_connections[node - 1] = NULL;
+ }
+ }
+
+ /*
+ * Remaining connections are assumed to be clean.
+ * First clean up for Datanodes
+ */
for (i = 0; i < NumDataNodes; i++)
{
- slot = agent->connections[i];
+ slot = agent->dn_connections[i];
/* Release connection */
if (slot)
- release_connection(agent->pool, slot, i, true);
- agent->connections[i] = NULL;
+ release_connection(agent->pool, slot, i, true, REMOTE_CONN_DATANODE);
+ agent->dn_connections[i] = NULL;
+ }
+ /* Then clean up for Coordinator connections */
+ for (i = 0; i < NumCoords; i++)
+ {
+ slot = agent->coord_connections[i];
+
+ /* Release connection */
+ if (slot)
+ release_connection(agent->pool, slot, i, true, REMOTE_CONN_COORD);
+ agent->coord_connections[i] = NULL;
}
}
/*
- * Create new empty pool for a database and insert into the list
+ * Create new empty pool for a database.
+ * By default Database Pools have a null size to avoid interactions
+ * between PGXC nodes in the cluster (Co/Co, Dn/Dn and Co/Dn).
+ * Pool is increased at the first GET_CONNECTION message received.
* Returns POOL_OK if operation succeed POOL_FAIL in case of OutOfMemory
- * error and POOL_WEXIST if poll for this database already exist
+ * error and POOL_WEXIST if pool for this database already exists.
*/
static DatabasePool *
-create_database_pool(const char *database, List *nodes)
+create_database_pool(const char *database)
{
DatabasePool *databasePool;
int i;
- ListCell *l;
-
- Assert(nodes && list_length(nodes) > 0);
/* check if exist */
databasePool = find_database_pool(database);
@@ -947,9 +1183,10 @@ create_database_pool(const char *database, List *nodes)
/* Init next reference */
databasePool->next = NULL;
- /* Init data node pools */
- databasePool->nodePools = (DataNodePool **) palloc(NumDataNodes * sizeof(DataNodePool **));
- if (!databasePool->nodePools)
+ /* Init Datanode pools */
+ databasePool->dataNodePools = (PGXCNodePool **)
+ palloc(NumDataNodes * sizeof(PGXCNodePool **));
+ if (!databasePool->dataNodePools)
{
/* out of memory */
ereport(ERROR,
@@ -959,16 +1196,27 @@ create_database_pool(const char *database, List *nodes)
pfree(databasePool);
return NULL;
}
+
for (i = 0; i < NumDataNodes; i++)
- databasePool->nodePools[i] = NULL;
+ databasePool->dataNodePools[i] = NULL;
- foreach(l, nodes)
+ /* Init Coordinator pools */
+ databasePool->coordNodePools = (PGXCNodePool **)
+ palloc(NumCoords * sizeof(PGXCNodePool **));
+ if (!databasePool->coordNodePools)
{
- int nodeid = lfirst_int(l);
-
- grow_pool(databasePool, nodeid - 1);
+ /* out of memory */
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ pfree(databasePool->database);
+ pfree(databasePool);
+ return NULL;
}
+ for (i = 0; i < NumCoords; i++)
+ databasePool->coordNodePools[i] = NULL;
+
/* Insert into the list */
insert_database_pool(databasePool);
@@ -989,12 +1237,19 @@ destroy_database_pool(const char *database)
databasePool = remove_database_pool(database);
if (databasePool)
{
- if (databasePool->nodePools)
+ if (databasePool->dataNodePools)
{
for (i = 0; i < NumDataNodes; i++)
- if (databasePool->nodePools[i])
- destroy_node_pool(databasePool->nodePools[i]);
- pfree(databasePool->nodePools);
+ if (databasePool->dataNodePools[i])
+ destroy_node_pool(databasePool->dataNodePools[i]);
+ pfree(databasePool->dataNodePools);
+ }
+ if (databasePool->coordNodePools)
+ {
+ for (i = 0; i < NumCoords; i++)
+ if (databasePool->coordNodePools[i])
+ destroy_node_pool(databasePool->coordNodePools[i]);
+ pfree(databasePool->coordNodePools);
}
/* free allocated memory */
pfree(databasePool->database);
@@ -1089,22 +1344,41 @@ remove_database_pool(const char *database)
/*
* Acquire connection
*/
-static DataNodePoolSlot *
-acquire_connection(DatabasePool *dbPool, int node)
+static PGXCNodePoolSlot *
+acquire_connection(DatabasePool *dbPool, int node, char client_conn_type)
{
- DataNodePool *nodePool;
- DataNodePoolSlot *slot;
+ PGXCNodePool *nodePool;
+ PGXCNodePoolSlot *slot;
Assert(dbPool);
- Assert(0 < node && node <= NumDataNodes);
+
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ Assert(0 < node && node <= NumDataNodes);
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ Assert(0 < node && node <= NumCoords);
slot = NULL;
- /* Find referenced node pool */
- nodePool = dbPool->nodePools[node - 1];
+ /* Find referenced node pool depending on type of client connection */
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ nodePool = dbPool->dataNodePools[node - 1];
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ nodePool = dbPool->coordNodePools[node - 1];
+
+ /*
+ * When a Coordinator pool is initialized by a Coordinator Postmaster,
+ * it has a NULL size and is below minimum size that is 1
+ * This is to avoid problems of connections between Coordinators
+ * when creating or dropping Databases.
+ */
if (nodePool == NULL || nodePool->freeSize == 0)
{
- grow_pool(dbPool, node - 1);
- nodePool = dbPool->nodePools[node - 1];
+ grow_pool(dbPool, node - 1, client_conn_type);
+
+ /* Get back the correct node pool that has just been grown */
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ nodePool = dbPool->dataNodePools[node - 1];
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ nodePool = dbPool->coordNodePools[node - 1];
}
/* Check available connections */
@@ -1136,12 +1410,16 @@ acquire_connection(DatabasePool *dbPool, int node)
/* Decrement current max pool size */
(nodePool->size)--;
/* Ensure we are not below minimum size */
- grow_pool(dbPool, node - 1);
+ grow_pool(dbPool, node - 1, client_conn_type);
}
if (slot == NULL)
- elog(WARNING, "can not connect to data node %d", node);
-
+ {
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ elog(WARNING, "can not connect to data node %d", node);
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ elog(WARNING, "can not connect to coordinator %d", node);
+ }
return slot;
}
@@ -1150,16 +1428,25 @@ acquire_connection(DatabasePool *dbPool, int node)
* release connection from specified pool and slot
*/
static void
-release_connection(DatabasePool * dbPool, DataNodePoolSlot * slot, int index, bool clean)
+release_connection(DatabasePool * dbPool, PGXCNodePoolSlot * slot,
+ int index, bool clean, char client_conn_type)
{
- DataNodePool *nodePool;
+ PGXCNodePool *nodePool;
Assert(dbPool);
Assert(slot);
- Assert(0 <= index && index < NumDataNodes);
- /* Find referenced node pool */
- nodePool = dbPool->nodePools[index];
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ Assert(0 <= index && index < NumDataNodes);
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ Assert(0 <= index && index < NumCoords);
+
+ /* Find referenced node pool depending on client connection type */
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ nodePool = dbPool->dataNodePools[index];
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ nodePool = dbPool->coordNodePools[index];
+
if (nodePool == NULL)
{
/* report problem */
@@ -1182,40 +1469,74 @@ release_connection(DatabasePool * dbPool, DataNodePoolSlot * slot, int index, bo
/* Decrement pool size */
(nodePool->size)--;
/* Ensure we are not below minimum size */
- grow_pool(dbPool, index);
+ grow_pool(dbPool, index, client_conn_type);
}
}
/*
- * Increase database pool size
+ * Increase database pool size depending on connection type:
+ * REMOTE_CONN_COORD or REMOTE_CONN_DATANODE
*/
static void
-grow_pool(DatabasePool * dbPool, int index)
+grow_pool(DatabasePool * dbPool, int index, char client_conn_type)
{
- DataNodePool *nodePool;
+ PGXCNodePool *nodePool;
Assert(dbPool);
- Assert(0 <= index && index < NumDataNodes);
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ Assert(0 <= index && index < NumDataNodes);
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ Assert(0 <= index && index < NumCoords);
/* Find referenced node pool */
- nodePool = dbPool->nodePools[index];
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ nodePool = dbPool->dataNodePools[index];
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ nodePool = dbPool->coordNodePools[index];
+
if (!nodePool)
{
+ char *remote_type;
+
/* Allocate new DBNode Pool */
- nodePool = (DataNodePool *) palloc(sizeof(DataNodePool));
+ nodePool = (PGXCNodePool *) palloc(sizeof(PGXCNodePool));
if (!nodePool)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
- /* initialize it */
- nodePool->connstr = DataNodeConnStr(
- connectionInfos[index].host,
- connectionInfos[index].port,
- dbPool->database,
- connectionInfos[index].uname,
- connectionInfos[index].password);
+ /*
+ * Don't forget to define the type of remote connection
+ * Currently PGXC supports only Co->Co and Co->Dn connections,
+ * but Dn->Dn connections could be used for other purposes.
+ */
+ if (IS_PGXC_COORDINATOR)
+ {
+ remote_type = pstrdup("coordinator");
+
+ }
+ else if (IS_PGXC_DATANODE)
+ {
+ remote_type = pstrdup("datanode");
+ }
+
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ /* initialize it */
+ nodePool->connstr = PGXCNodeConnStr(datanode_connInfos[index].host,
+ datanode_connInfos[index].port,
+ dbPool->database,
+ datanode_connInfos[index].uname,
+ datanode_connInfos[index].password,
+ remote_type);
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ nodePool->connstr = PGXCNodeConnStr(coord_connInfos[index].host,
+ coord_connInfos[index].port,
+ dbPool->database,
+ coord_connInfos[index].uname,
+ coord_connInfos[index].password,
+ remote_type);
+
if (!nodePool->connstr)
{
@@ -1225,7 +1546,7 @@ grow_pool(DatabasePool * dbPool, int index)
errmsg("out of memory")));
}
- nodePool->slot = (DataNodePoolSlot **) palloc(MaxPoolSize * sizeof(DataNodePoolSlot *));
+ nodePool->slot = (PGXCNodePoolSlot **) palloc(MaxPoolSize * sizeof(PGXCNodePoolSlot *));
if (!nodePool->slot)
{
pfree(nodePool);
@@ -1234,20 +1555,23 @@ grow_pool(DatabasePool * dbPool, int index)
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
- memset(nodePool->slot, 0, MaxPoolSize * sizeof(DataNodePoolSlot *));
+ memset(nodePool->slot, 0, MaxPoolSize * sizeof(PGXCNodePoolSlot *));
nodePool->freeSize = 0;
nodePool->size = 0;
/* and insert into the array */
- dbPool->nodePools[index] = nodePool;
+ if (client_conn_type == REMOTE_CONN_DATANODE)
+ dbPool->dataNodePools[index] = nodePool;
+ else if (client_conn_type == REMOTE_CONN_COORD)
+ dbPool->coordNodePools[index] = nodePool;
}
while (nodePool->size < MinPoolSize || (nodePool->freeSize == 0 && nodePool->size < MaxPoolSize))
{
- DataNodePoolSlot *slot;
+ PGXCNodePoolSlot *slot;
/* Allocate new slot */
- slot = (DataNodePoolSlot *) palloc(sizeof(DataNodePoolSlot));
+ slot = (PGXCNodePoolSlot *) palloc(sizeof(PGXCNodePoolSlot));
if (slot == NULL)
{
ereport(ERROR,
@@ -1256,8 +1580,8 @@ grow_pool(DatabasePool * dbPool, int index)
}
/* Establish connection */
- slot->conn = DataNodeConnect(nodePool->connstr);
- if (!DataNodeConnected(slot->conn))
+ slot->conn = PGXCNodeConnect(nodePool->connstr);
+ if (!PGXCNodeConnected(slot->conn))
{
destroy_slot(slot);
ereport(LOG,
@@ -1282,9 +1606,9 @@ grow_pool(DatabasePool * dbPool, int index)
* Destroy pool slot
*/
static void
-destroy_slot(DataNodePoolSlot *slot)
+destroy_slot(PGXCNodePoolSlot *slot)
{
- DataNodeClose(slot->conn);
+ PGXCNodeClose(slot->conn);
pfree(slot);
}
@@ -1293,7 +1617,7 @@ destroy_slot(DataNodePoolSlot *slot)
* Destroy node pool
*/
static void
-destroy_node_pool(DataNodePool *node_pool)
+destroy_node_pool(PGXCNodePool *node_pool)
{
int i;
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 67d20b848b..9d6c32b718 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -453,9 +453,11 @@ static void ShmemBackendArrayAdd(Backend *bn);
static void ShmemBackendArrayRemove(Backend *bn);
#endif /* EXEC_BACKEND */
-#ifdef PGXC /* PGXC_COORD */
+#ifdef PGXC
bool isPGXCCoordinator = false;
bool isPGXCDataNode = false;
+int remoteConnType = REMOTE_CONN_APP;
+
#define StartPoolManager() StartChildProcess(PoolerProcess)
#endif
@@ -3102,6 +3104,7 @@ BackendStartup(Port *port)
bn->child_slot = 0;
#ifdef PGXC /* PGXC_COORD */
+ /* Don't get a Pooler Handle if Postmaster is activated from another Coordinator */
if (IS_PGXC_COORDINATOR)
{
pool_handle = GetPoolManagerHandle();
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 2314d50c85..977a4021df 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -735,8 +735,9 @@ GetSnapshotData(Snapshot snapshot)
if (GetSnapshotDataDataNode(snapshot))
return snapshot;
/* else fallthrough */
- } else if (IS_PGXC_COORDINATOR)
+ } else if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
+ /* Snapshot has already been received from the remote Coordinator */
if (GetSnapshotDataCoordinator(snapshot))
return snapshot;
/* else fallthrough */
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index b0a33a6376..c2a473fd11 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -79,7 +79,7 @@
/* PGXC_COORD */
#include "pgxc/execRemote.h"
#include "pgxc/planner.h"
-#include "pgxc/datanode.h"
+#include "pgxc/pgxcnode.h"
#include "commands/copy.h"
/* PGXC_DATANODE */
#include "access/transam.h"
@@ -107,8 +107,6 @@ int max_stack_depth = 100;
/* wait N seconds to allow attach from a debugger */
int PostAuthDelay = 0;
-
-
/* ----------------
* private variables
* ----------------
@@ -650,7 +648,7 @@ pg_analyze_and_rewrite(Node *parsetree, const char *query_string,
querytree_list = pg_rewrite_query(query);
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
ListCell *lc;
@@ -914,10 +912,10 @@ exec_simple_query(const char *query_string)
#ifdef PGXC
/*
- * By default we do not want data nodes to contact GTM directly,
+ * By default we do not want Datanodes or client Coordinators to contact GTM directly,
* it should get this information passed down to it.
*/
- if (IS_PGXC_DATANODE)
+ if (IS_PGXC_DATANODE || IsConnFromCoord())
SetForceXidFromGTM(false);
#endif
@@ -1309,7 +1307,7 @@ exec_parse_message(const char *query_string, /* string to execute */
querytree_list = pg_rewrite_query(query);
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
ListCell *lc;
@@ -2949,9 +2947,12 @@ PostgresMain(int argc, char *argv[], const char *username)
int xmin;
int xmax;
int xcnt;
- int *xip;
+ int *xip;
/* Timestamp info */
TimestampTz timestamp;
+ char *remote_conn_type = NULL;
+
+ remoteConnType = REMOTE_CONN_APP;
#endif
#define PendingConfigOption(name,val) \
@@ -3035,7 +3036,7 @@ PostgresMain(int argc, char *argv[], const char *username)
* the common help() function in main/main.c.
*/
#ifdef PGXC
- while ((flag = getopt(argc, argv, "A:B:Cc:D:d:EeFf:h:ijk:lN:nOo:Pp:r:S:sTt:v:W:Xy:-:")) != -1)
+ while ((flag = getopt(argc, argv, "A:B:Cc:D:d:EeFf:h:ijk:lN:nOo:Pp:r:S:sTt:v:W:Xy:z:-:")) != -1)
#else
while ((flag = getopt(argc, argv, "A:B:c:D:d:EeFf:h:ijk:lN:nOo:Pp:r:S:sTt:v:W:y:-:")) != -1)
#endif
@@ -3535,11 +3536,12 @@ PostgresMain(int argc, char *argv[], const char *username)
PgStartTime = GetCurrentTimestamp();
#ifdef PGXC /* PGXC_COORD */
- if (IS_PGXC_COORDINATOR)
+ /* If this session is initiated by another Coordinator, do not initialize handles; skip it */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
{
InitMultinodeExecutor();
/* If we exit, first try and clean connections and send to pool */
- on_proc_exit (DataNodeCleanAndRelease, 0);
+ on_proc_exit (PGXCNodeCleanAndRelease, 0);
}
if (IS_PGXC_DATANODE)
{
@@ -3705,7 +3707,7 @@ PostgresMain(int argc, char *argv[], const char *username)
* Helps us catch any problems where we did not send down a snapshot
* when it was expected.
*/
- if (IS_PGXC_DATANODE)
+ if (IS_PGXC_DATANODE || IsConnFromCoord())
UnsetGlobalSnapshotData();
#endif
@@ -3981,7 +3983,7 @@ PostgresMain(int argc, char *argv[], const char *username)
* is still sending data.
*/
break;
-#ifdef PGXC /* PGXC_DATANODE */
+#ifdef PGXC
case 'g': /* gxid */
{
/* Set the GXID we were passed down */
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index da524ba162..e70fe3e72b 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -63,7 +63,7 @@
#include "pgxc/planner.h"
static void ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
- bool force_autocommit);
+ bool force_autocommit, RemoteQueryExecType exec_type);
#endif
@@ -261,6 +261,8 @@ ProcessUtility(Node *parsetree,
DestReceiver *dest,
char *completionTag)
{
+ bool operation_local = false;
+
Assert(queryString != NULL); /* required as of 8.4 */
check_xact_readonly(parsetree);
@@ -288,8 +290,12 @@ ProcessUtility(Node *parsetree,
{
ListCell *lc;
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
- DataNodeBegin();
+ /*
+ * If this BEGIN is received from another Coordinator,
+ * don't send it down to the Datanodes.
+ */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ PGXCNodeBegin();
#endif
BeginTransactionBlock();
foreach(lc, stmt->options)
@@ -320,15 +326,27 @@ ProcessUtility(Node *parsetree,
case TRANS_STMT_PREPARE:
#ifdef PGXC
/*
- * If 2PC if invoked from application, transaction is first prepared on Datanodes.
+ * If 2PC is invoked from application, transaction is first prepared on Datanodes.
* 2PC file is not written for Coordinators to keep the possiblity
- * of a COMMIT PREPARED on a separate Coordinator
+ * of a COMMIT PREPARED on a separate Coordinator.
*/
- if (IS_PGXC_COORDINATOR)
- DataNodePrepare(stmt->gid);
-#endif
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ operation_local = PGXCNodePrepare(stmt->gid);
+
+ /*
+ * On a Postgres-XC Datanode, a prepare command coming from Coordinator
+ * has always to be executed.
+ * On a Coordinator also when a DDL has been involved in the prepared transaction
+ */
+ if (IsConnFromCoord())
+ operation_local = true;
+
+ if (!PrepareTransactionBlock(stmt->gid, operation_local))
+ {
+#else
if (!PrepareTransactionBlock(stmt->gid))
{
+#endif
/* report unsuccessful commit in completionTag */
if (completionTag)
strcpy(completionTag, "ROLLBACK");
@@ -337,18 +355,23 @@ ProcessUtility(Node *parsetree,
case TRANS_STMT_COMMIT_PREPARED:
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
- DataNodeCommitPrepared(stmt->gid);
+ /*
+ * If a COMMIT PREPARED message is received from another Coordinator,
+ * Don't send it down to Datanodes.
+ */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ operation_local = PGXCNodeCommitPrepared(stmt->gid);
#endif
PreventTransactionChain(isTopLevel, "COMMIT PREPARED");
-
#ifdef PGXC
- if (IS_PGXC_DATANODE)
+ /*
+ * A local Coordinator always commits if involved in Prepare.
+ * 2PC file is created and flushed if a DDL has been involved in the transaction.
+ * If remote connection is a Coordinator type, the commit prepared has to be done locally
+ * if and only if the Coordinator number was in the node list received from GTM.
+ */
+ if (operation_local || IsConnFromCoord())
{
- /*
- * 2PC file of Coordinator is not flushed to disk when transaction is prepared
- * so just skip this part.
- */
#endif
FinishPreparedTransaction(stmt->gid, true);
#ifdef PGXC
@@ -358,19 +381,22 @@ ProcessUtility(Node *parsetree,
case TRANS_STMT_ROLLBACK_PREPARED:
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
- DataNodeRollbackPrepared(stmt->gid);
+ /*
+ * If a ROLLBACK PREPARED message is received from another Coordinator,
+ * Don't send it down to Datanodes.
+ */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ operation_local = PGXCNodeRollbackPrepared(stmt->gid);
#endif
-
PreventTransactionChain(isTopLevel, "ROLLBACK PREPARED");
-
#ifdef PGXC
- if (IS_PGXC_DATANODE)
+ /*
+ * Local coordinator rollbacks if involved in PREPARE
+ * If remote connection is a Coordinator type, the rollback prepared has to be done locally also.
+ * This works for both Datanodes and Coordinators.
+ */
+ if (operation_local || IsConnFromCoord())
{
- /*
- * 2PC file of Coordinator is not flushed to disk when transaction is prepared
- * so just skip this part.
- */
#endif
FinishPreparedTransaction(stmt->gid, false);
#ifdef PGXC
@@ -458,7 +484,7 @@ ProcessUtility(Node *parsetree,
case T_CreateSchemaStmt:
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
CreateSchemaCommand((CreateSchemaStmt *) parsetree,
queryString);
@@ -627,9 +653,13 @@ ProcessUtility(Node *parsetree,
* run command on correct nodes
*/
if (IS_PGXC_COORDINATOR)
- /* sequence exists only on coordinator */
- if (stmt->removeType != OBJECT_SEQUENCE)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ {
+ /* Sequence exists only on Coordinators */
+ if (stmt->removeType == OBJECT_SEQUENCE)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+ else
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+ }
#endif
}
break;
@@ -643,7 +673,7 @@ ProcessUtility(Node *parsetree,
* run command on correct nodes
*/
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -680,7 +710,7 @@ ProcessUtility(Node *parsetree,
case T_RenameStmt:
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
ExecRenameStmt((RenameStmt *) parsetree);
break;
@@ -688,7 +718,7 @@ ProcessUtility(Node *parsetree,
case T_AlterObjectSchemaStmt:
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
ExecAlterObjectSchemaStmt((AlterObjectSchemaStmt *) parsetree);
break;
@@ -779,7 +809,7 @@ ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -836,7 +866,7 @@ ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -848,7 +878,7 @@ ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -856,7 +886,7 @@ ProcessUtility(Node *parsetree,
DefineEnum((CreateEnumStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -868,7 +898,7 @@ ProcessUtility(Node *parsetree,
CreateFunction((CreateFunctionStmt *) parsetree, queryString);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -876,7 +906,7 @@ ProcessUtility(Node *parsetree,
AlterFunction((AlterFunctionStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -913,7 +943,7 @@ ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
ExecUtilityStmtOnNodes(queryString, NULL,
- stmt->concurrent);
+ stmt->concurrent, EXEC_ON_ALL_NODES);
#endif
}
break;
@@ -922,16 +952,24 @@ ProcessUtility(Node *parsetree,
DefineRule((RuleStmt *) parsetree, queryString);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
case T_CreateSeqStmt:
DefineSequence((CreateSeqStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+#endif
break;
case T_AlterSeqStmt:
AlterSequence((AlterSeqStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_COORDS);
+#endif
break;
case T_RemoveFuncStmt:
@@ -957,7 +995,7 @@ ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -966,7 +1004,7 @@ ProcessUtility(Node *parsetree,
createdb((CreatedbStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES);
#endif
break;
@@ -974,7 +1012,7 @@ ProcessUtility(Node *parsetree,
AlterDatabase((AlterDatabaseStmt *) parsetree, isTopLevel);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -982,7 +1020,7 @@ ProcessUtility(Node *parsetree,
AlterDatabaseSet((AlterDatabaseSetStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -995,7 +1033,7 @@ ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1037,7 +1075,7 @@ ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_DATANODES);
#endif
break;
@@ -1045,7 +1083,7 @@ ProcessUtility(Node *parsetree,
cluster((ClusterStmt *) parsetree, isTopLevel);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
#endif
break;
@@ -1056,7 +1094,7 @@ ProcessUtility(Node *parsetree,
* vacuum() pops active snapshot and we can not send it to nodes
*/
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
#endif
vacuum((VacuumStmt *) parsetree, InvalidOid, true, NULL, false,
isTopLevel);
@@ -1093,7 +1131,7 @@ ProcessUtility(Node *parsetree,
CreateTrigger((CreateTrigStmt *) parsetree, InvalidOid, true);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1124,7 +1162,7 @@ ProcessUtility(Node *parsetree,
}
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1132,7 +1170,7 @@ ProcessUtility(Node *parsetree,
CreateProceduralLanguage((CreatePLangStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1140,7 +1178,7 @@ ProcessUtility(Node *parsetree,
DropProceduralLanguage((DropPLangStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1151,7 +1189,7 @@ ProcessUtility(Node *parsetree,
DefineDomain((CreateDomainStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1160,26 +1198,50 @@ ProcessUtility(Node *parsetree,
*/
case T_CreateRoleStmt:
CreateRole((CreateRoleStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+#endif
break;
case T_AlterRoleStmt:
AlterRole((AlterRoleStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+#endif
break;
case T_AlterRoleSetStmt:
AlterRoleSet((AlterRoleSetStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+#endif
break;
case T_DropRoleStmt:
DropRole((DropRoleStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+#endif
break;
case T_DropOwnedStmt:
DropOwnedObjects((DropOwnedStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+#endif
break;
case T_ReassignOwnedStmt:
ReassignOwnedObjects((ReassignOwnedStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+#endif
break;
case T_LockStmt:
@@ -1190,6 +1252,10 @@ ProcessUtility(Node *parsetree,
*/
RequireTransactionChain(isTopLevel, "LOCK TABLE");
LockTableCommand((LockStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
+#endif
break;
case T_ConstraintsSetStmt:
@@ -1204,7 +1270,7 @@ ProcessUtility(Node *parsetree,
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, true);
+ ExecUtilityStmtOnNodes(queryString, NULL, true, EXEC_ON_DATANODES);
#endif
break;
@@ -1241,7 +1307,7 @@ ProcessUtility(Node *parsetree,
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
ExecUtilityStmtOnNodes(queryString, NULL,
- stmt->kind == OBJECT_DATABASE);
+ stmt->kind == OBJECT_DATABASE, EXEC_ON_ALL_NODES);
#endif
break;
}
@@ -1251,7 +1317,7 @@ ProcessUtility(Node *parsetree,
CreateConversionCommand((CreateConversionStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1259,7 +1325,7 @@ ProcessUtility(Node *parsetree,
CreateCast((CreateCastStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1267,7 +1333,7 @@ ProcessUtility(Node *parsetree,
DropCast((DropCastStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1275,7 +1341,7 @@ ProcessUtility(Node *parsetree,
DefineOpClass((CreateOpClassStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1283,7 +1349,7 @@ ProcessUtility(Node *parsetree,
DefineOpFamily((CreateOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1291,7 +1357,7 @@ ProcessUtility(Node *parsetree,
AlterOpFamily((AlterOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1299,7 +1365,7 @@ ProcessUtility(Node *parsetree,
RemoveOpClass((RemoveOpClassStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1307,7 +1373,7 @@ ProcessUtility(Node *parsetree,
RemoveOpFamily((RemoveOpFamilyStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1315,7 +1381,7 @@ ProcessUtility(Node *parsetree,
AlterTSDictionary((AlterTSDictionaryStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
@@ -1323,13 +1389,18 @@ ProcessUtility(Node *parsetree,
AlterTSConfiguration((AlterTSConfigurationStmt *) parsetree);
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
- ExecUtilityStmtOnNodes(queryString, NULL, false);
+ ExecUtilityStmtOnNodes(queryString, NULL, false, EXEC_ON_ALL_NODES);
#endif
break;
#ifdef PGXC
case T_RemoteQuery:
Assert(IS_PGXC_COORDINATOR);
- ExecRemoteUtility((RemoteQuery *) parsetree);
+ /*
+ * Do not launch the query on other Datanodes if the remote connection is from a Coordinator;
+ * it would cause a deadlock in the cluster at the Datanode level.
+ */
+ if (!IsConnFromCoord())
+ ExecRemoteUtility((RemoteQuery *) parsetree);
break;
#endif
default:
@@ -1340,18 +1411,28 @@ ProcessUtility(Node *parsetree,
}
#ifdef PGXC
+/*
+ * Execute a Utility statement on nodes, including Coordinators
+ * If the DDL is received from a remote Coordinator,
+ * it is not possible to push down DDL to Datanodes
+ * as it is taken in charge by the remote Coordinator.
+ */
static void
ExecUtilityStmtOnNodes(const char *queryString, ExecNodes *nodes,
- bool force_autocommit)
+ bool force_autocommit, RemoteQueryExecType exec_type)
{
- RemoteQuery *step = makeNode(RemoteQuery);
- step->combine_type = COMBINE_TYPE_SAME;
- step->exec_nodes = nodes;
- step->sql_statement = pstrdup(queryString);
- step->force_autocommit = force_autocommit;
- ExecRemoteUtility(step);
- pfree(step->sql_statement);
- pfree(step);
+ if (!IsConnFromCoord())
+ {
+ RemoteQuery *step = makeNode(RemoteQuery);
+ step->combine_type = COMBINE_TYPE_SAME;
+ step->exec_nodes = nodes;
+ step->sql_statement = pstrdup(queryString);
+ step->force_autocommit = force_autocommit;
+ step->exec_type = exec_type;
+ ExecRemoteUtility(step);
+ pfree(step->sql_statement);
+ pfree(step);
+ }
}
#endif
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0676f0da0d..968126190e 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -30,6 +30,7 @@
#include "access/gin.h"
#ifdef PGXC
#include "access/gtm.h"
+#include "pgxc/pgxc.h"
#endif
#include "access/transam.h"
#include "access/twophase.h"
@@ -340,6 +341,20 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
{NULL, 0, false}
};
+#ifdef PGXC
+/*
+ * Define remote connection types for PGXC
+ */
+static const struct config_enum_entry pgxc_conn_types[] = {
+ {"application", REMOTE_CONN_APP, false},
+ {"coordinator", REMOTE_CONN_COORD, false},
+ {"datanode", REMOTE_CONN_DATANODE, false},
+ {"gtm", REMOTE_CONN_GTM, false},
+ {"gtmproxy", REMOTE_CONN_GTM_PROXY, false},
+ {NULL, 0, false}
+};
+#endif
+
/*
* Options for enum values stored in other modules
*/
@@ -2013,6 +2028,15 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"num_coordinators", PGC_POSTMASTER, COORDINATORS,
+ gettext_noop("Number of Coordinators."),
+ NULL
+ },
+ &NumCoords,
+ 1, 1, 65535, NULL, NULL
+ },
+
+ {
{"min_pool_size", PGC_POSTMASTER, DATA_NODES,
gettext_noop("Initial pool size."),
gettext_noop("If number of active connections decreased below this value, "
@@ -2051,11 +2075,11 @@ static struct config_int ConfigureNamesInt[] =
},
{
- {"gtm_coordinator_id", PGC_POSTMASTER, GTM,
- gettext_noop("The Coordinator Identifier."),
+ {"pgxc_node_id", PGC_POSTMASTER, GTM,
+ gettext_noop("The Coordinator or Datanode Identifier."),
NULL
},
- &GtmCoordinatorId,
+ &PGXCNodeId,
1, 1, INT_MAX, NULL, NULL
},
@@ -2676,6 +2700,47 @@ static struct config_string ConfigureNamesString[] =
&GtmHost,
"localhost", NULL, NULL
},
+
+ {
+ {"coordinator_hosts", PGC_POSTMASTER, COORDINATORS,
+ gettext_noop("Host names or addresses of Coordinators."),
+ gettext_noop("Comma separated list or single value, "
+ "if all Coordinators on the same host")
+ },
+ &CoordinatorHosts,
+ "localhost", NULL, NULL
+ },
+
+ {
+ {"coordinator_ports", PGC_POSTMASTER, COORDINATORS,
+ gettext_noop("Port numbers of Coordinators."),
+ gettext_noop("Comma separated list or single value, "
+ "if all Coordinators listen on the same port")
+ },
+ &CoordinatorPorts,
+ "5432", NULL, NULL
+ },
+
+ {
+ {"coordinator_users", PGC_POSTMASTER, COORDINATORS,
+ gettext_noop("User names or addresses of Coordinators."),
+ gettext_noop("Comma separated list or single value, "
+ "if user names are the same on all Coordinators")
+ },
+ &CoordinatorUsers,
+ "postgres", NULL, NULL
+ },
+
+ {
+ {"coordinator_passwords", PGC_POSTMASTER, COORDINATORS,
+ gettext_noop("Passwords of Coordinators."),
+ gettext_noop("Comma separated list or single value, "
+ "if passwords are the same on all Coordinators")
+ },
+ &CoordinatorPwds,
+ "postgres", NULL, NULL
+ },
+
#endif
#ifdef USE_SSL
{
@@ -2853,6 +2918,16 @@ static struct config_enum ConfigureNamesEnum[] =
XMLOPTION_CONTENT, xmloption_options, NULL, NULL
},
+#ifdef PGXC
+ {
+ {"remotetype", PGC_BACKEND, CONN_AUTH,
+ gettext_noop("Sets the type of Postgres-XC remote connection"),
+ NULL
+ },
+ &remoteConnType,
+ REMOTE_CONN_APP, pgxc_conn_types, NULL, NULL
+ },
+#endif
/* End-of-list marker */
{
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 267b78548c..97aad51bdf 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -526,6 +526,20 @@
# num_data_nodes extra values are ignored.
#------------------------------------------------------------------------------
+# COORDINATORS
+#------------------------------------------------------------------------------
+#num_coordinators = 1 # Number of Coordinators
+ # (change requires restart)
+#coordinator_hosts = 'localhost' # Host names or addresses of Coordinators
+ # (change requires restart)
+#coordinator_ports = '5432' # Port numbers of Coordinators
+ # (change requires restart)
+#coordinator_users = 'postgres' # User names of Coordinators
+ # (change requires restart)
+#coordinator_passwords = 'postgres' # Passwords of Coordinators
+ # (change requires restart)
+
+#------------------------------------------------------------------------------
# GTM CONNECTION
#------------------------------------------------------------------------------
@@ -533,7 +547,7 @@
# (change requires restart)
#gtm_port = 6666 # Port of GTM
# (change requires restart)
-#gtm_coordinator_id = 1 # Coordinator identifier
+#pgxc_node_id = 1 # Coordinator or Datanode identifier
# (change requires restart)
##------------------------------------------------------------------------------
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index e60fdd9e02..cb078be067 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2875,13 +2875,13 @@ reversedirection_heap(Tuplesortstate *state)
static unsigned int
getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK)
{
- DataNodeHandle *conn = state->combiner->connections[tapenum];
+ PGXCNodeHandle *conn = state->combiner->connections[tapenum];
for (;;)
{
switch (handle_response(conn, state->combiner))
{
case RESPONSE_EOF:
- if (data_node_receive(1, &conn, NULL))
+ if (pgxc_node_receive(1, &conn, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg(conn->error)));
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 3da12714ee..b55dccd03b 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -16,10 +16,6 @@
#include "postgres_fe.h"
-#ifdef PGXC
-#include "pgxc/pgxc.h"
-#endif
-
/*
* pg_dump uses two different mechanisms for identifying database objects:
*
diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c
index ff73b8d34a..cb735c2b4a 100644
--- a/src/gtm/client/fe-protocol.c
+++ b/src/gtm/client/fe-protocol.c
@@ -362,7 +362,7 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result)
break;
case TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT:
case TXN_PREPARE_RESULT:
- case TXN_BEING_PREPARED_RESULT:
+ case TXN_START_PREPARED_RESULT:
if (gtmpqGetnchar((char *)&result->gr_resdata.grd_gxid,
sizeof (GlobalTransactionId), conn))
result->gr_status = -1;
@@ -549,17 +549,20 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result)
result->gr_status = -1;
break;
}
- if ((result->gr_resdata.grd_txn_get_gid_data.datanodes = (PGXC_NodeId *)
- malloc(sizeof(PGXC_NodeId) * result->gr_resdata.grd_txn_get_gid_data.datanodecnt)) == NULL)
+ if (result->gr_resdata.grd_txn_get_gid_data.datanodecnt != 0)
{
- result->gr_status = -1;
- break;
- }
- if (gtmpqGetnchar((char *)result->gr_resdata.grd_txn_get_gid_data.datanodes,
- sizeof(PGXC_NodeId) * result->gr_resdata.grd_txn_get_gid_data.datanodecnt, conn))
- {
- result->gr_status = -1;
- break;
+ if ((result->gr_resdata.grd_txn_get_gid_data.datanodes = (PGXC_NodeId *)
+ malloc(sizeof(PGXC_NodeId) * result->gr_resdata.grd_txn_get_gid_data.datanodecnt)) == NULL)
+ {
+ result->gr_status = -1;
+ break;
+ }
+ if (gtmpqGetnchar((char *)result->gr_resdata.grd_txn_get_gid_data.datanodes,
+ sizeof(PGXC_NodeId) * result->gr_resdata.grd_txn_get_gid_data.datanodecnt, conn))
+ {
+ result->gr_status = -1;
+ break;
+ }
}
if (gtmpqGetInt(&result->gr_resdata.grd_txn_get_gid_data.coordcnt,
sizeof (int32), conn))
diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c
index 54b75fd1ed..984aee139c 100644
--- a/src/gtm/client/gtm_client.c
+++ b/src/gtm/client/gtm_client.c
@@ -264,7 +264,7 @@ send_failed:
}
int
-being_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
+start_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
int datanodecnt, PGXC_NodeId datanodes[], int coordcnt,
PGXC_NodeId coordinators[])
{
@@ -273,17 +273,20 @@ being_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_TXN_BEING_PREPARED, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(MSG_TXN_START_PREPARED, sizeof (GTM_MessageType), conn) ||
gtmpqPutc(true, conn) ||
gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn) ||
/* Send also GID for an explicit prepared transaction */
gtmpqPutInt(strlen(gid), sizeof (GTM_GIDLen), conn) ||
gtmpqPutnchar((char *) gid, strlen(gid), conn) ||
gtmpqPutInt(datanodecnt, sizeof (int), conn) ||
- gtmpqPutnchar((char *)datanodes, sizeof (PGXC_NodeId) * datanodecnt, conn) ||
gtmpqPutInt(coordcnt, sizeof (int), conn))
goto send_failed;
+ /* Datanode connections are not always involved in a transaction (SEQUENCE DDL) */
+ if (datanodecnt != 0 && gtmpqPutnchar((char *)datanodes, sizeof (PGXC_NodeId) * datanodecnt, conn))
+ goto send_failed;
+
/* Coordinator connections are not always involved in a transaction */
if (coordcnt != 0 && gtmpqPutnchar((char *)coordinators, sizeof (PGXC_NodeId) * coordcnt, conn))
goto send_failed;
@@ -306,7 +309,7 @@ being_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
if (res->gr_status == 0)
{
- Assert(res->gr_type == TXN_BEING_PREPARED_RESULT);
+ Assert(res->gr_type == TXN_START_PREPARED_RESULT);
Assert(res->gr_resdata.grd_gxid == gxid);
}
@@ -405,10 +408,12 @@ get_gid_data(GTM_Conn *conn,
{
*gxid = res->gr_resdata.grd_txn_get_gid_data.gxid;
*prepared_gxid = res->gr_resdata.grd_txn_get_gid_data.prepared_gxid;
- *datanodes = res->gr_resdata.grd_txn_get_gid_data.datanodes;
- *coordinators = res->gr_resdata.grd_txn_get_gid_data.coordinators;
*datanodecnt = res->gr_resdata.grd_txn_get_gid_data.datanodecnt;
*coordcnt = res->gr_resdata.grd_txn_get_gid_data.coordcnt;
+ if (res->gr_resdata.grd_txn_get_gid_data.datanodecnt != 0)
+ *datanodes = res->gr_resdata.grd_txn_get_gid_data.datanodes;
+ if (res->gr_resdata.grd_txn_get_gid_data.coordcnt != 0)
+ *coordinators = res->gr_resdata.grd_txn_get_gid_data.coordinators;
}
return res->gr_status;
diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c
index 949c123fef..f3881978c8 100644
--- a/src/gtm/main/gtm_txn.c
+++ b/src/gtm/main/gtm_txn.c
@@ -875,7 +875,7 @@ GTM_CommitTransaction(GTM_TransactionHandle txn)
* Prepare a transaction
*/
int
-GTM_BeingPreparedTransaction(GTM_TransactionHandle txn,
+GTM_StartPreparedTransaction(GTM_TransactionHandle txn,
char *gid,
uint32 datanodecnt,
PGXC_NodeId datanodes[],
@@ -896,9 +896,11 @@ GTM_BeingPreparedTransaction(GTM_TransactionHandle txn,
gtm_txninfo->gti_datanodecount = datanodecnt;
gtm_txninfo->gti_coordcount = coordcnt;
- if (gtm_txninfo->gti_datanodes == NULL)
+ /* It is possible that no datanode is involved in a transaction (Sequence DDL) */
+ if (datanodecnt != 0 && gtm_txninfo->gti_datanodes == NULL)
gtm_txninfo->gti_datanodes = (PGXC_NodeId *)MemoryContextAlloc(TopMostMemoryContext, sizeof (PGXC_NodeId) * GTM_MAX_2PC_NODES);
- memcpy(gtm_txninfo->gti_datanodes, datanodes, sizeof (PGXC_NodeId) * datanodecnt);
+ if (datanodecnt != 0)
+ memcpy(gtm_txninfo->gti_datanodes, datanodes, sizeof (PGXC_NodeId) * datanodecnt);
/* It is possible that no coordinator is involved in a transaction */
if (coordcnt != 0 && gtm_txninfo->gti_coordinators == NULL)
@@ -919,7 +921,7 @@ GTM_BeingPreparedTransaction(GTM_TransactionHandle txn,
* Same as GTM_PrepareTransaction but takes GXID as input
*/
int
-GTM_BeingPreparedTransactionGXID(GlobalTransactionId gxid,
+GTM_StartPreparedTransactionGXID(GlobalTransactionId gxid,
char *gid,
uint32 datanodecnt,
PGXC_NodeId datanodes[],
@@ -927,7 +929,7 @@ GTM_BeingPreparedTransactionGXID(GlobalTransactionId gxid,
PGXC_NodeId coordinators[])
{
GTM_TransactionHandle txn = GTM_GXIDToHandle(gxid);
- return GTM_BeingPreparedTransaction(txn, gid, datanodecnt, datanodes, coordcnt, coordinators);
+ return GTM_StartPreparedTransaction(txn, gid, datanodecnt, datanodes, coordcnt, coordinators);
}
int
@@ -952,11 +954,14 @@ GTM_GetGIDData(GTM_TransactionHandle prepared_txn,
*datanodecnt = gtm_txninfo->gti_datanodecount;
*coordcnt = gtm_txninfo->gti_coordcount;
- *datanodes = (PGXC_NodeId *) palloc(sizeof (PGXC_NodeId) * gtm_txninfo->gti_datanodecount);
- memcpy(*datanodes, gtm_txninfo->gti_datanodes,
- sizeof (PGXC_NodeId) * gtm_txninfo->gti_datanodecount);
+ if (gtm_txninfo->gti_datanodecount != 0)
+ {
+ *datanodes = (PGXC_NodeId *) palloc(sizeof (PGXC_NodeId) * gtm_txninfo->gti_datanodecount);
+ memcpy(*datanodes, gtm_txninfo->gti_datanodes,
+ sizeof (PGXC_NodeId) * gtm_txninfo->gti_datanodecount);
+ }
- if (coordcnt != 0)
+ if (gtm_txninfo->gti_coordcount != 0)
{
*coordinators = (PGXC_NodeId *) palloc(sizeof (PGXC_NodeId) * gtm_txninfo->gti_coordcount);
memcpy(*coordinators, gtm_txninfo->gti_coordinators,
@@ -1452,12 +1457,16 @@ ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message)
proxyhdr.ph_conid = myport->conn_id;
pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
}
+
/* Send the two GXIDs */
pq_sendbytes(&buf, (char *)&gxid, sizeof(GlobalTransactionId));
pq_sendbytes(&buf, (char *)&prepared_gxid, sizeof(GlobalTransactionId));
+
/* Then send the data linked to nodes involved in prepare */
pq_sendint(&buf, datanodecnt, 4);
- pq_sendbytes(&buf, (char *)datanodes, sizeof(PGXC_NodeId) * datanodecnt);
+ if (datanodecnt != 0)
+ pq_sendbytes(&buf, (char *)datanodes, sizeof(PGXC_NodeId) * datanodecnt);
+
pq_sendint(&buf, coordcnt, 4);
if (coordcnt != 0)
pq_sendbytes(&buf, (char *)coordinators, sizeof(PGXC_NodeId) * coordcnt);
@@ -1676,10 +1685,10 @@ ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message)
}
/*
- * Process MSG_TXN_BEING_PREPARED message
+ * Process MSG_TXN_START_PREPARED message
*/
void
-ProcessBeingPreparedTransactionCommand(Port *myport, StringInfo message)
+ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message)
{
StringInfoData buf;
GTM_TransactionHandle txn;
@@ -1718,14 +1727,20 @@ ProcessBeingPreparedTransactionCommand(Port *myport, StringInfo message)
gidlen = pq_getmsgint(message, sizeof (GTM_GIDLen));
gid = (char *)pq_getmsgbytes(message, gidlen);
- /* Get Datanode Data */
+ /* Get Datanode Count Data */
datanodecnt = pq_getmsgint(message, 4);
- datanodes = (PGXC_NodeId *) palloc(sizeof (PGXC_NodeId) * datanodecnt);
- memcpy(datanodes, pq_getmsgbytes(message, sizeof (PGXC_NodeId) * datanodecnt),
- sizeof (PGXC_NodeId) * datanodecnt);
- /* Get Coordinator Data, can be possibly NULL */
+ /* Get Coordinator Count Data */
coordcnt = pq_getmsgint(message, 4);
+
+ /* it is possible that Datanodes are not involved in a PREPARE (Sequence DDL) */
+ if (datanodecnt != 0)
+ {
+ datanodes = (PGXC_NodeId *) palloc(sizeof (PGXC_NodeId) * datanodecnt);
+ memcpy(datanodes, pq_getmsgbytes(message, sizeof (PGXC_NodeId) * datanodecnt),
+ sizeof (PGXC_NodeId) * datanodecnt);
+ }
+
if (coordcnt != 0)
{
coordinators = (PGXC_NodeId *) palloc(sizeof (PGXC_NodeId) * coordcnt);
@@ -1739,7 +1754,7 @@ ProcessBeingPreparedTransactionCommand(Port *myport, StringInfo message)
/*
* Prepare the transaction
*/
- if (GTM_BeingPreparedTransaction(txn, gid, datanodecnt, datanodes, coordcnt, coordinators) != STATUS_OK)
+ if (GTM_StartPreparedTransaction(txn, gid, datanodecnt, datanodes, coordcnt, coordinators) != STATUS_OK)
ereport(ERROR,
(EINVAL,
errmsg("Failed to prepare the transaction")));
@@ -1752,7 +1767,7 @@ ProcessBeingPreparedTransactionCommand(Port *myport, StringInfo message)
pfree(coordinators);
pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_BEING_PREPARED_RESULT, 4);
+ pq_sendint(&buf, TXN_START_PREPARED_RESULT, 4);
if (myport->is_proxy)
{
GTM_ProxyMsgHeader proxyhdr;
diff --git a/src/gtm/main/main.c b/src/gtm/main/main.c
index 1a6e546395..1cba1ea860 100644
--- a/src/gtm/main/main.c
+++ b/src/gtm/main/main.c
@@ -769,7 +769,7 @@ ProcessCommand(Port *myport, StringInfo input_message)
case MSG_TXN_BEGIN_GETGXID:
case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
case MSG_TXN_PREPARE:
- case MSG_TXN_BEING_PREPARED:
+ case MSG_TXN_START_PREPARED:
case MSG_TXN_COMMIT:
case MSG_TXN_COMMIT_PREPARED:
case MSG_TXN_ROLLBACK:
@@ -957,8 +957,8 @@ ProcessTransactionCommand(Port *myport, GTM_MessageType mtype, StringInfo messag
ProcessBeginTransactionGetGXIDCommandMulti(myport, message);
break;
- case MSG_TXN_BEING_PREPARED:
- ProcessBeingPreparedTransactionCommand(myport, message);
+ case MSG_TXN_START_PREPARED:
+ ProcessStartPreparedTransactionCommand(myport, message);
break;
case MSG_TXN_PREPARE:
diff --git a/src/gtm/proxy/proxy_main.c b/src/gtm/proxy/proxy_main.c
index d9ca329f14..30f1d1b383 100644
--- a/src/gtm/proxy/proxy_main.c
+++ b/src/gtm/proxy/proxy_main.c
@@ -949,7 +949,7 @@ ProcessCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
case MSG_TXN_BEGIN_GETGXID:
case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
case MSG_TXN_PREPARE:
- case MSG_TXN_BEING_PREPARED:
+ case MSG_TXN_START_PREPARED:
case MSG_TXN_COMMIT:
case MSG_TXN_COMMIT_PREPARED:
case MSG_TXN_ROLLBACK:
@@ -1118,7 +1118,7 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo,
case MSG_TXN_BEGIN:
case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
case MSG_TXN_PREPARE:
- case MSG_TXN_BEING_PREPARED:
+ case MSG_TXN_START_PREPARED:
/* There are not so many 2PC from application messages, so just proxy it. */
case MSG_TXN_COMMIT_PREPARED:
case MSG_TXN_GET_GXID:
@@ -1308,7 +1308,7 @@ ProcessTransactionCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
case MSG_TXN_PREPARE:
- case MSG_TXN_BEING_PREPARED:
+ case MSG_TXN_START_PREPARED:
case MSG_TXN_GET_GID_DATA:
case MSG_TXN_COMMIT_PREPARED:
GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, message);
diff --git a/src/include/access/gtm.h b/src/include/access/gtm.h
index 6740c865b9..3fc4cfec02 100644
--- a/src/include/access/gtm.h
+++ b/src/include/access/gtm.h
@@ -15,7 +15,7 @@
/* Configuration variables */
extern char *GtmHost;
extern int GtmPort;
-extern int GtmCoordinatorId;
+extern int PGXCNodeId;
extern bool IsGTMConnected(void);
extern void InitGTM(void);
@@ -24,7 +24,7 @@ extern GlobalTransactionId BeginTranGTM(GTM_Timestamp *timestamp);
extern GlobalTransactionId BeginTranAutovacuumGTM(void);
extern int CommitTranGTM(GlobalTransactionId gxid);
extern int RollbackTranGTM(GlobalTransactionId gxid);
-extern int BeingPreparedTranGTM(GlobalTransactionId gxid,
+extern int StartPreparedTranGTM(GlobalTransactionId gxid,
char *gid,
int datanodecnt,
PGXC_NodeId datanodes[],
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 90da877545..58bc7e4d43 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,10 +18,6 @@
#include "storage/proc.h"
#include "utils/timestamp.h"
-#ifdef PGXC
-#include "pgxc/pgxc.h"
-#endif
-
/*
* GlobalTransactionData is defined in twophase.c; other places have no
* business knowing the internal definition.
@@ -42,10 +38,12 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid,
#ifdef PGXC
extern void RemoveGXactCoord(GlobalTransaction gxact);
+extern void EndPrepare(GlobalTransaction gxact, bool write_2pc_file);
+#else
+extern void EndPrepare(GlobalTransaction gxact);
#endif
extern void StartPrepare(GlobalTransaction gxact);
-extern void EndPrepare(GlobalTransaction gxact);
extern TransactionId PrescanPreparedTransactions(void);
extern void RecoverPreparedTransactions(void);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 24d420f6e7..9a51b90d9d 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -173,7 +173,11 @@ extern void AbortCurrentTransactionOnce(void);
extern void AbortCurrentTransaction(void);
extern void BeginTransactionBlock(void);
extern bool EndTransactionBlock(void);
+#ifdef PGXC
+extern bool PrepareTransactionBlock(char *gid, bool write_2pc_file);
+#else
extern bool PrepareTransactionBlock(char *gid);
+#endif
extern void UserAbortTransactionBlock(void);
extern void ReleaseSavepoint(List *options);
extern void DefineSavepoint(char *name);
diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h
index 4fe4bcf081..ff1befdced 100644
--- a/src/include/gtm/gtm_client.h
+++ b/src/include/gtm/gtm_client.h
@@ -29,7 +29,7 @@ typedef union GTM_ResultData
} grd_gxid_tp; /* TXN_BEGIN_GETGXID */
GlobalTransactionId grd_gxid; /* TXN_PREPARE
- * TXN_BEING_PREPARED
+ * TXN_START_PREPARED
* TXN_COMMIT
* TXN_COMMIT_PREPARED
* TXN_ROLLBACK
@@ -125,7 +125,7 @@ GlobalTransactionId begin_transaction_autovacuum(GTM_Conn *conn, GTM_IsolationLe
int commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
int commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid);
int abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
-int being_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
+int start_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
int datanodecnt, PGXC_NodeId datanodes[],
int coordcnt, PGXC_NodeId coordinators[]);
int prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
diff --git a/src/include/gtm/gtm_msg.h b/src/include/gtm/gtm_msg.h
index e1730eb62a..77c97ac1fd 100644
--- a/src/include/gtm/gtm_msg.h
+++ b/src/include/gtm/gtm_msg.h
@@ -22,7 +22,7 @@ typedef enum GTM_MessageType
MSG_TXN_BEGIN, /* Start a new transaction */
MSG_TXN_BEGIN_GETGXID, /* Start a new transaction and get GXID */
MSG_TXN_BEGIN_GETGXID_MULTI, /* Start multiple new transactions and get GXIDs */
- MSG_TXN_BEING_PREPARED, /* Begins to prepare a transation for commit */
+ MSG_TXN_START_PREPARED, /* Begins to prepare a transation for commit */
MSG_TXN_COMMIT, /* Commit a running or prepared transaction */
MSG_TXN_COMMIT_MULTI, /* Commit multiple running or prepared transactions */
MSG_TXN_COMMIT_PREPARED, /* Commit a prepared transaction */
@@ -62,7 +62,7 @@ typedef enum GTM_ResultType
TXN_BEGIN_GETGXID_RESULT,
TXN_BEGIN_GETGXID_MULTI_RESULT,
TXN_PREPARE_RESULT,
- TXN_BEING_PREPARED_RESULT,
+ TXN_START_PREPARED_RESULT,
TXN_COMMIT_PREPARED_RESULT,
TXN_COMMIT_RESULT,
TXN_COMMIT_MULTI_RESULT,
diff --git a/src/include/gtm/gtm_txn.h b/src/include/gtm/gtm_txn.h
index 5e3a02cf0f..c883612e64 100644
--- a/src/include/gtm/gtm_txn.h
+++ b/src/include/gtm/gtm_txn.h
@@ -199,13 +199,13 @@ int GTM_CommitTransaction(GTM_TransactionHandle txn);
int GTM_CommitTransactionMulti(GTM_TransactionHandle txn[], int txn_count, int status[]);
int GTM_CommitTransactionGXID(GlobalTransactionId gxid);
int GTM_PrepareTransaction(GTM_TransactionHandle txn);
-int GTM_BeingPreparedTransaction(GTM_TransactionHandle txn,
+int GTM_StartPreparedTransaction(GTM_TransactionHandle txn,
char *gid,
uint32 datanodecnt,
PGXC_NodeId datanodes[],
uint32 coordcnt,
PGXC_NodeId coordinators[]);
-int GTM_BeingPreparedTransactionGXID(GlobalTransactionId gxid,
+int GTM_StartPreparedTransactionGXID(GlobalTransactionId gxid,
char *gid,
uint32 datanodecnt,
PGXC_NodeId datanodes[],
@@ -235,7 +235,7 @@ void ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message);
void ProcessCommitTransactionCommand(Port *myport, StringInfo message);
void ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message);
void ProcessRollbackTransactionCommand(Port *myport, StringInfo message);
-void ProcessBeingPreparedTransactionCommand(Port *myport, StringInfo message);
+void ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message);
void ProcessPrepareTransactionCommand(Port *myport, StringInfo message);
void ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message);
void ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message);
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 539826d655..ac88d9057c 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -69,7 +69,6 @@ typedef struct
#include "libpq/pqcomm.h"
#include "utils/timestamp.h"
-
typedef enum CAC_state
{
CAC_OK, CAC_STARTUP, CAC_SHUTDOWN, CAC_RECOVERY, CAC_TOOMANY,
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
deleted file mode 100644
index 53faf98c34..0000000000
--- a/src/include/pgxc/datanode.h
+++ /dev/null
@@ -1,110 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * datanode.h
- *
- * Utility functions to communicate to Data Node
- *
- *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group ?
- * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
- *
- * IDENTIFICATION
- * $$
- *
- *-------------------------------------------------------------------------
- */
-
-#ifndef DATANODE_H
-#define DATANODE_H
-#include "postgres.h"
-#include "gtm/gtm_c.h"
-#include "utils/timestamp.h"
-#include "nodes/pg_list.h"
-#include "utils/snapshot.h"
-#include <unistd.h>
-
-#define NO_SOCKET -1
-
-
-/* Connection to data node maintained by Pool Manager */
-typedef struct PGconn NODE_CONNECTION;
-
-/* Helper structure to access data node from Session */
-typedef enum
-{
- DN_CONNECTION_STATE_IDLE, /* idle, ready for query */
- DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */
- DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */
- DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQury yet */
- DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */
- DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
- DN_CONNECTION_STATE_COPY_IN,
- DN_CONNECTION_STATE_COPY_OUT
-} DNConnectionState;
-
-#define DN_CONNECTION_STATE_ERROR(dnconn) \
- (dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
- || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY
-
-struct data_node_handle
-{
- int nodenum; /* node identifier 1..NumDataNodes */
- /* fd of the connection */
- int sock;
- /* Connection state */
- char transaction_status;
- DNConnectionState state;
-#ifdef DN_CONNECTION_DEBUG
- bool have_row_desc;
-#endif
- char *error;
- /* Output buffer */
- char *outBuffer;
- size_t outSize;
- size_t outEnd;
- /* Input buffer */
- char *inBuffer;
- size_t inSize;
- size_t inStart;
- size_t inEnd;
- size_t inCursor;
-};
-typedef struct data_node_handle DataNodeHandle;
-
-extern void InitMultinodeExecutor(void);
-
-/* Open/close connection routines (invoked from Pool Manager) */
-extern char *DataNodeConnStr(char *host, char *port, char *dbname, char *user,
- char *password);
-extern NODE_CONNECTION *DataNodeConnect(char *connstr);
-extern void DataNodeClose(NODE_CONNECTION * conn);
-extern int DataNodeConnected(NODE_CONNECTION * conn);
-extern int DataNodeConnClean(NODE_CONNECTION * conn);
-extern void DataNodeCleanAndRelease(int code, Datum arg);
-
-extern DataNodeHandle **get_handles(List *nodelist);
-extern void release_handles(bool force_drop);
-extern int get_transaction_nodes(DataNodeHandle ** connections);
-extern PGXC_NodeId* collect_datanode_numbers(int conn_count, DataNodeHandle ** connections);
-extern int get_active_nodes(DataNodeHandle ** connections);
-
-extern int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
-extern int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
-
-extern int data_node_send_query(DataNodeHandle * handle, const char *query);
-extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid);
-extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot);
-extern int data_node_send_timestamp(DataNodeHandle * handle, TimestampTz timestamp);
-
-extern int data_node_receive(const int conn_count,
- DataNodeHandle ** connections, struct timeval * timeout);
-extern int data_node_read_data(DataNodeHandle * conn);
-extern int send_some(DataNodeHandle * handle, int len);
-extern int data_node_flush(DataNodeHandle *handle);
-
-extern char get_message(DataNodeHandle *conn, int *len, char **msg);
-
-extern void add_error_message(DataNodeHandle * handle, const char *message);
-extern void clear_socket_data (DataNodeHandle *conn);
-
-#endif
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index 5ba8fff27d..4f4a0c9c53 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -16,7 +16,7 @@
#ifndef EXECREMOTE_H
#define EXECREMOTE_H
-#include "datanode.h"
+#include "pgxcnode.h"
#include "locator.h"
#include "planner.h"
#include "access/tupdesc.h"
@@ -47,7 +47,7 @@ typedef struct RemoteQueryState
{
ScanState ss; /* its first field is NodeTag */
int node_count; /* total count of participating nodes */
- DataNodeHandle **connections; /* data node connections being combined */
+ PGXCNodeHandle **connections; /* data node connections being combined */
int conn_count; /* count of active connections */
int current_conn; /* used to balance load when reading from connections */
CombineType combine_type; /* see CombineType enum */
@@ -77,17 +77,18 @@ typedef struct RemoteQueryState
} RemoteQueryState;
/* Multinode Executor */
-extern void DataNodeBegin(void);
-extern void DataNodeCommit(void);
-extern int DataNodeRollback(void);
-extern int DataNodePrepare(char *gid);
-extern void DataNodeRollbackPrepared(char *gid);
-extern void DataNodeCommitPrepared(char *gid);
+extern void PGXCNodeBegin(void);
+extern void PGXCNodeCommit(void);
+extern int PGXCNodeRollback(void);
+extern bool PGXCNodePrepare(char *gid);
+extern bool PGXCNodeRollbackPrepared(char *gid);
+extern bool PGXCNodeCommitPrepared(char *gid);
-extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
-extern int DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, DataNodeHandle** copy_connections);
-extern uint64 DataNodeCopyOut(ExecNodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file);
-extern void DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+/* Copy command just involves Datanodes */
+extern PGXCNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
+extern int DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections);
+extern uint64 DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE* copy_file);
+extern void DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
extern int ExecCountSlotsRemoteQuery(RemoteQuery *node);
extern RemoteQueryState *ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags);
@@ -95,11 +96,11 @@ extern TupleTableSlot* ExecRemoteQuery(RemoteQueryState *step);
extern void ExecEndRemoteQuery(RemoteQueryState *step);
extern void ExecRemoteUtility(RemoteQuery *node);
-extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner);
+extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner);
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt);
-extern void DataNodeConsumeMessages(void);
+extern void PGXCNodeConsumeMessages(void);
extern int primary_data_node;
#endif
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 233bf262cd..afeff56057 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -85,7 +85,8 @@ extern bool IsHashColumnForRelId(Oid relid, char *part_col_name);
extern int GetRoundRobinNode(Oid relid);
extern bool IsHashDistributable(Oid col_type);
-extern List *GetAllNodes(void);
+extern List *GetAllDataNodes(void);
+extern List *GetAllCoordNodes(void);
extern List *GetAnyDataNode(void);
extern void RelationBuildLocator(Relation rel);
extern void FreeRelationLocInfo(RelationLocInfo *relationLocInfo);
diff --git a/src/include/pgxc/pgxc.h b/src/include/pgxc/pgxc.h
index 09ff2c0ada..3b0309079c 100644
--- a/src/include/pgxc/pgxc.h
+++ b/src/include/pgxc/pgxc.h
@@ -17,7 +17,25 @@
extern bool isPGXCCoordinator;
extern bool isPGXCDataNode;
+typedef enum
+{
+ REMOTE_CONN_APP,
+ REMOTE_CONN_COORD,
+ REMOTE_CONN_DATANODE,
+ REMOTE_CONN_GTM,
+ REMOTE_CONN_GTM_PROXY
+} RemoteConnTypes;
+
+/* Determine remote connection type for a PGXC backend */
+extern int remoteConnType;
+
#define IS_PGXC_COORDINATOR isPGXCCoordinator
#define IS_PGXC_DATANODE isPGXCDataNode
+#define REMOTE_CONN_TYPE remoteConnType
+#define IsConnFromApp() (remoteConnType == REMOTE_CONN_APP)
+#define IsConnFromCoord() (remoteConnType == REMOTE_CONN_COORD)
+#define IsConnFromDatanode() (remoteConnType == REMOTE_CONN_DATANODE)
+#define IsConnFromGtm() (remoteConnType == REMOTE_CONN_GTM)
+#define IsConnFromGtmProxy() (remoteConnType == REMOTE_CONN_GTM_PROXY)
#endif /* PGXC */
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
new file mode 100644
index 0000000000..d6c5af950c
--- /dev/null
+++ b/src/include/pgxc/pgxcnode.h
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ *
+ * pgxcnode.h
+ *
+ * Utility functions to communicate to Datanodes and Coordinators
+ *
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group ?
+ * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
+ *
+ * IDENTIFICATION
+ * $$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef DATANODE_H
+#define DATANODE_H
+#include "postgres.h"
+#include "gtm/gtm_c.h"
+#include "utils/timestamp.h"
+#include "nodes/pg_list.h"
+#include "utils/snapshot.h"
+#include <unistd.h>
+
+#define NO_SOCKET -1
+
+
+/* Connection to data node maintained by Pool Manager */
+typedef struct PGconn NODE_CONNECTION;
+
+/* Helper structure to access data node from Session */
+typedef enum
+{
+ DN_CONNECTION_STATE_IDLE, /* idle, ready for query */
+ DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */
+ DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */
+ DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQury yet */
+ DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */
+ DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
+ DN_CONNECTION_STATE_COPY_IN,
+ DN_CONNECTION_STATE_COPY_OUT
+} DNConnectionState;
+
+#define DN_CONNECTION_STATE_ERROR(dnconn) \
+ (dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
+ || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY
+
+struct pgxc_node_handle
+{
+ int nodenum; /* node identifier 1..NumDataNodes or 1..NumCoords */
+ /* fd of the connection */
+ int sock;
+ /* Connection state */
+ char transaction_status;
+ DNConnectionState state;
+#ifdef DN_CONNECTION_DEBUG
+ bool have_row_desc;
+#endif
+ char *error;
+ /* Output buffer */
+ char *outBuffer;
+ size_t outSize;
+ size_t outEnd;
+ /* Input buffer */
+ char *inBuffer;
+ size_t inSize;
+ size_t inStart;
+ size_t inEnd;
+ size_t inCursor;
+};
+typedef struct pgxc_node_handle PGXCNodeHandle;
+
+/* Structure used to get all the handles involved in a transaction */
+typedef struct
+{
+ PGXCNodeHandle *primary_handle; /* Primary connection to PGXC node */
+ int dn_conn_count; /* number of Datanode Handles including primary handle */
+ PGXCNodeHandle **datanode_handles; /* an array of Datanode handles */
+ int co_conn_count; /* number of Coordinator handles */
+ PGXCNodeHandle **coord_handles; /* an array of Coordinator handles */
+} PGXCNodeAllHandles;
+
+extern void InitMultinodeExecutor(void);
+
+/* Open/close connection routines (invoked from Pool Manager) */
+extern char *PGXCNodeConnStr(char *host, char *port, char *dbname, char *user,
+ char *password, char *remote_type);
+extern NODE_CONNECTION *PGXCNodeConnect(char *connstr);
+extern void PGXCNodeClose(NODE_CONNECTION * conn);
+extern int PGXCNodeConnected(NODE_CONNECTION * conn);
+extern int PGXCNodeConnClean(NODE_CONNECTION * conn);
+extern void PGXCNodeCleanAndRelease(int code, Datum arg);
+
+extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only);
+extern void release_handles(bool force_drop);
+
+extern int get_transaction_nodes(PGXCNodeHandle ** connections, char client_conn_type);
+extern PGXC_NodeId* collect_pgxcnode_numbers(int conn_count, PGXCNodeHandle ** connections, char client_conn_type);
+extern int get_active_nodes(PGXCNodeHandle ** connections);
+
+extern int ensure_in_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handle);
+extern int ensure_out_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handle);
+
+extern int pgxc_node_send_query(PGXCNodeHandle * handle, const char *query);
+extern int pgxc_node_send_gxid(PGXCNodeHandle * handle, GlobalTransactionId gxid);
+extern int pgxc_node_send_snapshot(PGXCNodeHandle * handle, Snapshot snapshot);
+extern int pgxc_node_send_timestamp(PGXCNodeHandle * handle, TimestampTz timestamp);
+
+extern int pgxc_node_receive(const int conn_count,
+ PGXCNodeHandle ** connections, struct timeval * timeout);
+extern int pgxc_node_read_data(PGXCNodeHandle * conn);
+extern int send_some(PGXCNodeHandle * handle, int len);
+extern int pgxc_node_flush(PGXCNodeHandle *handle);
+
+extern int pgxc_all_handles_send_gxid(PGXCNodeAllHandles *pgxc_handles, GlobalTransactionId gxid, bool stop_at_error);
+extern int pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer, bool stop_at_error);
+
+extern char get_message(PGXCNodeHandle *conn, int *len, char **msg);
+
+extern void add_error_message(PGXCNodeHandle * handle, const char *message);
+extern void clear_socket_data (PGXCNodeHandle *conn);
+
+#endif
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index d2bac5a828..1e31fa38c1 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -54,7 +54,21 @@ typedef struct
Oid *eqOperators; /* OIDs of operators to equate them by */
} SimpleDistinct;
-/* Contains instructions on processing a step of a query.
+/*
+ * Determines whether a query has to be launched
+ * on Coordinators only (e.g. SEQUENCE DDL),
+ * on Datanodes only (normal Remote Queries),
+ * or on all Postgres-XC nodes (Utilities and DDL).
+ */
+typedef enum
+{
+ EXEC_ON_DATANODES,
+ EXEC_ON_COORDS,
+ EXEC_ON_ALL_NODES
+} RemoteQueryExecType;
+
+/*
+ * Contains instructions on processing a step of a query.
* In the prototype this will be simple, but it will eventually
* evolve into a GridSQL-style QueryStep.
*/
@@ -63,17 +77,19 @@ typedef struct
Scan scan;
bool is_single_step; /* special case, skip extra work */
char *sql_statement;
- ExecNodes *exec_nodes;
+ ExecNodes *exec_nodes; /* List of Datanodes where to launch query */
CombineType combine_type;
List *simple_aggregates; /* simple aggregate to combine on this step */
SimpleSort *sort;
SimpleDistinct *distinct;
bool read_only; /* do not use 2PC when committing read only steps */
- bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+ bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+ RemoteQueryExecType exec_type;
} RemoteQuery;
-/* For handling simple aggregates (no group by present)
+/*
+ * For handling simple aggregates (no group by present)
* For now, only MAX will be supported.
*/
typedef enum
diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h
index b7ac3aedf0..3a615cc0f3 100644
--- a/src/include/pgxc/poolmgr.h
+++ b/src/include/pgxc/poolmgr.h
@@ -17,7 +17,7 @@
#ifndef POOLMGR_H
#define POOLMGR_H
#include <sys/time.h>
-#include "datanode.h"
+#include "pgxcnode.h"
#include "poolcomm.h"
#include "storage/pmsignal.h"
@@ -30,30 +30,31 @@ typedef struct
char *port;
char *uname;
char *password;
-} DataNodeConnectionInfo;
+} PGXCNodeConnectionInfo;
/* Connection pool entry */
typedef struct
{
struct timeval released;
NODE_CONNECTION *conn;
-} DataNodePoolSlot;
+} PGXCNodePoolSlot;
-/* Pool of connections to specified data nodes */
+/* Pool of connections to a specified PGXC node */
typedef struct
{
char *connstr;
int freeSize; /* available connections */
int size; /* total pool size */
- DataNodePoolSlot **slot;
-} DataNodePool;
+ PGXCNodePoolSlot **slot;
+} PGXCNodePool;
/* All pools for specified database */
typedef struct databasepool
{
Oid databaseId;
char *database;
- DataNodePool **nodePools; /* one for each data node */
+ PGXCNodePool **dataNodePools; /* one for each Datanode */
+ PGXCNodePool **coordNodePools; /* one for each Coordinator */
struct databasepool *next;
} DatabasePool;
@@ -65,7 +66,8 @@ typedef struct
/* communication channel */
PoolPort port;
DatabasePool *pool;
- DataNodePoolSlot **connections; /* one for each data node */
+ PGXCNodePoolSlot **dn_connections; /* one for each Datanode */
+ PGXCNodePoolSlot **coord_connections; /* one for each Coordinator */
} PoolAgent;
/* Handle to the pool manager (Session's side) */
@@ -76,6 +78,7 @@ typedef struct
} PoolHandle;
extern int NumDataNodes;
+extern int NumCoords;
extern int MinPoolSize;
extern int MaxPoolSize;
extern int PoolerPort;
@@ -87,6 +90,11 @@ extern char *DataNodePorts;
extern char *DataNodeUsers;
extern char *DataNodePwds;
+extern char *CoordinatorHosts;
+extern char *CoordinatorPorts;
+extern char *CoordinatorUsers;
+extern char *CoordinatorPwds;
+
/* Initialize internal structures */
extern int PoolManagerInit(void);
@@ -122,9 +130,9 @@ extern void PoolManagerDisconnect(PoolHandle *handle);
extern void PoolManagerConnect(PoolHandle *handle, const char *database);
/* Get pooled connections */
-extern int *PoolManagerGetConnections(List *nodelist);
+extern int *PoolManagerGetConnections(List *datanodelist, List *coordlist);
-/* Retun connections back to the pool */
-extern void PoolManagerReleaseConnections(int ndisc, int* discard);
+/* Return connections back to the pool, for both Coordinator and Datanode connections */
+extern void PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* co_discard);
#endif
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index 9c87386288..b69dd3d6b3 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -78,7 +78,8 @@ enum config_group
CUSTOM_OPTIONS,
DEVELOPER_OPTIONS,
DATA_NODES,
- GTM
+ GTM,
+ COORDINATORS
};
/*
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 7c8e744915..31d110ad27 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -23,10 +23,6 @@
#include "utils/int8.h"
#endif
-#ifdef PGXC
-#include "pgxc/pgxc.h"
-#endif
-
/*
* Timestamp represents absolute time.
*