summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/pgbench/pgbench.c6
-rw-r--r--src/backend/access/transam/xact.c32
-rw-r--r--src/backend/pgxc/pool/datanode.c49
-rw-r--r--src/backend/pgxc/pool/execRemote.c449
-rw-r--r--src/backend/pgxc/pool/poolcomm.c15
-rw-r--r--src/backend/pgxc/pool/poolmgr.c71
-rw-r--r--src/backend/tcop/postgres.c4
-rw-r--r--src/backend/utils/sort/tuplesort.c8
-rw-r--r--src/include/pgxc/datanode.h5
-rw-r--r--src/include/pgxc/execRemote.h5
-rw-r--r--src/include/pgxc/poolmgr.h6
11 files changed, 347 insertions, 303 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 321f797c2c..c2aaeec529 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -202,9 +202,9 @@ static char *tpc_b_bid = {
"\\setrandom tid 1 :ntellers\n"
"\\setrandom delta -5000 5000\n"
"BEGIN;\n"
- "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid AND bid = :bid;\n"
- "SELECT abalance FROM pgbench_accounts WHERE aid = :aid AND bid = :bid\n"
- "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid AND bid = :bid;\n"
+ "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
+ "SELECT abalance FROM pgbench_accounts WHERE aid = :aid\n"
+ "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b505dee1f5..4db1e2c864 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -135,7 +135,7 @@ typedef struct TransactionStateData
{
TransactionId transactionId; /* my XID, or Invalid if none */
#ifdef PGXC /* PGXC_COORD */
- GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */
+ GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */
#endif
SubTransactionId subTransactionId; /* my subxact ID */
char *name; /* savepoint name, if any */
@@ -314,7 +314,7 @@ GetCurrentGlobalTransactionId(void)
* GetGlobalTransactionId
*
* This will return the GXID of the specified transaction,
- * getting one from the GTM if it's not yet set.
+ * getting one from the GTM if it's not yet set.
*/
static GlobalTransactionId
GetGlobalTransactionId(TransactionState s)
@@ -469,7 +469,7 @@ AssignTransactionId(TransactionState s)
if (IS_PGXC_COORDINATOR)
{
s->transactionId = (TransactionId) GetGlobalTransactionId(s);
- elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s",
+ elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s",
s->transactionId, isSubXact ? "true" : "false");
}
else
@@ -1679,6 +1679,14 @@ CommitTransaction(void)
*/
AtEOXact_UpdateFlatFiles(true);
+#ifdef PGXC
+ /*
+ * There can be errors on the data nodes, so go to the data nodes before
+ * changing transaction state and local clean up
+ */
+ DataNodeCommit();
+#endif
+
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
@@ -1694,13 +1702,13 @@ CommitTransaction(void)
latestXid = RecordTransactionCommit();
TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
+
#ifdef PGXC
+ /*
+ * Now we can let GTM know about transaction commit
+ */
if (IS_PGXC_COORDINATOR)
{
- /* Make sure this committed on the DataNodes,
- * if so it will just return
- */
- DataNodeCommit(DestNone);
CommitTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
}
@@ -1712,7 +1720,7 @@ CommitTransaction(void)
CommitTranGTM((GlobalTransactionId) latestXid);
}
#endif
-
+
/*
* Let others know about no transaction in progress by me. Note that this
* must be done _before_ releasing locks we hold and _after_
@@ -1808,7 +1816,7 @@ CommitTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
-#ifdef PGXC
+#ifdef PGXC
if (IS_PGXC_COORDINATOR)
s->globalTransactionId = InvalidGlobalTransactionId;
else if (IS_PGXC_DATANODE)
@@ -2142,10 +2150,10 @@ AbortTransaction(void)
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
{
- /* Make sure this is rolled back on the DataNodes,
- * if so it will just return
+ /* Make sure this is rolled back on the DataNodes,
+ * if so it will just return
*/
- DataNodeRollback(DestNone);
+ DataNodeRollback();
RollbackTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
}
diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c
index 517b1e4d78..0f4072d01e 100644
--- a/src/backend/pgxc/pool/datanode.c
+++ b/src/backend/pgxc/pool/datanode.c
@@ -199,6 +199,9 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum)
handle->sock = sock;
handle->transaction_status = 'I';
handle->state = DN_CONNECTION_STATE_IDLE;
+#ifdef DN_CONNECTION_DEBUG
+ handle->have_row_desc = false;
+#endif
handle->error = NULL;
handle->outEnd = 0;
handle->inStart = 0;
@@ -211,7 +214,7 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum)
* Wait while at least one of specified connections has data available and read
* the data into the buffer
*/
-void
+int
data_node_receive(const int conn_count,
DataNodeHandle ** connections, struct timeval * timeout)
{
@@ -239,7 +242,7 @@ data_node_receive(const int conn_count,
* Return if we do not have connections to receive input
*/
if (nfds == 0)
- return;
+ return 0;
retry:
res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
@@ -249,27 +252,19 @@ retry:
if (errno == EINTR || errno == EAGAIN)
goto retry;
- /*
- * PGXCTODO - we may want to close the connections and notify the
- * pooler that these are invalid.
- */
if (errno == EBADF)
{
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("select() bad file descriptor set")));
+ elog(WARNING, "select() bad file descriptor set");
}
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("select() error: %d", errno)));
+ elog(WARNING, "select() error: %d", errno);
+ return errno;
}
if (res_select == 0)
{
/* Handle timeout */
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("timeout while waiting for response")));
+ elog(WARNING, "timeout while waiting for response");
+ return EOF;
}
/* read data */
@@ -283,10 +278,9 @@ retry:
if (read_status == EOF || read_status < 0)
{
- /* PGXCTODO - we should notify the pooler to destroy the connections */
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("unexpected EOF on datanode connection")));
+ add_error_message(conn, "unexpected EOF on datanode connection");
+ elog(WARNING, "unexpected EOF on datanode connection");
+ return EOF;
}
else
{
@@ -294,6 +288,7 @@ retry:
}
}
}
+ return 0;
}
@@ -522,7 +517,7 @@ get_message(DataNodeHandle *conn, int *len, char **msg)
* ensure_in_buffer_capacity() will immediately return
*/
ensure_in_buffer_capacity(5 + (size_t) *len, conn);
- conn->state == DN_CONNECTION_STATE_QUERY;
+ conn->state = DN_CONNECTION_STATE_QUERY;
conn->inCursor = conn->inStart;
return '\0';
}
@@ -539,19 +534,27 @@ void
release_handles(void)
{
int i;
+ int discard[NumDataNodes];
+ int ndisc = 0;
if (node_count == 0)
return;
- PoolManagerReleaseConnections();
for (i = 0; i < NumDataNodes; i++)
{
DataNodeHandle *handle = &handles[i];
if (handle->sock != NO_SOCKET)
+ {
+ if (handle->state != DN_CONNECTION_STATE_IDLE)
+ {
+ elog(WARNING, "Connection to data node %d has unexpected state %d and will be dropped", handle->nodenum, handle->state);
+ discard[ndisc++] = handle->nodenum;
+ }
data_node_free(handle);
+ }
}
-
+ PoolManagerReleaseConnections(ndisc, discard);
node_count = 0;
}
@@ -897,7 +900,7 @@ void
add_error_message(DataNodeHandle *handle, const char *message)
{
handle->transaction_status = 'E';
- handle->state = DN_CONNECTION_STATE_IDLE;
+ handle->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
if (handle->error)
{
/* PGXCTODO append */
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index c6f9042b43..bbedef0f2f 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -24,6 +24,7 @@
#include "miscadmin.h"
#include "pgxc/execRemote.h"
#include "pgxc/poolmgr.h"
+#include "storage/ipc.h"
#include "utils/datum.h"
#include "utils/memutils.h"
#include "utils/tuplesort.h"
@@ -40,9 +41,10 @@ static bool autocommit = true;
static DataNodeHandle **write_node_list = NULL;
static int write_node_count = 0;
-static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid);
-static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest);
-static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest);
+static int data_node_begin(int conn_count, DataNodeHandle ** connections,
+ GlobalTransactionId gxid);
+static int data_node_commit(int conn_count, DataNodeHandle ** connections);
+static int data_node_rollback(int conn_count, DataNodeHandle ** connections);
static void clear_write_node_list();
@@ -920,7 +922,7 @@ FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot)
/*
* Handle responses from the Data node connections
*/
-static void
+static int
data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
struct timeval * timeout, RemoteQueryState *combiner)
{
@@ -940,7 +942,8 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
{
int i = 0;
- data_node_receive(count, to_receive, timeout);
+ if (data_node_receive(count, to_receive, timeout))
+ return EOF;
while (i < count)
{
int result = handle_response(to_receive[i], combiner);
@@ -959,12 +962,17 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
break;
default:
/* Inconsistent responses */
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Unexpected response from the data nodes, result = %d, request type %d", result, combiner->request_type)));
+ add_error_message(to_receive[i], "Unexpected response from the data nodes");
+ elog(WARNING, "Unexpected response from the data nodes, result = %d, request type %d", result, combiner->request_type);
+ /* Stop tracking and move last connection in place */
+ count--;
+ if (i < count)
+ to_receive[i] = to_receive[count];
}
}
}
+
+ return 0;
}
/*
@@ -990,6 +998,18 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
if (conn->state == DN_CONNECTION_STATE_QUERY)
return RESPONSE_EOF;
+ /*
+ * If we are in the process of shutting down, we
+ * may be rolling back, and the buffer may contain other messages.
+ * We want to avoid a procarray exception
+ * as well as an error stack overflow.
+ */
+ if (proc_exit_inprogress)
+ {
+ conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
+ return RESPONSE_EOF;
+ }
+
/* TODO handle other possible responses */
switch (get_message(conn, &msg_len, &msg))
{
@@ -1005,10 +1025,17 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
HandleCommandComplete(combiner, msg, msg_len);
break;
case 'T': /* RowDescription */
+#ifdef DN_CONNECTION_DEBUG
+ Assert(!conn->have_row_desc);
+ conn->have_row_desc = true;
+#endif
if (HandleRowDescription(combiner, msg, msg_len))
return RESPONSE_TUPDESC;
break;
case 'D': /* DataRow */
+#ifdef DN_CONNECTION_DEBUG
+ Assert(conn->have_row_desc);
+#endif
HandleDataRow(combiner, msg, msg_len);
return RESPONSE_DATAROW;
case 'G': /* CopyInResponse */
@@ -1042,6 +1069,9 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
case 'Z': /* ReadyForQuery */
conn->transaction_status = msg[0];
conn->state = DN_CONNECTION_STATE_IDLE;
+#ifdef DN_CONNECTION_DEBUG
+ conn->have_row_desc = false;
+#endif
return RESPONSE_COMPLETE;
case 'I': /* EmptyQuery */
default:
@@ -1058,7 +1088,8 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
* Send BEGIN command to the Data nodes and receive responses
*/
static int
-data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid)
+data_node_begin(int conn_count, DataNodeHandle ** connections,
+ GlobalTransactionId gxid)
{
int i;
struct timeval *timeout = NULL;
@@ -1078,7 +1109,8 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest,
combiner->dest = None_Receiver;
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ return EOF;
/* Verify status */
return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
@@ -1109,12 +1141,12 @@ DataNodeBegin(void)
/*
- * Commit current transaction, use two-phase commit if necessary
+ * Commit current transaction on data nodes where it has been started
*/
-int
-DataNodeCommit(CommandDest dest)
+void
+DataNodeCommit(void)
{
- int res;
+ int res = 0;
int tran_count;
DataNodeHandle *connections[NumDataNodes];
@@ -1128,7 +1160,7 @@ DataNodeCommit(CommandDest dest)
if (tran_count == 0)
goto finish;
- res = data_node_commit(tran_count, connections, dest);
+ res = data_node_commit(tran_count, connections);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1138,15 +1170,19 @@ finish:
release_handles();
autocommit = true;
clear_write_node_list();
- return res;
+ if (res != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not commit connection on data nodes")));
}
/*
- * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses
+ * Commit transaction on specified data node connections, use two-phase commit
+ * if data have been modified on more than one node during the transaction.
*/
static int
-data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest)
+data_node_commit(int conn_count, DataNodeHandle ** connections)
{
int i;
struct timeval *timeout = NULL;
@@ -1191,13 +1227,12 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
combiner->dest = None_Receiver;
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ result = EOF;
/* Reset combiner */
if (!ValidateAndResetCombiner(combiner))
- {
result = EOF;
- }
}
if (!do2PC)
@@ -1238,7 +1273,8 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest
combiner->dest = None_Receiver;
}
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ result = EOF;
result = ValidateAndCloseCombiner(combiner) ? result : EOF;
finish:
@@ -1253,7 +1289,7 @@ finish:
* Rollback current transaction
*/
int
-DataNodeRollback(CommandDest dest)
+DataNodeRollback(void)
{
int res = 0;
int tran_count;
@@ -1269,7 +1305,7 @@ DataNodeRollback(CommandDest dest)
if (tran_count == 0)
goto finish;
- res = data_node_rollback(tran_count, connections, dest);
+ res = data_node_rollback(tran_count, connections);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1287,24 +1323,23 @@ finish:
* Send ROLLBACK command down to the Data nodes and handle responses
*/
static int
-data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest)
+data_node_rollback(int conn_count, DataNodeHandle ** connections)
{
int i;
struct timeval *timeout = NULL;
- int result = 0;
RemoteQueryState *combiner;
/* Send ROLLBACK - */
for (i = 0; i < conn_count; i++)
{
- if (data_node_send_query(connections[i], "ROLLBACK"))
- result = EOF;
+ data_node_send_query(connections[i], "ROLLBACK");
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
combiner->dest = None_Receiver;
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ return EOF;
/* Verify status */
return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
@@ -1404,7 +1439,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
if (new_count > 0 && need_tran)
{
/* Start transaction on connections where it is not started */
- if (data_node_begin(new_count, newConnections, DestNone, gxid))
+ if (data_node_begin(new_count, newConnections, gxid))
{
pfree(connections);
pfree(copy_connections);
@@ -1448,8 +1483,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
combiner->dest = None_Receiver;
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
- if (!ValidateAndCloseCombiner(combiner))
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner)
+ || !ValidateAndCloseCombiner(combiner))
{
if (autocommit)
{
@@ -1665,6 +1700,12 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on datanode connection")));
+ else
+ /*
+ * Set proper connection status - handle_response
+ * has changed it to DN_CONNECTION_STATE_QUERY
+ */
+ handle->state = DN_CONNECTION_STATE_COPY_OUT;
}
/* There is no more data that can be read from connection */
}
@@ -1746,7 +1787,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
combiner = CreateResponseCombiner(conn_count + 1, combine_type);
combiner->dest = None_Receiver;
- data_node_receive_responses(1, &primary_handle, timeout, combiner);
+ error = data_node_receive_responses(1, &primary_handle, timeout, combiner) || error;
}
for (i = 0; i < conn_count; i++)
@@ -1786,30 +1827,14 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
combiner = CreateResponseCombiner(conn_count, combine_type);
combiner->dest = None_Receiver;
}
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ error = (data_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error;
processed = combiner->row_count;
if (!ValidateAndCloseCombiner(combiner) || error)
- {
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else
- if (!PersistentConnections) release_handles();
- }
-
- return 0;
- }
-
- if (autocommit)
- {
- if (need_tran)
- DataNodeCommit(DestNone);
- else
- if (!PersistentConnections) release_handles();
- }
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Error while running COPY")));
return processed;
}
@@ -1882,9 +1907,17 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
else
{
int i;
+
+ /*
+ * Data node may be sending junk columns which are always at the end,
+ * but it must not be shorter than the result slot.
+ */
+ Assert(dst->tts_tupleDescriptor->natts <= src->tts_tupleDescriptor->natts);
ExecClearTuple(dst);
slot_getallattrs(src);
- /* PGXCTODO revisit: probably incorrect */
+ /*
+ * PGXCTODO revisit: is it correct to copy Datums using assignment?
+ */
for (i = 0; i < dst->tts_tupleDescriptor->natts; i++)
{
dst->tts_values[i] = src->tts_values[i];
@@ -1911,6 +1944,8 @@ ExecRemoteQuery(RemoteQueryState *node)
EState *estate = node->ss.ps.state;
TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
+ bool have_tuple = false;
+
if (!node->query_Done)
{
@@ -1974,7 +2009,7 @@ ExecRemoteQuery(RemoteQueryState *node)
else
need_tran = !autocommit || total_conn_count > 1;
- elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
+ elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, statement_need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
stat_statement();
if (autocommit)
@@ -2052,7 +2087,7 @@ ExecRemoteQuery(RemoteQueryState *node)
new_connections[new_count++] = connections[i];
if (new_count)
- data_node_begin(new_count, new_connections, DestNone, gxid);
+ data_node_begin(new_count, new_connections, gxid);
}
/* See if we have a primary nodes, execute on it first before the others */
@@ -2088,36 +2123,22 @@ ExecRemoteQuery(RemoteQueryState *node)
while (node->command_complete_count < 1)
{
- PG_TRY();
- {
- data_node_receive(1, primaryconnection, NULL);
- while (handle_response(primaryconnection[0], node) == RESPONSE_EOF)
- data_node_receive(1, primaryconnection, NULL);
- if (node->errorMessage)
- {
- char *code = node->errorCode;
+ if (data_node_receive(1, primaryconnection, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
+ while (handle_response(primaryconnection[0], node) == RESPONSE_EOF)
+ if (data_node_receive(1, primaryconnection, NULL))
ereport(ERROR,
- (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
- errmsg("%s", node->errorMessage)));
- }
- }
- /* If we got an error response return immediately */
- PG_CATCH();
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
+ if (node->errorMessage)
{
- /* We are going to exit, so release combiner */
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
-
- pfree(primaryconnection);
- pfree(connections);
- PG_RE_THROW();
+ char *code = node->errorCode;
+ ereport(ERROR,
+ (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+ errmsg("%s", node->errorMessage)));
}
- PG_END_TRY();
}
pfree(primaryconnection);
}
@@ -2148,8 +2169,6 @@ ExecRemoteQuery(RemoteQueryState *node)
}
}
- PG_TRY();
- {
/*
* Stop if all commands are completed or we got a data row and
* initialized state node for subsequent invocations
@@ -2158,7 +2177,10 @@ ExecRemoteQuery(RemoteQueryState *node)
{
int i = 0;
- data_node_receive(regular_conn_count, connections, NULL);
+ if (data_node_receive(regular_conn_count, connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
/*
* Handle input from the data nodes.
* If we got a RESPONSE_DATAROW we can break handling to wrap
@@ -2234,185 +2256,148 @@ ExecRemoteQuery(RemoteQueryState *node)
}
}
}
- }
- /* If we got an error response return immediately */
- PG_CATCH();
- {
- /* We are going to exit, so release combiner */
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
- PG_RE_THROW();
- }
- PG_END_TRY();
+
node->query_Done = true;
- node->need_tran = need_tran;
}
- PG_TRY();
+ if (node->tuplesortstate)
{
- bool have_tuple = false;
-
- if (node->tuplesortstate)
+ while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
+ true, scanslot))
{
- while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
- true, scanslot))
+ have_tuple = true;
+ /*
+ * If DISTINCT is specified and current tuple matches to
+ * previous skip it and get next one.
+ * Otherwise return the current tuple
+ */
+ if (step->distinct)
{
- have_tuple = true;
/*
- * If DISTINCT is specified and current tuple matches to
- * previous skip it and get next one.
- * Othervise return current tuple
+ * Always receive very first tuple and
+ * skip to the next if the scan slot matches the previous (result slot)
*/
- if (step->distinct)
+ if (!TupIsNull(resultslot) &&
+ execTuplesMatch(scanslot,
+ resultslot,
+ step->distinct->numCols,
+ step->distinct->uniqColIdx,
+ node->eqfunctions,
+ node->tmp_ctx))
{
- /*
- * Always receive very first tuple and
- * skip to next if scan slot match to previous (result slot)
- */
- if (!TupIsNull(resultslot) &&
- execTuplesMatch(scanslot,
- resultslot,
- step->distinct->numCols,
- step->distinct->uniqColIdx,
- node->eqfunctions,
- node->tmp_ctx))
- {
- have_tuple = false;
- continue;
- }
+ have_tuple = false;
+ continue;
}
- copy_slot(node, scanslot, resultslot);
- (*node->dest->receiveSlot) (resultslot, node->dest);
- break;
}
- if (!have_tuple)
- ExecClearTuple(resultslot);
+ copy_slot(node, scanslot, resultslot);
+ (*node->dest->receiveSlot) (resultslot, node->dest);
+ break;
}
- else
+ if (!have_tuple)
+ ExecClearTuple(resultslot);
+ }
+ else
+ {
+ while (node->conn_count > 0 && !have_tuple)
{
- while (node->conn_count > 0 && !have_tuple)
- {
- int i;
+ int i;
- /*
- * If combiner already has tuple go ahead and return it
- * otherwise tuple will be cleared
- */
- if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+ /*
+ * If combiner already has tuple go ahead and return it
+ * otherwise tuple will be cleared
+ */
+ if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+ {
+ if (node->simple_aggregates)
{
- if (node->simple_aggregates)
- {
- /*
- * Advance aggregate functions and allow to read up next
- * data row message and get tuple in the same slot on
- * next iteration
- */
- exec_simple_aggregates(node, scanslot);
- }
- else
- {
- /*
- * Receive current slot and read up next data row
- * message before exiting the loop. Next time when this
- * function is invoked we will have either data row
- * message ready or EOF
- */
- copy_slot(node, scanslot, resultslot);
- (*node->dest->receiveSlot) (resultslot, node->dest);
- have_tuple = true;
- }
+ /*
+ * Advance aggregate functions and allow to read up next
+ * data row message and get tuple in the same slot on
+ * next iteration
+ */
+ exec_simple_aggregates(node, scanslot);
}
-
- /*
- * Handle input to get next row or ensure command is completed,
- * starting from connection next after current. If connection
- * does not
- */
- if ((i = node->current_conn + 1) == node->conn_count)
- i = 0;
-
- for (;;)
+ else
{
- int res = handle_response(node->connections[i], node);
- if (res == RESPONSE_EOF)
- {
- /* go to next connection */
- if (++i == node->conn_count)
- i = 0;
- /* if we cycled over all connections we need to receive more */
- if (i == node->current_conn)
- data_node_receive(node->conn_count, node->connections, NULL);
- }
- else if (res == RESPONSE_COMPLETE)
- {
- if (--node->conn_count == 0)
- break;
- if (i == node->conn_count)
- i = 0;
- else
- node->connections[i] = node->connections[node->conn_count];
- if (node->current_conn == node->conn_count)
- node->current_conn = i;
- }
- else if (res == RESPONSE_DATAROW)
- {
- node->current_conn = i;
- break;
- }
+ /*
+ * Receive current slot and read up next data row
+ * message before exiting the loop. Next time when this
+ * function is invoked we will have either data row
+ * message ready or EOF
+ */
+ copy_slot(node, scanslot, resultslot);
+ (*node->dest->receiveSlot) (resultslot, node->dest);
+ have_tuple = true;
}
}
/*
- * We may need to finalize aggregates
+ * Handle input to get next row or ensure command is completed,
+ * starting from connection next after current. If connection
+ * does not
*/
- if (!have_tuple && node->simple_aggregates)
+ if ((i = node->current_conn + 1) == node->conn_count)
+ i = 0;
+
+ for (;;)
{
- finish_simple_aggregates(node, resultslot);
- if (!TupIsNull(resultslot))
+ int res = handle_response(node->connections[i], node);
+ if (res == RESPONSE_EOF)
{
- (*node->dest->receiveSlot) (resultslot, node->dest);
- have_tuple = true;
+ /* go to next connection */
+ if (++i == node->conn_count)
+ i = 0;
+ /* if we cycled over all connections we need to receive more */
+ if (i == node->current_conn)
+ if (data_node_receive(node->conn_count, node->connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (--node->conn_count == 0)
+ break;
+ if (i == node->conn_count)
+ i = 0;
+ else
+ node->connections[i] = node->connections[node->conn_count];
+ if (node->current_conn == node->conn_count)
+ node->current_conn = i;
+ }
+ else if (res == RESPONSE_DATAROW)
+ {
+ node->current_conn = i;
+ break;
}
}
-
- if (!have_tuple) /* report end of scan */
- ExecClearTuple(resultslot);
-
}
- if (node->errorMessage)
+ /*
+ * We may need to finalize aggregates
+ */
+ if (!have_tuple && node->simple_aggregates)
{
- char *code = node->errorCode;
- ereport(ERROR,
- (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
- errmsg("%s", node->errorMessage)));
+ finish_simple_aggregates(node, resultslot);
+ if (!TupIsNull(resultslot))
+ {
+ (*node->dest->receiveSlot) (resultslot, node->dest);
+ have_tuple = true;
+ }
}
- /*
- * If command is completed we should commit work.
- */
- if (node->conn_count == 0 && autocommit && node->need_tran)
- DataNodeCommit(DestNone);
+ if (!have_tuple) /* report end of scan */
+ ExecClearTuple(resultslot);
+
}
- /* If we got an error response return immediately */
- PG_CATCH();
+
+ if (node->errorMessage)
{
- /* We are going to exit, so release combiner */
- if (autocommit)
- {
- if (node->need_tran)
- DataNodeRollback(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
- PG_RE_THROW();
+ char *code = node->errorCode;
+ ereport(ERROR,
+ (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+ errmsg("%s", node->errorMessage)));
}
- PG_END_TRY();
return resultslot;
}
@@ -2436,7 +2421,7 @@ DataNodeCleanAndRelease(int code, Datum arg)
/* Rollback on Data Nodes */
if (IsTransactionState())
{
- DataNodeRollback(DestNone);
+ DataNodeRollback();
/* Rollback on GTM if transaction id opened. */
RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny());
diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c
index 4625261743..7e4771cd71 100644
--- a/src/backend/pgxc/pool/poolcomm.c
+++ b/src/backend/pgxc/pool/poolcomm.c
@@ -22,7 +22,9 @@
#include <errno.h>
#include <stddef.h>
#include "c.h"
+#include "postgres.h"
#include "pgxc/poolcomm.h"
+#include "storage/ipc.h"
#include "utils/elog.h"
#include "miscadmin.h"
@@ -408,9 +410,16 @@ pool_flush(PoolPort *port)
if (errno != last_reported_send_errno)
{
last_reported_send_errno = errno;
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("could not send data to client: %m")));
+
+ /*
+ * Avoid a segfault that may later occur in the proc array when this
+ * fails while we are already shutting down: if a shutdown is in
+ * progress, do not raise an error.
+ */
+ if (!proc_exit_inprogress)
+ ereport(ERROR,
+ (errcode_for_socket_access(),
+ errmsg("could not send data to client: %m")));
}
/*
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index 6427da391c..dbb8aed3d4 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -93,7 +93,7 @@ static DatabasePool *find_database_pool(const char *database);
static DatabasePool *remove_database_pool(const char *database);
static int *agent_acquire_connections(PoolAgent *agent, List *nodelist);
static DataNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node);
-static void agent_release_connections(PoolAgent *agent, bool clean);
+static void agent_release_connections(PoolAgent *agent, List *discard);
static void release_connection(DatabasePool *dbPool, DataNodePoolSlot *slot, int index, bool clean);
static void destroy_slot(DataNodePoolSlot *slot);
static void grow_pool(DatabasePool *dbPool, int index);
@@ -587,7 +587,7 @@ agent_init(PoolAgent *agent, const char *database, List *nodes)
/* disconnect if we still connected */
if (agent->pool)
- agent_release_connections(agent, false);
+ agent_release_connections(agent, NULL);
/* find database */
agent->pool = find_database_pool(database);
@@ -612,7 +612,7 @@ agent_destroy(PoolAgent *agent)
/* Discard connections if any remaining */
if (agent->pool)
- agent_release_connections(agent, false);
+ agent_release_connections(agent, NULL);
/* find agent in the list */
for (i = 0; i < agentCount; i++)
@@ -700,11 +700,6 @@ static void
agent_handle_input(PoolAgent * agent, StringInfo s)
{
int qtype;
- const char *database;
- int nodecount;
- List *nodelist = NIL;
- int *fds;
- int i;
qtype = pool_getbyte(&agent->port);
/*
@@ -712,6 +707,12 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
*/
for (;;)
{
+ const char *database;
+ int nodecount;
+ List *nodelist = NIL;
+ int *fds;
+ int i;
+
switch (qtype)
{
case 'c': /* CONNECT */
@@ -729,9 +730,7 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
pool_getmessage(&agent->port, s, 4 * NumDataNodes + 8);
nodecount = pq_getmsgint(s, 4);
for (i = 0; i < nodecount; i++)
- {
nodelist = lappend_int(nodelist, pq_getmsgint(s, 4));
- }
pq_getmsgend(s);
/*
* In case of error agent_acquire_connections will log
@@ -744,9 +743,13 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
pfree(fds);
break;
case 'r': /* RELEASE CONNECTIONS */
- pool_getmessage(&agent->port, s, 4);
+ pool_getmessage(&agent->port, s, 4 * NumDataNodes + 8);
+ nodecount = pq_getmsgint(s, 4);
+ for (i = 0; i < nodecount; i++)
+ nodelist = lappend_int(nodelist, pq_getmsgint(s, 4));
pq_getmsgend(s);
- agent_release_connections(agent, true);
+ agent_release_connections(agent, nodelist);
+ list_free(nodelist);
break;
default: /* EOF or protocol violation */
agent_destroy(agent);
@@ -831,11 +834,24 @@ agent_acquire_connections(PoolAgent *agent, List *nodelist)
* Retun connections back to the pool
*/
void
-PoolManagerReleaseConnections(void)
+PoolManagerReleaseConnections(int ndisc, int* discard)
{
+ uint32 n32;
+	uint32	   *buf = (uint32 *) palloc((1 + ndisc) * sizeof(uint32));
+ int i;
+
Assert(Handle);
- pool_putmessage(&Handle->port, 'r', NULL, 0);
+ n32 = htonl((uint32) ndisc);
+ buf[0] = n32;
+
+	for (i = 0; i < ndisc; i++)
+	{
+		n32 = htonl((uint32) discard[i]);
+		buf[i + 1] = n32;
+	}
+ pool_putmessage(&Handle->port, 'r', (char *) buf,
+ (1 + ndisc) * sizeof(uint32));
pool_flush(&Handle->port);
}
@@ -844,23 +860,40 @@ PoolManagerReleaseConnections(void)
* Release connections
*/
static void
-agent_release_connections(PoolAgent *agent, bool clean)
+agent_release_connections(PoolAgent *agent, List *discard)
{
int i;
+ DataNodePoolSlot *slot;
+
if (!agent->connections)
return;
- /* Enumerate connections */
- for (i = 0; i < NumDataNodes; i++)
+ if (discard)
{
- DataNodePoolSlot *slot;
+ ListCell *lc;
+ foreach(lc, discard)
+ {
+ int node = lfirst_int(lc);
+ Assert(node > 0 && node <= NumDataNodes);
+ slot = agent->connections[node - 1];
+
+ /* Discard connection */
+ if (slot)
+ release_connection(agent->pool, slot, node - 1, false);
+ agent->connections[node - 1] = NULL;
+ }
+ }
+
+ /* Remaining connections are assumed to be clean */
+ for (i = 0; i < NumDataNodes; i++)
+ {
slot = agent->connections[i];
/* Release connection */
if (slot)
- release_connection(agent->pool, slot, i, clean);
+ release_connection(agent->pool, slot, i, true);
agent->connections[i] = NULL;
}
}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 24cfe42319..e59c86920e 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4393,11 +4393,11 @@ pgxc_transaction_stmt (Node *parsetree)
break;
case TRANS_STMT_COMMIT:
- DataNodeCommit(DestNone);
+ DataNodeCommit();
break;
case TRANS_STMT_ROLLBACK:
- DataNodeRollback(DestNone);
+ DataNodeRollback();
break;
default:
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index c506ce064f..e60fdd9e02 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2875,12 +2875,16 @@ reversedirection_heap(Tuplesortstate *state)
static unsigned int
getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK)
{
+ DataNodeHandle *conn = state->combiner->connections[tapenum];
for (;;)
{
- switch (handle_response(state->combiner->connections[tapenum], state->combiner))
+ switch (handle_response(conn, state->combiner))
{
case RESPONSE_EOF:
- data_node_receive(1, state->combiner->connections + tapenum, NULL);
+ if (data_node_receive(1, &conn, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+					errmsg("%s", conn->error)));
break;
case RESPONSE_COMPLETE:
if (eofOK)
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
index ab95022b5f..849d84acea 100644
--- a/src/include/pgxc/datanode.h
+++ b/src/include/pgxc/datanode.h
@@ -50,6 +50,9 @@ struct data_node_handle
/* Connection state */
char transaction_status;
DNConnectionState state;
+#ifdef DN_CONNECTION_DEBUG
+ bool have_row_desc;
+#endif
char *error;
/* Output buffer */
char *outBuffer;
@@ -86,7 +89,7 @@ extern int data_node_send_query(DataNodeHandle * handle, const char *query);
extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid);
extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot);
-extern void data_node_receive(const int conn_count,
+extern int data_node_receive(const int conn_count,
DataNodeHandle ** connections, struct timeval * timeout);
extern int data_node_read_data(DataNodeHandle * conn);
extern int send_some(DataNodeHandle * handle, int len);
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index d99806a01b..e9b59ccbd9 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -62,7 +62,6 @@ typedef struct RemoteQueryState
char errorCode[5]; /* error code to send back to client */
char *errorMessage; /* error message to send back to client */
bool query_Done; /* query has been sent down to data nodes */
- bool need_tran; /* auto commit on nodes after completion */
char *completionTag; /* completion tag to present to caller */
char *msg; /* last data row message */
int msglen; /* length of the data row message */
@@ -81,8 +80,8 @@ typedef struct RemoteQueryState
/* Multinode Executor */
extern void DataNodeBegin(void);
-extern int DataNodeCommit(CommandDest dest);
-extern int DataNodeRollback(CommandDest dest);
+extern void DataNodeCommit(void);
+extern int DataNodeRollback(void);
extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h
index 2c9128e7c7..b7ac3aedf0 100644
--- a/src/include/pgxc/poolmgr.h
+++ b/src/include/pgxc/poolmgr.h
@@ -45,7 +45,7 @@ typedef struct
char *connstr;
int freeSize; /* available connections */
int size; /* total pool size */
- DataNodePoolSlot **slot;
+ DataNodePoolSlot **slot;
} DataNodePool;
/* All pools for specified database */
@@ -57,7 +57,7 @@ typedef struct databasepool
struct databasepool *next;
} DatabasePool;
-/* Agent of client session (Pool Manager side)
+/* Agent of client session (Pool Manager side)
* Acts as a session manager, grouping connections together
*/
typedef struct
@@ -125,6 +125,6 @@ extern void PoolManagerConnect(PoolHandle *handle, const char *database);
extern int *PoolManagerGetConnections(List *nodelist);
/* Retun connections back to the pool */
-extern void PoolManagerReleaseConnections(void);
+extern void PoolManagerReleaseConnections(int ndisc, int* discard);
#endif