diff options
| author | Mason S | 2010-08-05 18:36:37 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2011-05-19 16:45:14 +0000 |
| commit | 3c1f6c7ef0230d22ef2580d0212901b013841d45 (patch) | |
| tree | 6f4c7770c2e6cca4768f023971c9463922b0fafd /src | |
| parent | 9624091e0a1dc23b8eb17c7cd168ff4f3e57b3ec (diff) | |
Added more handling to deal with data node connection failures.
This includes forcing the release of connections in an unexpected
state and bug fixes.
This was written by Andrei Martsinchyk, with some additional
handling added by Mason.
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/access/transam/xact.c | 32 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/datanode.c | 49 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 449 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolcomm.c | 15 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolmgr.c | 71 | ||||
| -rw-r--r-- | src/backend/tcop/postgres.c | 4 | ||||
| -rw-r--r-- | src/backend/utils/sort/tuplesort.c | 8 | ||||
| -rw-r--r-- | src/include/pgxc/datanode.h | 5 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 5 | ||||
| -rw-r--r-- | src/include/pgxc/poolmgr.h | 6 |
10 files changed, 344 insertions, 300 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b505dee1f5..4db1e2c864 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -135,7 +135,7 @@ typedef struct TransactionStateData { TransactionId transactionId; /* my XID, or Invalid if none */ #ifdef PGXC /* PGXC_COORD */ - GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */ + GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */ #endif SubTransactionId subTransactionId; /* my subxact ID */ char *name; /* savepoint name, if any */ @@ -314,7 +314,7 @@ GetCurrentGlobalTransactionId(void) * GetGlobalTransactionId * * This will return the GXID of the specified transaction, - * getting one from the GTM if it's not yet set. + * getting one from the GTM if it's not yet set. */ static GlobalTransactionId GetGlobalTransactionId(TransactionState s) @@ -469,7 +469,7 @@ AssignTransactionId(TransactionState s) if (IS_PGXC_COORDINATOR) { s->transactionId = (TransactionId) GetGlobalTransactionId(s); - elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s", + elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s", s->transactionId, isSubXact ? "true" : "false"); } else @@ -1679,6 +1679,14 @@ CommitTransaction(void) */ AtEOXact_UpdateFlatFiles(true); +#ifdef PGXC + /* + * There can be error on the data nodes. So go to data nodes before + * changing transaction state and local clean up + */ + DataNodeCommit(); +#endif + /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -1694,13 +1702,13 @@ CommitTransaction(void) latestXid = RecordTransactionCommit(); TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); + #ifdef PGXC + /* + * Now we can let GTM know about transaction commit + */ if (IS_PGXC_COORDINATOR) { - /* Make sure this committed on the DataNodes, - * if so it will just return - */ - DataNodeCommit(DestNone); CommitTranGTM(s->globalTransactionId); latestXid = s->globalTransactionId; } @@ -1712,7 +1720,7 @@ CommitTransaction(void) CommitTranGTM((GlobalTransactionId) latestXid); } #endif - + /* * Let others know about no transaction in progress by me. Note that this * must be done _before_ releasing locks we hold and _after_ @@ -1808,7 +1816,7 @@ CommitTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; -#ifdef PGXC +#ifdef PGXC if (IS_PGXC_COORDINATOR) s->globalTransactionId = InvalidGlobalTransactionId; else if (IS_PGXC_DATANODE) @@ -2142,10 +2150,10 @@ AbortTransaction(void) #ifdef PGXC if (IS_PGXC_COORDINATOR) { - /* Make sure this is rolled back on the DataNodes, - * if so it will just return + /* Make sure this is rolled back on the DataNodes, + * if so it will just return */ - DataNodeRollback(DestNone); + DataNodeRollback(); RollbackTranGTM(s->globalTransactionId); latestXid = s->globalTransactionId; } diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c index 517b1e4d78..0f4072d01e 100644 --- a/src/backend/pgxc/pool/datanode.c +++ b/src/backend/pgxc/pool/datanode.c @@ -199,6 +199,9 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum) handle->sock = sock; handle->transaction_status = 'I'; handle->state = DN_CONNECTION_STATE_IDLE; +#ifdef DN_CONNECTION_DEBUG + handle->have_row_desc = false; +#endif handle->error = NULL; handle->outEnd = 0; handle->inStart = 0; @@ -211,7 +214,7 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum) * Wait while at least one of specified connections has data available and read * the data into the buffer */ -void +int data_node_receive(const int conn_count, DataNodeHandle ** connections, struct timeval * timeout) { @@ -239,7 +242,7 @@ data_node_receive(const int conn_count, * Return if we do not have connections to receive input */ if (nfds == 0) - return; + return 0; retry: res_select = select(nfds + 1, &readfds, NULL, NULL, timeout); @@ -249,27 +252,19 @@ retry: if (errno == EINTR || errno == EAGAIN) goto retry; - /* - * PGXCTODO - we may want to close the connections and notify the - * pooler that these are invalid. - */ if (errno == EBADF) { - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("select() bad file descriptor set"))); + elog(WARNING, "select() bad file descriptor set"); } - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("select() error: %d", errno))); + elog(WARNING, "select() error: %d", errno); + return errno; } if (res_select == 0) { /* Handle timeout */ - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("timeout while waiting for response"))); + elog(WARNING, "timeout while waiting for response"); + return EOF; } /* read data */ @@ -283,10 +278,9 @@ retry: if (read_status == EOF || read_status < 0) { - /* PGXCTODO - we should notify the pooler to destroy the connections */ - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on datanode connection"))); + add_error_message(conn, "unexpected EOF on datanode connection"); + elog(WARNING, "unexpected EOF on datanode connection"); + return EOF; } else { @@ -294,6 +288,7 @@ retry: } } } + return 0; } @@ -522,7 +517,7 @@ get_message(DataNodeHandle *conn, int *len, char **msg) * ensure_in_buffer_capacity() will immediately return */ ensure_in_buffer_capacity(5 + (size_t) *len, conn); - conn->state == DN_CONNECTION_STATE_QUERY; + conn->state = DN_CONNECTION_STATE_QUERY; conn->inCursor = conn->inStart; return '\0'; } @@ -539,19 +534,27 @@ void release_handles(void) { int i; + int discard[NumDataNodes]; + int ndisc = 0; if (node_count == 0) return; - PoolManagerReleaseConnections(); for (i = 0; i < NumDataNodes; i++) { DataNodeHandle *handle = &handles[i]; if (handle->sock != NO_SOCKET) + { + if (handle->state != DN_CONNECTION_STATE_IDLE) + { + elog(WARNING, "Connection to data node %d has unexpected state %d and will be dropped", handle->nodenum, handle->state); + discard[ndisc++] = handle->nodenum; + } data_node_free(handle); + } } - + PoolManagerReleaseConnections(ndisc, discard); node_count = 0; } @@ -897,7 +900,7 @@ void add_error_message(DataNodeHandle *handle, const char *message) { handle->transaction_status = 'E'; - handle->state = DN_CONNECTION_STATE_IDLE; + handle->state = DN_CONNECTION_STATE_ERROR_NOT_READY; if (handle->error) { /* PGXCTODO append */ diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index c6f9042b43..bbedef0f2f 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -24,6 +24,7 @@ #include "miscadmin.h" #include "pgxc/execRemote.h" #include "pgxc/poolmgr.h" +#include "storage/ipc.h" #include "utils/datum.h" #include "utils/memutils.h" #include "utils/tuplesort.h" @@ -40,9 +41,10 @@ static bool autocommit = true; static DataNodeHandle **write_node_list = NULL; static int write_node_count = 0; -static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid); -static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest); -static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest); +static int data_node_begin(int conn_count, DataNodeHandle ** connections, + GlobalTransactionId gxid); +static int data_node_commit(int conn_count, DataNodeHandle ** connections); +static int data_node_rollback(int conn_count, DataNodeHandle ** connections); static void clear_write_node_list(); @@ -920,7 +922,7 @@ FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot) /* * Handle responses from the Data node connections */ -static void +static int data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, struct timeval * timeout, RemoteQueryState *combiner) { @@ -940,7 +942,8 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, { int i = 0; - data_node_receive(count, to_receive, timeout); + if (data_node_receive(count, to_receive, timeout)) + return EOF; while (i < count) { int result = handle_response(to_receive[i], combiner); @@ -959,12 +962,17 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, break; default: /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Unexpected response from the data nodes, result = %d, request type %d", result, combiner->request_type))); + add_error_message(to_receive[i], "Unexpected response from the data nodes"); + elog(WARNING, "Unexpected response from the data nodes, result = %d, request type %d", result, combiner->request_type); + /* Stop tracking and move last connection in place */ + count--; + if (i < count) + to_receive[i] = to_receive[count]; } } } + + return 0; } /* @@ -990,6 +998,18 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) if (conn->state == DN_CONNECTION_STATE_QUERY) return RESPONSE_EOF; + /* + * If we are in the process of shutting down, we + * may be rolling back, and the buffer may contain other messages. + * We want to avoid a procarray exception + * as well as an error stack overflow. + */ + if (proc_exit_inprogress) + { + conn->state = DN_CONNECTION_STATE_ERROR_FATAL; + return RESPONSE_EOF; + } + /* TODO handle other possible responses */ switch (get_message(conn, &msg_len, &msg)) { @@ -1005,10 +1025,17 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) HandleCommandComplete(combiner, msg, msg_len); break; case 'T': /* RowDescription */ +#ifdef DN_CONNECTION_DEBUG + Assert(!conn->have_row_desc); + conn->have_row_desc = true; +#endif if (HandleRowDescription(combiner, msg, msg_len)) return RESPONSE_TUPDESC; break; case 'D': /* DataRow */ +#ifdef DN_CONNECTION_DEBUG + Assert(conn->have_row_desc); +#endif HandleDataRow(combiner, msg, msg_len); return RESPONSE_DATAROW; case 'G': /* CopyInResponse */ @@ -1042,6 +1069,9 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) case 'Z': /* ReadyForQuery */ conn->transaction_status = msg[0]; conn->state = DN_CONNECTION_STATE_IDLE; +#ifdef DN_CONNECTION_DEBUG + conn->have_row_desc = false; +#endif return RESPONSE_COMPLETE; case 'I': /* EmptyQuery */ default: @@ -1058,7 +1088,8 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) * Send BEGIN command to the Data nodes and receive responses */ static int -data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid) +data_node_begin(int conn_count, DataNodeHandle ** connections, + GlobalTransactionId gxid) { int i; struct timeval *timeout = NULL; @@ -1078,7 +1109,8 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, combiner->dest = None_Receiver; /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); + if (data_node_receive_responses(conn_count, connections, timeout, combiner)) + return EOF; /* Verify status */ return ValidateAndCloseCombiner(combiner) ? 0 : EOF; @@ -1109,12 +1141,12 @@ DataNodeBegin(void) /* - * Commit current transaction, use two-phase commit if necessary + * Commit current transaction on data nodes where it has been started */ -int -DataNodeCommit(CommandDest dest) +void +DataNodeCommit(void) { - int res; + int res = 0; int tran_count; DataNodeHandle *connections[NumDataNodes]; @@ -1128,7 +1160,7 @@ DataNodeCommit(CommandDest dest) if (tran_count == 0) goto finish; - res = data_node_commit(tran_count, connections, dest); + res = data_node_commit(tran_count, connections); finish: /* In autocommit mode statistics is collected in DataNodeExec */ @@ -1138,15 +1170,19 @@ finish: release_handles(); autocommit = true; clear_write_node_list(); - return res; + if (res != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not commit connection on data nodes"))); } /* - * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses + * Commit transaction on specified data node connections, use two-phase commit + * if more then on one node data have been modified during the transactioon. */ static int -data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest) +data_node_commit(int conn_count, DataNodeHandle ** connections) { int i; struct timeval *timeout = NULL; @@ -1191,13 +1227,12 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); combiner->dest = None_Receiver; /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); + if (data_node_receive_responses(conn_count, connections, timeout, combiner)) + result = EOF; /* Reset combiner */ if (!ValidateAndResetCombiner(combiner)) - { result = EOF; - } } if (!do2PC) @@ -1238,7 +1273,8 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest combiner->dest = None_Receiver; } /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); + if (data_node_receive_responses(conn_count, connections, timeout, combiner)) + result = EOF; result = ValidateAndCloseCombiner(combiner) ? result : EOF; finish: @@ -1253,7 +1289,7 @@ finish: * Rollback current transaction */ int -DataNodeRollback(CommandDest dest) +DataNodeRollback(void) { int res = 0; int tran_count; @@ -1269,7 +1305,7 @@ DataNodeRollback(CommandDest dest) if (tran_count == 0) goto finish; - res = data_node_rollback(tran_count, connections, dest); + res = data_node_rollback(tran_count, connections); finish: /* In autocommit mode statistics is collected in DataNodeExec */ @@ -1287,24 +1323,23 @@ finish: * Send ROLLBACK command down to the Data nodes and handle responses */ static int -data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest) +data_node_rollback(int conn_count, DataNodeHandle ** connections) { int i; struct timeval *timeout = NULL; - int result = 0; RemoteQueryState *combiner; /* Send ROLLBACK - */ for (i = 0; i < conn_count; i++) { - if (data_node_send_query(connections[i], "ROLLBACK")) - result = EOF; + data_node_send_query(connections[i], "ROLLBACK"); } combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); combiner->dest = None_Receiver; /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); + if (data_node_receive_responses(conn_count, connections, timeout, combiner)) + return EOF; /* Verify status */ return ValidateAndCloseCombiner(combiner) ? 0 : EOF; @@ -1404,7 +1439,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ if (new_count > 0 && need_tran) { /* Start transaction on connections where it is not started */ - if (data_node_begin(new_count, newConnections, DestNone, gxid)) + if (data_node_begin(new_count, newConnections, gxid)) { pfree(connections); pfree(copy_connections); @@ -1448,8 +1483,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ combiner->dest = None_Receiver; /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); - if (!ValidateAndCloseCombiner(combiner)) + if (data_node_receive_responses(conn_count, connections, timeout, combiner) + || !ValidateAndCloseCombiner(combiner)) { if (autocommit) { @@ -1665,6 +1700,12 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on datanode connection"))); + else + /* + * Set proper connection status - handle_response + * has changed it to DN_CONNECTION_STATE_QUERY + */ + handle->state = DN_CONNECTION_STATE_COPY_OUT; } /* There is no more data that can be read from connection */ } @@ -1746,7 +1787,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, combiner = CreateResponseCombiner(conn_count + 1, combine_type); combiner->dest = None_Receiver; - data_node_receive_responses(1, &primary_handle, timeout, combiner); + error = data_node_receive_responses(1, &primary_handle, timeout, combiner) || error; } for (i = 0; i < conn_count; i++) @@ -1786,30 +1827,14 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, combiner = CreateResponseCombiner(conn_count, combine_type); combiner->dest = None_Receiver; } - data_node_receive_responses(conn_count, connections, timeout, combiner); + error = (data_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error; processed = combiner->row_count; if (!ValidateAndCloseCombiner(combiner) || error) - { - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else - if (!PersistentConnections) release_handles(); - } - - return 0; - } - - if (autocommit) - { - if (need_tran) - DataNodeCommit(DestNone); - else - if (!PersistentConnections) release_handles(); - } + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Error while running COPY"))); return processed; } @@ -1882,9 +1907,17 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) else { int i; + + /* + * Data node may be sending junk columns which are always at the end, + * but it must not be shorter then result slot. + */ + Assert(dst->tts_tupleDescriptor->natts <= src->tts_tupleDescriptor->natts); ExecClearTuple(dst); slot_getallattrs(src); - /* PGXCTODO revisit: probably incorrect */ + /* + * PGXCTODO revisit: if it is correct to copy Datums using assignment? + */ for (i = 0; i < dst->tts_tupleDescriptor->natts; i++) { dst->tts_values[i] = src->tts_values[i]; @@ -1911,6 +1944,8 @@ ExecRemoteQuery(RemoteQueryState *node) EState *estate = node->ss.ps.state; TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; + bool have_tuple = false; + if (!node->query_Done) { @@ -1974,7 +2009,7 @@ ExecRemoteQuery(RemoteQueryState *node) else need_tran = !autocommit || total_conn_count > 1; - elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); + elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, statement_need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); stat_statement(); if (autocommit) @@ -2052,7 +2087,7 @@ ExecRemoteQuery(RemoteQueryState *node) new_connections[new_count++] = connections[i]; if (new_count) - data_node_begin(new_count, new_connections, DestNone, gxid); + data_node_begin(new_count, new_connections, gxid); } /* See if we have a primary nodes, execute on it first before the others */ @@ -2088,36 +2123,22 @@ ExecRemoteQuery(RemoteQueryState *node) while (node->command_complete_count < 1) { - PG_TRY(); - { - data_node_receive(1, primaryconnection, NULL); - while (handle_response(primaryconnection[0], node) == RESPONSE_EOF) - data_node_receive(1, primaryconnection, NULL); - if (node->errorMessage) - { - char *code = node->errorCode; + if (data_node_receive(1, primaryconnection, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); + while (handle_response(primaryconnection[0], node) == RESPONSE_EOF) + if (data_node_receive(1, primaryconnection, NULL)) ereport(ERROR, - (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), - errmsg("%s", node->errorMessage))); - } - } - /* If we got an error response return immediately */ - PG_CATCH(); + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); + if (node->errorMessage) { - /* We are going to exit, so release combiner */ - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - pfree(primaryconnection); - pfree(connections); - PG_RE_THROW(); + char *code = node->errorCode; + ereport(ERROR, + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", node->errorMessage))); } - PG_END_TRY(); } pfree(primaryconnection); } @@ -2148,8 +2169,6 @@ ExecRemoteQuery(RemoteQueryState *node) } } - PG_TRY(); - { /* * Stop if all commands are completed or we got a data row and * initialized state node for subsequent invocations @@ -2158,7 +2177,10 @@ ExecRemoteQuery(RemoteQueryState *node) { int i = 0; - data_node_receive(regular_conn_count, connections, NULL); + if (data_node_receive(regular_conn_count, connections, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); /* * Handle input from the data nodes. * If we got a RESPONSE_DATAROW we can break handling to wrap @@ -2234,185 +2256,148 @@ ExecRemoteQuery(RemoteQueryState *node) } } } - } - /* If we got an error response return immediately */ - PG_CATCH(); - { - /* We are going to exit, so release combiner */ - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - PG_RE_THROW(); - } - PG_END_TRY(); + node->query_Done = true; - node->need_tran = need_tran; } - PG_TRY(); + if (node->tuplesortstate) { - bool have_tuple = false; - - if (node->tuplesortstate) + while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate, + true, scanslot)) { - while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate, - true, scanslot)) + have_tuple = true; + /* + * If DISTINCT is specified and current tuple matches to + * previous skip it and get next one. + * Othervise return current tuple + */ + if (step->distinct) { - have_tuple = true; /* - * If DISTINCT is specified and current tuple matches to - * previous skip it and get next one. - * Othervise return current tuple + * Always receive very first tuple and + * skip to next if scan slot match to previous (result slot) */ - if (step->distinct) + if (!TupIsNull(resultslot) && + execTuplesMatch(scanslot, + resultslot, + step->distinct->numCols, + step->distinct->uniqColIdx, + node->eqfunctions, + node->tmp_ctx)) { - /* - * Always receive very first tuple and - * skip to next if scan slot match to previous (result slot) - */ - if (!TupIsNull(resultslot) && - execTuplesMatch(scanslot, - resultslot, - step->distinct->numCols, - step->distinct->uniqColIdx, - node->eqfunctions, - node->tmp_ctx)) - { - have_tuple = false; - continue; - } + have_tuple = false; + continue; } - copy_slot(node, scanslot, resultslot); - (*node->dest->receiveSlot) (resultslot, node->dest); - break; } - if (!have_tuple) - ExecClearTuple(resultslot); + copy_slot(node, scanslot, resultslot); + (*node->dest->receiveSlot) (resultslot, node->dest); + break; } - else + if (!have_tuple) + ExecClearTuple(resultslot); + } + else + { + while (node->conn_count > 0 && !have_tuple) { - while (node->conn_count > 0 && !have_tuple) - { - int i; + int i; - /* - * If combiner already has tuple go ahead and return it - * otherwise tuple will be cleared - */ - if (FetchTuple(node, scanslot) && !TupIsNull(scanslot)) + /* + * If combiner already has tuple go ahead and return it + * otherwise tuple will be cleared + */ + if (FetchTuple(node, scanslot) && !TupIsNull(scanslot)) + { + if (node->simple_aggregates) { - if (node->simple_aggregates) - { - /* - * Advance aggregate functions and allow to read up next - * data row message and get tuple in the same slot on - * next iteration - */ - exec_simple_aggregates(node, scanslot); - } - else - { - /* - * Receive current slot and read up next data row - * message before exiting the loop. Next time when this - * function is invoked we will have either data row - * message ready or EOF - */ - copy_slot(node, scanslot, resultslot); - (*node->dest->receiveSlot) (resultslot, node->dest); - have_tuple = true; - } + /* + * Advance aggregate functions and allow to read up next + * data row message and get tuple in the same slot on + * next iteration + */ + exec_simple_aggregates(node, scanslot); } - - /* - * Handle input to get next row or ensure command is completed, - * starting from connection next after current. If connection - * does not - */ - if ((i = node->current_conn + 1) == node->conn_count) - i = 0; - - for (;;) + else { - int res = handle_response(node->connections[i], node); - if (res == RESPONSE_EOF) - { - /* go to next connection */ - if (++i == node->conn_count) - i = 0; - /* if we cycled over all connections we need to receive more */ - if (i == node->current_conn) - data_node_receive(node->conn_count, node->connections, NULL); - } - else if (res == RESPONSE_COMPLETE) - { - if (--node->conn_count == 0) - break; - if (i == node->conn_count) - i = 0; - else - node->connections[i] = node->connections[node->conn_count]; - if (node->current_conn == node->conn_count) - node->current_conn = i; - } - else if (res == RESPONSE_DATAROW) - { - node->current_conn = i; - break; - } + /* + * Receive current slot and read up next data row + * message before exiting the loop. Next time when this + * function is invoked we will have either data row + * message ready or EOF + */ + copy_slot(node, scanslot, resultslot); + (*node->dest->receiveSlot) (resultslot, node->dest); + have_tuple = true; } } /* - * We may need to finalize aggregates + * Handle input to get next row or ensure command is completed, + * starting from connection next after current. If connection + * does not */ - if (!have_tuple && node->simple_aggregates) + if ((i = node->current_conn + 1) == node->conn_count) + i = 0; + + for (;;) { - finish_simple_aggregates(node, resultslot); - if (!TupIsNull(resultslot)) + int res = handle_response(node->connections[i], node); + if (res == RESPONSE_EOF) { - (*node->dest->receiveSlot) (resultslot, node->dest); - have_tuple = true; + /* go to next connection */ + if (++i == node->conn_count) + i = 0; + /* if we cycled over all connections we need to receive more */ + if (i == node->current_conn) + if (data_node_receive(node->conn_count, node->connections, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); + } + else if (res == RESPONSE_COMPLETE) + { + if (--node->conn_count == 0) + break; + if (i == node->conn_count) + i = 0; + else + node->connections[i] = node->connections[node->conn_count]; + if (node->current_conn == node->conn_count) + node->current_conn = i; + } + else if (res == RESPONSE_DATAROW) + { + node->current_conn = i; + break; } } - - if (!have_tuple) /* report end of scan */ - ExecClearTuple(resultslot); - } - if (node->errorMessage) + /* + * We may need to finalize aggregates + */ + if (!have_tuple && node->simple_aggregates) { - char *code = node->errorCode; - ereport(ERROR, - (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), - errmsg("%s", node->errorMessage))); + finish_simple_aggregates(node, resultslot); + if (!TupIsNull(resultslot)) + { + (*node->dest->receiveSlot) (resultslot, node->dest); + have_tuple = true; + } } - /* - * If command is completed we should commit work. - */ - if (node->conn_count == 0 && autocommit && node->need_tran) - DataNodeCommit(DestNone); + if (!have_tuple) /* report end of scan */ + ExecClearTuple(resultslot); + } - /* If we got an error response return immediately */ - PG_CATCH(); + + if (node->errorMessage) { - /* We are going to exit, so release combiner */ - if (autocommit) - { - if (node->need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - PG_RE_THROW(); + char *code = node->errorCode; + ereport(ERROR, + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", node->errorMessage))); } - PG_END_TRY(); return resultslot; } @@ -2436,7 +2421,7 @@ DataNodeCleanAndRelease(int code, Datum arg) /* Rollback on Data Nodes */ if (IsTransactionState()) { - DataNodeRollback(DestNone); + DataNodeRollback(); /* Rollback on GTM if transaction id opened. */ RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny()); diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c index 4625261743..7e4771cd71 100644 --- a/src/backend/pgxc/pool/poolcomm.c +++ b/src/backend/pgxc/pool/poolcomm.c @@ -22,7 +22,9 @@ #include <errno.h> #include <stddef.h> #include "c.h" +#include "postgres.h" #include "pgxc/poolcomm.h" +#include "storage/ipc.h" #include "utils/elog.h" #include "miscadmin.h" @@ -408,9 +410,16 @@ pool_flush(PoolPort *port) if (errno != last_reported_send_errno) { last_reported_send_errno = errno; - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("could not send data to client: %m"))); + + /* + * Handle a seg fault that may later occur in proc array + * when this fails when we are already shutting down + * If shutting down already, do not call. + */ + if (!proc_exit_inprogress) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("could not send data to client: %m"))); } /* diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c index 6427da391c..dbb8aed3d4 100644 --- a/src/backend/pgxc/pool/poolmgr.c +++ b/src/backend/pgxc/pool/poolmgr.c @@ -93,7 +93,7 @@ static DatabasePool *find_database_pool(const char *database); static DatabasePool *remove_database_pool(const char *database); static int *agent_acquire_connections(PoolAgent *agent, List *nodelist); static DataNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node); -static void agent_release_connections(PoolAgent *agent, bool clean); +static void agent_release_connections(PoolAgent *agent, List *discard); static void release_connection(DatabasePool *dbPool, DataNodePoolSlot *slot, int index, bool clean); static void destroy_slot(DataNodePoolSlot *slot); static void grow_pool(DatabasePool *dbPool, int index); @@ -587,7 +587,7 @@ agent_init(PoolAgent *agent, const char *database, List *nodes) /* disconnect if we still connected */ if (agent->pool) - agent_release_connections(agent, false); + agent_release_connections(agent, NULL); /* find database */ agent->pool = find_database_pool(database); @@ -612,7 +612,7 @@ agent_destroy(PoolAgent *agent) /* Discard connections if any remaining */ if (agent->pool) - agent_release_connections(agent, false); + agent_release_connections(agent, NULL); /* find agent in the list */ for (i = 0; i < agentCount; i++) @@ -700,11 +700,6 @@ static void agent_handle_input(PoolAgent * agent, StringInfo s) { int qtype; - const char *database; - int nodecount; - List *nodelist = NIL; - int *fds; - int i; qtype = pool_getbyte(&agent->port); /* @@ -712,6 +707,12 @@ agent_handle_input(PoolAgent * agent, StringInfo s) */ for (;;) { + const char *database; + int nodecount; + List *nodelist = NIL; + int *fds; + int i; + switch (qtype) { case 'c': /* CONNECT */ @@ -729,9 +730,7 @@ agent_handle_input(PoolAgent * agent, StringInfo s) pool_getmessage(&agent->port, s, 4 * NumDataNodes + 8); nodecount = pq_getmsgint(s, 4); for (i = 0; i < nodecount; i++) - { nodelist = lappend_int(nodelist, pq_getmsgint(s, 4)); - } pq_getmsgend(s); /* * In case of error agent_acquire_connections will log @@ -744,9 +743,13 @@ agent_handle_input(PoolAgent * agent, StringInfo s) pfree(fds); break; case 'r': /* RELEASE CONNECTIONS */ - pool_getmessage(&agent->port, s, 4); + pool_getmessage(&agent->port, s, 4 * NumDataNodes + 8); + nodecount = pq_getmsgint(s, 4); + for (i = 0; i < nodecount; i++) + nodelist = lappend_int(nodelist, pq_getmsgint(s, 4)); pq_getmsgend(s); - agent_release_connections(agent, true); + agent_release_connections(agent, nodelist); + list_free(nodelist); break; default: /* EOF or protocol violation */ agent_destroy(agent); @@ -831,11 +834,24 @@ agent_acquire_connections(PoolAgent *agent, List *nodelist) * Retun connections back to the pool */ void -PoolManagerReleaseConnections(void) +PoolManagerReleaseConnections(int ndisc, int* discard) { + uint32 n32; + uint32 buf[1 + ndisc]; + int i; + Assert(Handle); - pool_putmessage(&Handle->port, 'r', NULL, 0); + n32 = htonl((uint32) ndisc); + buf[0] = n32; + + for (i = 0; i < ndisc;) + { + n32 = htonl((uint32) discard[i++]); + buf[i] = n32; + } + pool_putmessage(&Handle->port, 'r', (char *) buf, + (1 + ndisc) * sizeof(uint32)); pool_flush(&Handle->port); } @@ -844,23 +860,40 @@ PoolManagerReleaseConnections(void) * Release connections */ static void -agent_release_connections(PoolAgent *agent, bool clean) +agent_release_connections(PoolAgent *agent, List *discard) { int i; + DataNodePoolSlot *slot; + if (!agent->connections) return; - /* Enumerate connections */ - for (i = 0; i < NumDataNodes; i++) + if (discard) { - DataNodePoolSlot *slot; + ListCell *lc; + foreach(lc, discard) + { + int node = lfirst_int(lc); + Assert(node > 0 && node <= NumDataNodes); + slot = agent->connections[node - 1]; + + /* Discard connection */ + if (slot) + release_connection(agent->pool, slot, node - 1, false); + agent->connections[node - 1] = NULL; + } + } + + /* Remaining connections are assumed to be clean */ + for (i = 0; i < NumDataNodes; i++) + { slot = agent->connections[i]; /* Release connection */ if (slot) - release_connection(agent->pool, slot, i, clean); + release_connection(agent->pool, slot, i, true); agent->connections[i] = NULL; } } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 24cfe42319..e59c86920e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4393,11 +4393,11 @@ pgxc_transaction_stmt (Node *parsetree) break; case TRANS_STMT_COMMIT: - DataNodeCommit(DestNone); + DataNodeCommit(); break; case TRANS_STMT_ROLLBACK: - DataNodeRollback(DestNone); + DataNodeRollback(); break; default: diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index c506ce064f..e60fdd9e02 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -2875,12 +2875,16 @@ reversedirection_heap(Tuplesortstate *state) static unsigned int getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK) { + DataNodeHandle *conn = state->combiner->connections[tapenum]; for (;;) { - switch (handle_response(state->combiner->connections[tapenum], state->combiner)) + switch (handle_response(conn, state->combiner)) { case RESPONSE_EOF: - data_node_receive(1, state->combiner->connections + tapenum, NULL); + if (data_node_receive(1, &conn, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg(conn->error))); break; case RESPONSE_COMPLETE: if (eofOK) diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h index ab95022b5f..849d84acea 100644 --- a/src/include/pgxc/datanode.h +++ b/src/include/pgxc/datanode.h @@ -50,6 +50,9 @@ struct data_node_handle /* Connection state */ char transaction_status; DNConnectionState state; +#ifdef DN_CONNECTION_DEBUG + bool have_row_desc; +#endif char *error; /* Output buffer */ char *outBuffer; @@ -86,7 +89,7 @@ extern int data_node_send_query(DataNodeHandle * handle, const char *query); extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid); extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot); -extern void data_node_receive(const int conn_count, +extern int data_node_receive(const int conn_count, DataNodeHandle ** connections, struct timeval * timeout); extern int data_node_read_data(DataNodeHandle * conn); extern int send_some(DataNodeHandle * handle, int len); diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index d99806a01b..e9b59ccbd9 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -62,7 +62,6 @@ typedef struct RemoteQueryState char errorCode[5]; /* error code to send back to client */ char *errorMessage; /* error message to send back to client */ bool query_Done; /* query has been sent down to data nodes */ - bool need_tran; /* auto commit on nodes after completion */ char *completionTag; /* completion tag to present to caller */ char *msg; /* last data row message */ int msglen; /* length of the data row message */ @@ -81,8 +80,8 @@ typedef struct RemoteQueryState /* Multinode Executor */ extern void DataNodeBegin(void); -extern int DataNodeCommit(CommandDest dest); -extern int DataNodeRollback(CommandDest dest); +extern void DataNodeCommit(void); +extern int DataNodeRollback(void); extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from); extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections); diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h index 2c9128e7c7..b7ac3aedf0 100644 --- a/src/include/pgxc/poolmgr.h +++ b/src/include/pgxc/poolmgr.h @@ -45,7 +45,7 @@ typedef struct char *connstr; int freeSize; /* available connections */ int size; /* total pool size */ - DataNodePoolSlot **slot; + DataNodePoolSlot **slot; } DataNodePool; /* All pools for specified database */ @@ -57,7 +57,7 @@ typedef struct databasepool struct databasepool *next; } DatabasePool; -/* Agent of client session (Pool Manager side) +/* Agent of client session (Pool Manager side) * Acts as a session manager, grouping connections together */ typedef struct @@ -125,6 +125,6 @@ extern void PoolManagerConnect(PoolHandle *handle, const char *database); extern int *PoolManagerGetConnections(List *nodelist); /* Retun connections back to the pool */ -extern void PoolManagerReleaseConnections(void); +extern void PoolManagerReleaseConnections(int ndisc, int* discard); #endif |
