summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
author Michael P 2010-10-13 01:45:16 +0000
committer Pavan Deolasee 2011-05-19 16:45:18 +0000
commit 1a9cb6f13394e9f5e8648f1cb24fac211956b3ce (patch)
tree 7dc7d3f81b75618160bc6888d4854b404c563c06 /src/include
parent 9b94deb70ede6c7860b471e223e125b873289922 (diff)
Added support for two new pieces of functionality.
1) Support for DDL and utility command synchronisation among Coordinators. DDL is now synchronized amongst multiple coordinators. Previously, after DDL it was required to use an extra utility to resync the nodes and restart other Coordinators. This is no longer necessary. DDL support works also with common BEGIN, COMMIT and ROLLBACK instructions in the cluster. DDL may be initiated at any node. Each Coordinator can connect to any other one. Just as Coordinators use pools for connecting to Data Nodes, Coordinators now use pools for connecting to the other Coordinators. 2) Support for PREPARE TRANSACTION and COMMIT TRANSACTION, ROLLBACK PREPARED. When a transaction is prepared or committed, based on the SQL, it will only execute on the involved nodes, including DDL on Coordinators. GTM is used to track which xid and nodes are involved in the transaction, identified by the user or application specified transaction identifier, when it is prepared. New GUCs -------- There are some new GUCs for handling Coordinator communication num_coordinators coordinator_hosts coordinator_ports coordinator_users coordinator_passwords In addition, a new GUC replaces coordinator_id: pgxc_node_id Open Issues ----------- Implicit two phase commit (client in autocommit mode, but distributed transaction required because of multiple nodes) does not first prepare on the originating coordinator before committing, if DDL is involved. We really should prepare here before committing on all nodes. We also need to add a bit of special handling for COMMIT PREPARED. If there is an error, and it got committed on some nodes, we still should force it to be committed on the originating coordinator, if involved, and still return an error of some sort that it was partially committed. (When the downed node recovers, in the future it will determine if any other node has committed the transaction, and if so, it, too, must commit.) It is a pretty rare case, but we should handle it. 
With this current configuration, DDL will fail if at least one Coordinator is down. In the future, we will make this more flexible. Written by Michael Paquier
Diffstat (limited to 'src/include')
-rw-r--r--src/include/access/gtm.h4
-rw-r--r--src/include/access/twophase.h8
-rw-r--r--src/include/access/xact.h4
-rw-r--r--src/include/gtm/gtm_client.h4
-rw-r--r--src/include/gtm/gtm_msg.h4
-rw-r--r--src/include/gtm/gtm_txn.h6
-rw-r--r--src/include/libpq/libpq-be.h1
-rw-r--r--src/include/pgxc/datanode.h110
-rw-r--r--src/include/pgxc/execRemote.h29
-rw-r--r--src/include/pgxc/locator.h3
-rw-r--r--src/include/pgxc/pgxc.h18
-rw-r--r--src/include/pgxc/pgxcnode.h124
-rw-r--r--src/include/pgxc/planner.h24
-rw-r--r--src/include/pgxc/poolmgr.h30
-rw-r--r--src/include/utils/guc_tables.h3
-rw-r--r--src/include/utils/timestamp.h4
16 files changed, 216 insertions, 160 deletions
diff --git a/src/include/access/gtm.h b/src/include/access/gtm.h
index 6740c865b9..3fc4cfec02 100644
--- a/src/include/access/gtm.h
+++ b/src/include/access/gtm.h
@@ -15,7 +15,7 @@
/* Configuration variables */
extern char *GtmHost;
extern int GtmPort;
-extern int GtmCoordinatorId;
+extern int PGXCNodeId;
extern bool IsGTMConnected(void);
extern void InitGTM(void);
@@ -24,7 +24,7 @@ extern GlobalTransactionId BeginTranGTM(GTM_Timestamp *timestamp);
extern GlobalTransactionId BeginTranAutovacuumGTM(void);
extern int CommitTranGTM(GlobalTransactionId gxid);
extern int RollbackTranGTM(GlobalTransactionId gxid);
-extern int BeingPreparedTranGTM(GlobalTransactionId gxid,
+extern int StartPreparedTranGTM(GlobalTransactionId gxid,
char *gid,
int datanodecnt,
PGXC_NodeId datanodes[],
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 90da877545..58bc7e4d43 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,10 +18,6 @@
#include "storage/proc.h"
#include "utils/timestamp.h"
-#ifdef PGXC
-#include "pgxc/pgxc.h"
-#endif
-
/*
* GlobalTransactionData is defined in twophase.c; other places have no
* business knowing the internal definition.
@@ -42,10 +38,12 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid,
#ifdef PGXC
extern void RemoveGXactCoord(GlobalTransaction gxact);
+extern void EndPrepare(GlobalTransaction gxact, bool write_2pc_file);
+#else
+extern void EndPrepare(GlobalTransaction gxact);
#endif
extern void StartPrepare(GlobalTransaction gxact);
-extern void EndPrepare(GlobalTransaction gxact);
extern TransactionId PrescanPreparedTransactions(void);
extern void RecoverPreparedTransactions(void);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 24d420f6e7..9a51b90d9d 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -173,7 +173,11 @@ extern void AbortCurrentTransactionOnce(void);
extern void AbortCurrentTransaction(void);
extern void BeginTransactionBlock(void);
extern bool EndTransactionBlock(void);
+#ifdef PGXC
+extern bool PrepareTransactionBlock(char *gid, bool write_2pc_file);
+#else
extern bool PrepareTransactionBlock(char *gid);
+#endif
extern void UserAbortTransactionBlock(void);
extern void ReleaseSavepoint(List *options);
extern void DefineSavepoint(char *name);
diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h
index 4fe4bcf081..ff1befdced 100644
--- a/src/include/gtm/gtm_client.h
+++ b/src/include/gtm/gtm_client.h
@@ -29,7 +29,7 @@ typedef union GTM_ResultData
} grd_gxid_tp; /* TXN_BEGIN_GETGXID */
GlobalTransactionId grd_gxid; /* TXN_PREPARE
- * TXN_BEING_PREPARED
+ * TXN_START_PREPARED
* TXN_COMMIT
* TXN_COMMIT_PREPARED
* TXN_ROLLBACK
@@ -125,7 +125,7 @@ GlobalTransactionId begin_transaction_autovacuum(GTM_Conn *conn, GTM_IsolationLe
int commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
int commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid);
int abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
-int being_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
+int start_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
int datanodecnt, PGXC_NodeId datanodes[],
int coordcnt, PGXC_NodeId coordinators[]);
int prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
diff --git a/src/include/gtm/gtm_msg.h b/src/include/gtm/gtm_msg.h
index e1730eb62a..77c97ac1fd 100644
--- a/src/include/gtm/gtm_msg.h
+++ b/src/include/gtm/gtm_msg.h
@@ -22,7 +22,7 @@ typedef enum GTM_MessageType
MSG_TXN_BEGIN, /* Start a new transaction */
MSG_TXN_BEGIN_GETGXID, /* Start a new transaction and get GXID */
MSG_TXN_BEGIN_GETGXID_MULTI, /* Start multiple new transactions and get GXIDs */
- MSG_TXN_BEING_PREPARED, /* Begins to prepare a transation for commit */
+ MSG_TXN_START_PREPARED, /* Begins to prepare a transation for commit */
MSG_TXN_COMMIT, /* Commit a running or prepared transaction */
MSG_TXN_COMMIT_MULTI, /* Commit multiple running or prepared transactions */
MSG_TXN_COMMIT_PREPARED, /* Commit a prepared transaction */
@@ -62,7 +62,7 @@ typedef enum GTM_ResultType
TXN_BEGIN_GETGXID_RESULT,
TXN_BEGIN_GETGXID_MULTI_RESULT,
TXN_PREPARE_RESULT,
- TXN_BEING_PREPARED_RESULT,
+ TXN_START_PREPARED_RESULT,
TXN_COMMIT_PREPARED_RESULT,
TXN_COMMIT_RESULT,
TXN_COMMIT_MULTI_RESULT,
diff --git a/src/include/gtm/gtm_txn.h b/src/include/gtm/gtm_txn.h
index 5e3a02cf0f..c883612e64 100644
--- a/src/include/gtm/gtm_txn.h
+++ b/src/include/gtm/gtm_txn.h
@@ -199,13 +199,13 @@ int GTM_CommitTransaction(GTM_TransactionHandle txn);
int GTM_CommitTransactionMulti(GTM_TransactionHandle txn[], int txn_count, int status[]);
int GTM_CommitTransactionGXID(GlobalTransactionId gxid);
int GTM_PrepareTransaction(GTM_TransactionHandle txn);
-int GTM_BeingPreparedTransaction(GTM_TransactionHandle txn,
+int GTM_StartPreparedTransaction(GTM_TransactionHandle txn,
char *gid,
uint32 datanodecnt,
PGXC_NodeId datanodes[],
uint32 coordcnt,
PGXC_NodeId coordinators[]);
-int GTM_BeingPreparedTransactionGXID(GlobalTransactionId gxid,
+int GTM_StartPreparedTransactionGXID(GlobalTransactionId gxid,
char *gid,
uint32 datanodecnt,
PGXC_NodeId datanodes[],
@@ -235,7 +235,7 @@ void ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message);
void ProcessCommitTransactionCommand(Port *myport, StringInfo message);
void ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message);
void ProcessRollbackTransactionCommand(Port *myport, StringInfo message);
-void ProcessBeingPreparedTransactionCommand(Port *myport, StringInfo message);
+void ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message);
void ProcessPrepareTransactionCommand(Port *myport, StringInfo message);
void ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message);
void ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message);
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 539826d655..ac88d9057c 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -69,7 +69,6 @@ typedef struct
#include "libpq/pqcomm.h"
#include "utils/timestamp.h"
-
typedef enum CAC_state
{
CAC_OK, CAC_STARTUP, CAC_SHUTDOWN, CAC_RECOVERY, CAC_TOOMANY,
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
deleted file mode 100644
index 53faf98c34..0000000000
--- a/src/include/pgxc/datanode.h
+++ /dev/null
@@ -1,110 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * datanode.h
- *
- * Utility functions to communicate to Data Node
- *
- *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group ?
- * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
- *
- * IDENTIFICATION
- * $$
- *
- *-------------------------------------------------------------------------
- */
-
-#ifndef DATANODE_H
-#define DATANODE_H
-#include "postgres.h"
-#include "gtm/gtm_c.h"
-#include "utils/timestamp.h"
-#include "nodes/pg_list.h"
-#include "utils/snapshot.h"
-#include <unistd.h>
-
-#define NO_SOCKET -1
-
-
-/* Connection to data node maintained by Pool Manager */
-typedef struct PGconn NODE_CONNECTION;
-
-/* Helper structure to access data node from Session */
-typedef enum
-{
- DN_CONNECTION_STATE_IDLE, /* idle, ready for query */
- DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */
- DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */
- DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQury yet */
- DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */
- DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
- DN_CONNECTION_STATE_COPY_IN,
- DN_CONNECTION_STATE_COPY_OUT
-} DNConnectionState;
-
-#define DN_CONNECTION_STATE_ERROR(dnconn) \
- (dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
- || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY
-
-struct data_node_handle
-{
- int nodenum; /* node identifier 1..NumDataNodes */
- /* fd of the connection */
- int sock;
- /* Connection state */
- char transaction_status;
- DNConnectionState state;
-#ifdef DN_CONNECTION_DEBUG
- bool have_row_desc;
-#endif
- char *error;
- /* Output buffer */
- char *outBuffer;
- size_t outSize;
- size_t outEnd;
- /* Input buffer */
- char *inBuffer;
- size_t inSize;
- size_t inStart;
- size_t inEnd;
- size_t inCursor;
-};
-typedef struct data_node_handle DataNodeHandle;
-
-extern void InitMultinodeExecutor(void);
-
-/* Open/close connection routines (invoked from Pool Manager) */
-extern char *DataNodeConnStr(char *host, char *port, char *dbname, char *user,
- char *password);
-extern NODE_CONNECTION *DataNodeConnect(char *connstr);
-extern void DataNodeClose(NODE_CONNECTION * conn);
-extern int DataNodeConnected(NODE_CONNECTION * conn);
-extern int DataNodeConnClean(NODE_CONNECTION * conn);
-extern void DataNodeCleanAndRelease(int code, Datum arg);
-
-extern DataNodeHandle **get_handles(List *nodelist);
-extern void release_handles(bool force_drop);
-extern int get_transaction_nodes(DataNodeHandle ** connections);
-extern PGXC_NodeId* collect_datanode_numbers(int conn_count, DataNodeHandle ** connections);
-extern int get_active_nodes(DataNodeHandle ** connections);
-
-extern int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
-extern int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
-
-extern int data_node_send_query(DataNodeHandle * handle, const char *query);
-extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid);
-extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot);
-extern int data_node_send_timestamp(DataNodeHandle * handle, TimestampTz timestamp);
-
-extern int data_node_receive(const int conn_count,
- DataNodeHandle ** connections, struct timeval * timeout);
-extern int data_node_read_data(DataNodeHandle * conn);
-extern int send_some(DataNodeHandle * handle, int len);
-extern int data_node_flush(DataNodeHandle *handle);
-
-extern char get_message(DataNodeHandle *conn, int *len, char **msg);
-
-extern void add_error_message(DataNodeHandle * handle, const char *message);
-extern void clear_socket_data (DataNodeHandle *conn);
-
-#endif
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index 5ba8fff27d..4f4a0c9c53 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -16,7 +16,7 @@
#ifndef EXECREMOTE_H
#define EXECREMOTE_H
-#include "datanode.h"
+#include "pgxcnode.h"
#include "locator.h"
#include "planner.h"
#include "access/tupdesc.h"
@@ -47,7 +47,7 @@ typedef struct RemoteQueryState
{
ScanState ss; /* its first field is NodeTag */
int node_count; /* total count of participating nodes */
- DataNodeHandle **connections; /* data node connections being combined */
+ PGXCNodeHandle **connections; /* data node connections being combined */
int conn_count; /* count of active connections */
int current_conn; /* used to balance load when reading from connections */
CombineType combine_type; /* see CombineType enum */
@@ -77,17 +77,18 @@ typedef struct RemoteQueryState
} RemoteQueryState;
/* Multinode Executor */
-extern void DataNodeBegin(void);
-extern void DataNodeCommit(void);
-extern int DataNodeRollback(void);
-extern int DataNodePrepare(char *gid);
-extern void DataNodeRollbackPrepared(char *gid);
-extern void DataNodeCommitPrepared(char *gid);
+extern void PGXCNodeBegin(void);
+extern void PGXCNodeCommit(void);
+extern int PGXCNodeRollback(void);
+extern bool PGXCNodePrepare(char *gid);
+extern bool PGXCNodeRollbackPrepared(char *gid);
+extern bool PGXCNodeCommitPrepared(char *gid);
-extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
-extern int DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, DataNodeHandle** copy_connections);
-extern uint64 DataNodeCopyOut(ExecNodes *exec_nodes, DataNodeHandle** copy_connections, FILE* copy_file);
-extern void DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+/* Copy command just involves Datanodes */
+extern PGXCNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
+extern int DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections);
+extern uint64 DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE* copy_file);
+extern void DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
extern int ExecCountSlotsRemoteQuery(RemoteQuery *node);
extern RemoteQueryState *ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags);
@@ -95,11 +96,11 @@ extern TupleTableSlot* ExecRemoteQuery(RemoteQueryState *step);
extern void ExecEndRemoteQuery(RemoteQueryState *step);
extern void ExecRemoteUtility(RemoteQuery *node);
-extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner);
+extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner);
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt);
-extern void DataNodeConsumeMessages(void);
+extern void PGXCNodeConsumeMessages(void);
extern int primary_data_node;
#endif
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 233bf262cd..afeff56057 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -85,7 +85,8 @@ extern bool IsHashColumnForRelId(Oid relid, char *part_col_name);
extern int GetRoundRobinNode(Oid relid);
extern bool IsHashDistributable(Oid col_type);
-extern List *GetAllNodes(void);
+extern List *GetAllDataNodes(void);
+extern List *GetAllCoordNodes(void);
extern List *GetAnyDataNode(void);
extern void RelationBuildLocator(Relation rel);
extern void FreeRelationLocInfo(RelationLocInfo *relationLocInfo);
diff --git a/src/include/pgxc/pgxc.h b/src/include/pgxc/pgxc.h
index 09ff2c0ada..3b0309079c 100644
--- a/src/include/pgxc/pgxc.h
+++ b/src/include/pgxc/pgxc.h
@@ -17,7 +17,25 @@
extern bool isPGXCCoordinator;
extern bool isPGXCDataNode;
+typedef enum
+{
+ REMOTE_CONN_APP,
+ REMOTE_CONN_COORD,
+ REMOTE_CONN_DATANODE,
+ REMOTE_CONN_GTM,
+ REMOTE_CONN_GTM_PROXY
+} RemoteConnTypes;
+
+/* Determine remote connection type for a PGXC backend */
+extern int remoteConnType;
+
#define IS_PGXC_COORDINATOR isPGXCCoordinator
#define IS_PGXC_DATANODE isPGXCDataNode
+#define REMOTE_CONN_TYPE remoteConnType
+#define IsConnFromApp() (remoteConnType == REMOTE_CONN_APP)
+#define IsConnFromCoord() (remoteConnType == REMOTE_CONN_COORD)
+#define IsConnFromDatanode() (remoteConnType == REMOTE_CONN_DATANODE)
+#define IsConnFromGtm() (remoteConnType == REMOTE_CONN_GTM)
+#define IsConnFromGtmProxy() (remoteConnType == REMOTE_CONN_GTM_PROXY)
#endif /* PGXC */
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
new file mode 100644
index 0000000000..d6c5af950c
--- /dev/null
+++ b/src/include/pgxc/pgxcnode.h
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ *
+ * pgxcnode.h
+ *
+ * Utility functions to communicate to Datanodes and Coordinators
+ *
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group ?
+ * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
+ *
+ * IDENTIFICATION
+ * $$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef DATANODE_H
+#define DATANODE_H
+#include "postgres.h"
+#include "gtm/gtm_c.h"
+#include "utils/timestamp.h"
+#include "nodes/pg_list.h"
+#include "utils/snapshot.h"
+#include <unistd.h>
+
+#define NO_SOCKET -1
+
+
+/* Connection to data node maintained by Pool Manager */
+typedef struct PGconn NODE_CONNECTION;
+
+/* Helper structure to access data node from Session */
+typedef enum
+{
+ DN_CONNECTION_STATE_IDLE, /* idle, ready for query */
+ DN_CONNECTION_STATE_QUERY, /* query is sent, response expected */
+ DN_CONNECTION_STATE_HAS_DATA, /* buffer has data to process */
+ DN_CONNECTION_STATE_COMPLETED, /* query completed, no ReadyForQury yet */
+ DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */
+ DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
+ DN_CONNECTION_STATE_COPY_IN,
+ DN_CONNECTION_STATE_COPY_OUT
+} DNConnectionState;
+
+#define DN_CONNECTION_STATE_ERROR(dnconn) \
+ (dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
+ || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY
+
+struct pgxc_node_handle
+{
+ int nodenum; /* node identifier 1..NumDataNodes or 1..NumCoords */
+ /* fd of the connection */
+ int sock;
+ /* Connection state */
+ char transaction_status;
+ DNConnectionState state;
+#ifdef DN_CONNECTION_DEBUG
+ bool have_row_desc;
+#endif
+ char *error;
+ /* Output buffer */
+ char *outBuffer;
+ size_t outSize;
+ size_t outEnd;
+ /* Input buffer */
+ char *inBuffer;
+ size_t inSize;
+ size_t inStart;
+ size_t inEnd;
+ size_t inCursor;
+};
+typedef struct pgxc_node_handle PGXCNodeHandle;
+
+/* Structure used to get all the handles involved in a transaction */
+typedef struct
+{
+ PGXCNodeHandle *primary_handle; /* Primary connection to PGXC node */
+ int dn_conn_count; /* number of Datanode Handles including primary handle */
+ PGXCNodeHandle **datanode_handles; /* an array of Datanode handles */
+ int co_conn_count; /* number of Coordinator handles */
+ PGXCNodeHandle **coord_handles; /* an array of Coordinator handles */
+} PGXCNodeAllHandles;
+
+extern void InitMultinodeExecutor(void);
+
+/* Open/close connection routines (invoked from Pool Manager) */
+extern char *PGXCNodeConnStr(char *host, char *port, char *dbname, char *user,
+ char *password, char *remote_type);
+extern NODE_CONNECTION *PGXCNodeConnect(char *connstr);
+extern void PGXCNodeClose(NODE_CONNECTION * conn);
+extern int PGXCNodeConnected(NODE_CONNECTION * conn);
+extern int PGXCNodeConnClean(NODE_CONNECTION * conn);
+extern void PGXCNodeCleanAndRelease(int code, Datum arg);
+
+extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only);
+extern void release_handles(bool force_drop);
+
+extern int get_transaction_nodes(PGXCNodeHandle ** connections, char client_conn_type);
+extern PGXC_NodeId* collect_pgxcnode_numbers(int conn_count, PGXCNodeHandle ** connections, char client_conn_type);
+extern int get_active_nodes(PGXCNodeHandle ** connections);
+
+extern int ensure_in_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handle);
+extern int ensure_out_buffer_capacity(size_t bytes_needed, PGXCNodeHandle * handle);
+
+extern int pgxc_node_send_query(PGXCNodeHandle * handle, const char *query);
+extern int pgxc_node_send_gxid(PGXCNodeHandle * handle, GlobalTransactionId gxid);
+extern int pgxc_node_send_snapshot(PGXCNodeHandle * handle, Snapshot snapshot);
+extern int pgxc_node_send_timestamp(PGXCNodeHandle * handle, TimestampTz timestamp);
+
+extern int pgxc_node_receive(const int conn_count,
+ PGXCNodeHandle ** connections, struct timeval * timeout);
+extern int pgxc_node_read_data(PGXCNodeHandle * conn);
+extern int send_some(PGXCNodeHandle * handle, int len);
+extern int pgxc_node_flush(PGXCNodeHandle *handle);
+
+extern int pgxc_all_handles_send_gxid(PGXCNodeAllHandles *pgxc_handles, GlobalTransactionId gxid, bool stop_at_error);
+extern int pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer, bool stop_at_error);
+
+extern char get_message(PGXCNodeHandle *conn, int *len, char **msg);
+
+extern void add_error_message(PGXCNodeHandle * handle, const char *message);
+extern void clear_socket_data (PGXCNodeHandle *conn);
+
+#endif
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index d2bac5a828..1e31fa38c1 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -54,7 +54,21 @@ typedef struct
Oid *eqOperators; /* OIDs of operators to equate them by */
} SimpleDistinct;
-/* Contains instructions on processing a step of a query.
+/*
+ * Determines if query has to be launched
+ * on Coordinators only (SEQUENCE DDL),
+ * on Datanodes (normal Remote Queries),
+ * or on all Postgres-XC nodes (Utilities and DDL).
+ */
+typedef enum
+{
+ EXEC_ON_DATANODES,
+ EXEC_ON_COORDS,
+ EXEC_ON_ALL_NODES
+} RemoteQueryExecType;
+
+/*
+ * Contains instructions on processing a step of a query.
* In the prototype this will be simple, but it will eventually
* evolve into a GridSQL-style QueryStep.
*/
@@ -63,17 +77,19 @@ typedef struct
Scan scan;
bool is_single_step; /* special case, skip extra work */
char *sql_statement;
- ExecNodes *exec_nodes;
+ ExecNodes *exec_nodes; /* List of Datanodes where to launch query */
CombineType combine_type;
List *simple_aggregates; /* simple aggregate to combine on this step */
SimpleSort *sort;
SimpleDistinct *distinct;
bool read_only; /* do not use 2PC when committing read only steps */
- bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+ bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+ RemoteQueryExecType exec_type;
} RemoteQuery;
-/* For handling simple aggregates (no group by present)
+/*
+ * For handling simple aggregates (no group by present)
* For now, only MAX will be supported.
*/
typedef enum
diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h
index b7ac3aedf0..3a615cc0f3 100644
--- a/src/include/pgxc/poolmgr.h
+++ b/src/include/pgxc/poolmgr.h
@@ -17,7 +17,7 @@
#ifndef POOLMGR_H
#define POOLMGR_H
#include <sys/time.h>
-#include "datanode.h"
+#include "pgxcnode.h"
#include "poolcomm.h"
#include "storage/pmsignal.h"
@@ -30,30 +30,31 @@ typedef struct
char *port;
char *uname;
char *password;
-} DataNodeConnectionInfo;
+} PGXCNodeConnectionInfo;
/* Connection pool entry */
typedef struct
{
struct timeval released;
NODE_CONNECTION *conn;
-} DataNodePoolSlot;
+} PGXCNodePoolSlot;
-/* Pool of connections to specified data nodes */
+/* Pool of connections to specified pgxc node */
typedef struct
{
char *connstr;
int freeSize; /* available connections */
int size; /* total pool size */
- DataNodePoolSlot **slot;
-} DataNodePool;
+ PGXCNodePoolSlot **slot;
+} PGXCNodePool;
/* All pools for specified database */
typedef struct databasepool
{
Oid databaseId;
char *database;
- DataNodePool **nodePools; /* one for each data node */
+ PGXCNodePool **dataNodePools; /* one for each Datanode */
+ PGXCNodePool **coordNodePools; /* one for each Coordinator */
struct databasepool *next;
} DatabasePool;
@@ -65,7 +66,8 @@ typedef struct
/* communication channel */
PoolPort port;
DatabasePool *pool;
- DataNodePoolSlot **connections; /* one for each data node */
+ PGXCNodePoolSlot **dn_connections; /* one for each Datanode */
+ PGXCNodePoolSlot **coord_connections; /* one for each Coordinator */
} PoolAgent;
/* Handle to the pool manager (Session's side) */
@@ -76,6 +78,7 @@ typedef struct
} PoolHandle;
extern int NumDataNodes;
+extern int NumCoords;
extern int MinPoolSize;
extern int MaxPoolSize;
extern int PoolerPort;
@@ -87,6 +90,11 @@ extern char *DataNodePorts;
extern char *DataNodeUsers;
extern char *DataNodePwds;
+extern char *CoordinatorHosts;
+extern char *CoordinatorPorts;
+extern char *CoordinatorUsers;
+extern char *CoordinatorPwds;
+
/* Initialize internal structures */
extern int PoolManagerInit(void);
@@ -122,9 +130,9 @@ extern void PoolManagerDisconnect(PoolHandle *handle);
extern void PoolManagerConnect(PoolHandle *handle, const char *database);
/* Get pooled connections */
-extern int *PoolManagerGetConnections(List *nodelist);
+extern int *PoolManagerGetConnections(List *datanodelist, List *coordlist);
-/* Retun connections back to the pool */
-extern void PoolManagerReleaseConnections(int ndisc, int* discard);
+/* Return connections back to the pool, for both Coordinator and Datanode connections */
+extern void PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* co_discard);
#endif
diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h
index 9c87386288..b69dd3d6b3 100644
--- a/src/include/utils/guc_tables.h
+++ b/src/include/utils/guc_tables.h
@@ -78,7 +78,8 @@ enum config_group
CUSTOM_OPTIONS,
DEVELOPER_OPTIONS,
DATA_NODES,
- GTM
+ GTM,
+ COORDINATORS
};
/*
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 7c8e744915..31d110ad27 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -23,10 +23,6 @@
#include "utils/int8.h"
#endif
-#ifdef PGXC
-#include "pgxc/pgxc.h"
-#endif
-
/*
* Timestamp represents absolute time.
*