diff options
| -rw-r--r-- | contrib/pgbench/pgbench.c | 6 | ||||
| -rw-r--r-- | src/backend/access/transam/xact.c | 32 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/datanode.c | 49 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 449 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolcomm.c | 15 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/poolmgr.c | 71 | ||||
| -rw-r--r-- | src/backend/tcop/postgres.c | 4 | ||||
| -rw-r--r-- | src/backend/utils/sort/tuplesort.c | 8 | ||||
| -rw-r--r-- | src/include/pgxc/datanode.h | 5 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 5 | ||||
| -rw-r--r-- | src/include/pgxc/poolmgr.h | 6 |
11 files changed, 347 insertions, 303 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index 321f797c2c..c2aaeec529 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -202,9 +202,9 @@ static char *tpc_b_bid = { "\\setrandom tid 1 :ntellers\n" "\\setrandom delta -5000 5000\n" "BEGIN;\n" - "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid AND bid = :bid;\n" - "SELECT abalance FROM pgbench_accounts WHERE aid = :aid AND bid = :bid\n" - "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid AND bid = :bid;\n" + "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n" + "SELECT abalance FROM pgbench_accounts WHERE aid = :aid\n" + "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n" "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n" "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" "END;\n" diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b505dee1f5..4db1e2c864 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -135,7 +135,7 @@ typedef struct TransactionStateData { TransactionId transactionId; /* my XID, or Invalid if none */ #ifdef PGXC /* PGXC_COORD */ - GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */ + GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */ #endif SubTransactionId subTransactionId; /* my subxact ID */ char *name; /* savepoint name, if any */ @@ -314,7 +314,7 @@ GetCurrentGlobalTransactionId(void) * GetGlobalTransactionId * * This will return the GXID of the specified transaction, - * getting one from the GTM if it's not yet set. + * getting one from the GTM if it's not yet set. */ static GlobalTransactionId GetGlobalTransactionId(TransactionState s) @@ -469,7 +469,7 @@ AssignTransactionId(TransactionState s) if (IS_PGXC_COORDINATOR) { s->transactionId = (TransactionId) GetGlobalTransactionId(s); - elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s", + elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s", s->transactionId, isSubXact ? "true" : "false"); } else @@ -1679,6 +1679,14 @@ CommitTransaction(void) */ AtEOXact_UpdateFlatFiles(true); +#ifdef PGXC + /* + * There can be error on the data nodes. So go to data nodes before + * changing transaction state and local clean up + */ + DataNodeCommit(); +#endif + /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -1694,13 +1702,13 @@ CommitTransaction(void) latestXid = RecordTransactionCommit(); TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); + #ifdef PGXC + /* + * Now we can let GTM know about transaction commit + */ if (IS_PGXC_COORDINATOR) { - /* Make sure this committed on the DataNodes, - * if so it will just return - */ - DataNodeCommit(DestNone); CommitTranGTM(s->globalTransactionId); latestXid = s->globalTransactionId; } @@ -1712,7 +1720,7 @@ CommitTransaction(void) CommitTranGTM((GlobalTransactionId) latestXid); } #endif - + /* * Let others know about no transaction in progress by me. Note that this * must be done _before_ releasing locks we hold and _after_ @@ -1808,7 +1816,7 @@ CommitTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; -#ifdef PGXC +#ifdef PGXC if (IS_PGXC_COORDINATOR) s->globalTransactionId = InvalidGlobalTransactionId; else if (IS_PGXC_DATANODE) @@ -2142,10 +2150,10 @@ AbortTransaction(void) #ifdef PGXC if (IS_PGXC_COORDINATOR) { - /* Make sure this is rolled back on the DataNodes, - * if so it will just return + /* Make sure this is rolled back on the DataNodes, + * if so it will just return */ - DataNodeRollback(DestNone); + DataNodeRollback(); RollbackTranGTM(s->globalTransactionId); latestXid = s->globalTransactionId; } diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c index 517b1e4d78..0f4072d01e 100644 --- a/src/backend/pgxc/pool/datanode.c +++ b/src/backend/pgxc/pool/datanode.c @@ -199,6 +199,9 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum) handle->sock = sock; handle->transaction_status = 'I'; handle->state = DN_CONNECTION_STATE_IDLE; +#ifdef DN_CONNECTION_DEBUG + handle->have_row_desc = false; +#endif handle->error = NULL; handle->outEnd = 0; handle->inStart = 0; @@ -211,7 +214,7 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum) * Wait while at least one of specified connections has data available and read * the data into the buffer */ -void +int data_node_receive(const int conn_count, DataNodeHandle ** connections, struct timeval * timeout) { @@ -239,7 +242,7 @@ data_node_receive(const int conn_count, * Return if we do not have connections to receive input */ if (nfds == 0) - return; + return 0; retry: res_select = select(nfds + 1, &readfds, NULL, NULL, timeout); @@ -249,27 +252,19 @@ retry: if (errno == EINTR || errno == EAGAIN) goto retry; - /* - * PGXCTODO - we may want to close the connections and notify the - * pooler that these are invalid. - */ if (errno == EBADF) { - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("select() bad file descriptor set"))); + elog(WARNING, "select() bad file descriptor set"); } - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("select() error: %d", errno))); + elog(WARNING, "select() error: %d", errno); + return errno; } if (res_select == 0) { /* Handle timeout */ - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("timeout while waiting for response"))); + elog(WARNING, "timeout while waiting for response"); + return EOF; } /* read data */ @@ -283,10 +278,9 @@ retry: if (read_status == EOF || read_status < 0) { - /* PGXCTODO - we should notify the pooler to destroy the connections */ - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unexpected EOF on datanode connection"))); + add_error_message(conn, "unexpected EOF on datanode connection"); + elog(WARNING, "unexpected EOF on datanode connection"); + return EOF; } else { @@ -294,6 +288,7 @@ retry: } } } + return 0; } @@ -522,7 +517,7 @@ get_message(DataNodeHandle *conn, int *len, char **msg) * ensure_in_buffer_capacity() will immediately return */ ensure_in_buffer_capacity(5 + (size_t) *len, conn); - conn->state == DN_CONNECTION_STATE_QUERY; + conn->state = DN_CONNECTION_STATE_QUERY; conn->inCursor = conn->inStart; return '\0'; } @@ -539,19 +534,27 @@ void release_handles(void) { int i; + int discard[NumDataNodes]; + int ndisc = 0; if (node_count == 0) return; - PoolManagerReleaseConnections(); for (i = 0; i < NumDataNodes; i++) { DataNodeHandle *handle = &handles[i]; if (handle->sock != NO_SOCKET) + { + if (handle->state != DN_CONNECTION_STATE_IDLE) + { + elog(WARNING, "Connection to data node %d has unexpected state %d and will be dropped", handle->nodenum, handle->state); + discard[ndisc++] = handle->nodenum; + } data_node_free(handle); + } } - + PoolManagerReleaseConnections(ndisc, discard); node_count = 0; } @@ -897,7 +900,7 @@ void add_error_message(DataNodeHandle *handle, const char *message) { handle->transaction_status = 'E'; - handle->state = DN_CONNECTION_STATE_IDLE; + handle->state = DN_CONNECTION_STATE_ERROR_NOT_READY; if (handle->error) { /* PGXCTODO append */ diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index c6f9042b43..bbedef0f2f 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -24,6 +24,7 @@ #include "miscadmin.h" #include "pgxc/execRemote.h" #include "pgxc/poolmgr.h" +#include "storage/ipc.h" #include "utils/datum.h" #include "utils/memutils.h" #include "utils/tuplesort.h" @@ -40,9 +41,10 @@ static bool autocommit = true; static DataNodeHandle **write_node_list = NULL; static int write_node_count = 0; -static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid); -static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest); -static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest); +static int data_node_begin(int conn_count, DataNodeHandle ** connections, + GlobalTransactionId gxid); +static int data_node_commit(int conn_count, DataNodeHandle ** connections); +static int data_node_rollback(int conn_count, DataNodeHandle ** connections); static void clear_write_node_list(); @@ -920,7 +922,7 @@ FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot) /* * Handle responses from the Data node connections */ -static void +static int data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, struct timeval * timeout, RemoteQueryState *combiner) { @@ -940,7 +942,8 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, { int i = 0; - data_node_receive(count, to_receive, timeout); + if (data_node_receive(count, to_receive, timeout)) + return EOF; while (i < count) { int result = handle_response(to_receive[i], combiner); @@ -959,12 +962,17 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, break; default: /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Unexpected response from the data nodes, result = %d, request type %d", result, combiner->request_type))); + add_error_message(to_receive[i], "Unexpected response from the data nodes"); + elog(WARNING, "Unexpected response from the data nodes, result = %d, request type %d", result, combiner->request_type); + /* Stop tracking and move last connection in place */ + count--; + if (i < count) + to_receive[i] = to_receive[count]; } } } + + return 0; } /* @@ -990,6 +998,18 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) if (conn->state == DN_CONNECTION_STATE_QUERY) return RESPONSE_EOF; + /* + * If we are in the process of shutting down, we + * may be rolling back, and the buffer may contain other messages. + * We want to avoid a procarray exception + * as well as an error stack overflow. + */ + if (proc_exit_inprogress) + { + conn->state = DN_CONNECTION_STATE_ERROR_FATAL; + return RESPONSE_EOF; + } + /* TODO handle other possible responses */ switch (get_message(conn, &msg_len, &msg)) { @@ -1005,10 +1025,17 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) HandleCommandComplete(combiner, msg, msg_len); break; case 'T': /* RowDescription */ +#ifdef DN_CONNECTION_DEBUG + Assert(!conn->have_row_desc); + conn->have_row_desc = true; +#endif if (HandleRowDescription(combiner, msg, msg_len)) return RESPONSE_TUPDESC; break; case 'D': /* DataRow */ +#ifdef DN_CONNECTION_DEBUG + Assert(conn->have_row_desc); +#endif HandleDataRow(combiner, msg, msg_len); return RESPONSE_DATAROW; case 'G': /* CopyInResponse */ @@ -1042,6 +1069,9 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) case 'Z': /* ReadyForQuery */ conn->transaction_status = msg[0]; conn->state = DN_CONNECTION_STATE_IDLE; +#ifdef DN_CONNECTION_DEBUG + conn->have_row_desc = false; +#endif return RESPONSE_COMPLETE; case 'I': /* EmptyQuery */ default: @@ -1058,7 +1088,8 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner) * Send BEGIN command to the Data nodes and receive responses */ static int -data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid) +data_node_begin(int conn_count, DataNodeHandle ** connections, + GlobalTransactionId gxid) { int i; struct timeval *timeout = NULL; @@ -1078,7 +1109,8 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, combiner->dest = None_Receiver; /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); + if (data_node_receive_responses(conn_count, connections, timeout, combiner)) + return EOF; /* Verify status */ return ValidateAndCloseCombiner(combiner) ? 0 : EOF; @@ -1109,12 +1141,12 @@ DataNodeBegin(void) /* - * Commit current transaction, use two-phase commit if necessary + * Commit current transaction on data nodes where it has been started */ -int -DataNodeCommit(CommandDest dest) +void +DataNodeCommit(void) { - int res; + int res = 0; int tran_count; DataNodeHandle *connections[NumDataNodes]; @@ -1128,7 +1160,7 @@ DataNodeCommit(CommandDest dest) if (tran_count == 0) goto finish; - res = data_node_commit(tran_count, connections, dest); + res = data_node_commit(tran_count, connections); finish: /* In autocommit mode statistics is collected in DataNodeExec */ @@ -1138,15 +1170,19 @@ finish: release_handles(); autocommit = true; clear_write_node_list(); - return res; + if (res != 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Could not commit connection on data nodes"))); } /* - * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses + * Commit transaction on specified data node connections, use two-phase commit + * if more then on one node data have been modified during the transactioon. */ static int -data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest) +data_node_commit(int conn_count, DataNodeHandle ** connections) { int i; struct timeval *timeout = NULL; @@ -1191,13 +1227,12 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); combiner->dest = None_Receiver; /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); + if (data_node_receive_responses(conn_count, connections, timeout, combiner)) + result = EOF; /* Reset combiner */ if (!ValidateAndResetCombiner(combiner)) - { result = EOF; - } } if (!do2PC) @@ -1238,7 +1273,8 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest combiner->dest = None_Receiver; } /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); + if (data_node_receive_responses(conn_count, connections, timeout, combiner)) + result = EOF; result = ValidateAndCloseCombiner(combiner) ? result : EOF; finish: @@ -1253,7 +1289,7 @@ finish: * Rollback current transaction */ int -DataNodeRollback(CommandDest dest) +DataNodeRollback(void) { int res = 0; int tran_count; @@ -1269,7 +1305,7 @@ DataNodeRollback(CommandDest dest) if (tran_count == 0) goto finish; - res = data_node_rollback(tran_count, connections, dest); + res = data_node_rollback(tran_count, connections); finish: /* In autocommit mode statistics is collected in DataNodeExec */ @@ -1287,24 +1323,23 @@ finish: * Send ROLLBACK command down to the Data nodes and handle responses */ static int -data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest) +data_node_rollback(int conn_count, DataNodeHandle ** connections) { int i; struct timeval *timeout = NULL; - int result = 0; RemoteQueryState *combiner; /* Send ROLLBACK - */ for (i = 0; i < conn_count; i++) { - if (data_node_send_query(connections[i], "ROLLBACK")) - result = EOF; + data_node_send_query(connections[i], "ROLLBACK"); } combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); combiner->dest = None_Receiver; /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); + if (data_node_receive_responses(conn_count, connections, timeout, combiner)) + return EOF; /* Verify status */ return ValidateAndCloseCombiner(combiner) ? 0 : EOF; @@ -1404,7 +1439,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ if (new_count > 0 && need_tran) { /* Start transaction on connections where it is not started */ - if (data_node_begin(new_count, newConnections, DestNone, gxid)) + if (data_node_begin(new_count, newConnections, gxid)) { pfree(connections); pfree(copy_connections); @@ -1448,8 +1483,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ combiner->dest = None_Receiver; /* Receive responses */ - data_node_receive_responses(conn_count, connections, timeout, combiner); - if (!ValidateAndCloseCombiner(combiner)) + if (data_node_receive_responses(conn_count, connections, timeout, combiner) + || !ValidateAndCloseCombiner(combiner)) { if (autocommit) { @@ -1665,6 +1700,12 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE* ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on datanode connection"))); + else + /* + * Set proper connection status - handle_response + * has changed it to DN_CONNECTION_STATE_QUERY + */ + handle->state = DN_CONNECTION_STATE_COPY_OUT; } /* There is no more data that can be read from connection */ } @@ -1746,7 +1787,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, combiner = CreateResponseCombiner(conn_count + 1, combine_type); combiner->dest = None_Receiver; - data_node_receive_responses(1, &primary_handle, timeout, combiner); + error = data_node_receive_responses(1, &primary_handle, timeout, combiner) || error; } for (i = 0; i < conn_count; i++) @@ -1786,30 +1827,14 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, combiner = CreateResponseCombiner(conn_count, combine_type); combiner->dest = None_Receiver; } - data_node_receive_responses(conn_count, connections, timeout, combiner); + error = (data_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error; processed = combiner->row_count; if (!ValidateAndCloseCombiner(combiner) || error) - { - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else - if (!PersistentConnections) release_handles(); - } - - return 0; - } - - if (autocommit) - { - if (need_tran) - DataNodeCommit(DestNone); - else - if (!PersistentConnections) release_handles(); - } + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Error while running COPY"))); return processed; } @@ -1882,9 +1907,17 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst) else { int i; + + /* + * Data node may be sending junk columns which are always at the end, + * but it must not be shorter then result slot. + */ + Assert(dst->tts_tupleDescriptor->natts <= src->tts_tupleDescriptor->natts); ExecClearTuple(dst); slot_getallattrs(src); - /* PGXCTODO revisit: probably incorrect */ + /* + * PGXCTODO revisit: if it is correct to copy Datums using assignment? + */ for (i = 0; i < dst->tts_tupleDescriptor->natts; i++) { dst->tts_values[i] = src->tts_values[i]; @@ -1911,6 +1944,8 @@ ExecRemoteQuery(RemoteQueryState *node) EState *estate = node->ss.ps.state; TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot; TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot; + bool have_tuple = false; + if (!node->query_Done) { @@ -1974,7 +2009,7 @@ ExecRemoteQuery(RemoteQueryState *node) else need_tran = !autocommit || total_conn_count > 1; - elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); + elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, statement_need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false"); stat_statement(); if (autocommit) @@ -2052,7 +2087,7 @@ ExecRemoteQuery(RemoteQueryState *node) new_connections[new_count++] = connections[i]; if (new_count) - data_node_begin(new_count, new_connections, DestNone, gxid); + data_node_begin(new_count, new_connections, gxid); } /* See if we have a primary nodes, execute on it first before the others */ @@ -2088,36 +2123,22 @@ ExecRemoteQuery(RemoteQueryState *node) while (node->command_complete_count < 1) { - PG_TRY(); - { - data_node_receive(1, primaryconnection, NULL); - while (handle_response(primaryconnection[0], node) == RESPONSE_EOF) - data_node_receive(1, primaryconnection, NULL); - if (node->errorMessage) - { - char *code = node->errorCode; + if (data_node_receive(1, primaryconnection, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); + while (handle_response(primaryconnection[0], node) == RESPONSE_EOF) + if (data_node_receive(1, primaryconnection, NULL)) ereport(ERROR, - (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), - errmsg("%s", node->errorMessage))); - } - } - /* If we got an error response return immediately */ - PG_CATCH(); + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); + if (node->errorMessage) { - /* We are going to exit, so release combiner */ - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - - pfree(primaryconnection); - pfree(connections); - PG_RE_THROW(); + char *code = node->errorCode; + ereport(ERROR, + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", node->errorMessage))); } - PG_END_TRY(); } pfree(primaryconnection); } @@ -2148,8 +2169,6 @@ ExecRemoteQuery(RemoteQueryState *node) } } - PG_TRY(); - { /* * Stop if all commands are completed or we got a data row and * initialized state node for subsequent invocations @@ -2158,7 +2177,10 @@ ExecRemoteQuery(RemoteQueryState *node) { int i = 0; - data_node_receive(regular_conn_count, connections, NULL); + if (data_node_receive(regular_conn_count, connections, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); /* * Handle input from the data nodes. * If we got a RESPONSE_DATAROW we can break handling to wrap @@ -2234,185 +2256,148 @@ ExecRemoteQuery(RemoteQueryState *node) } } } - } - /* If we got an error response return immediately */ - PG_CATCH(); - { - /* We are going to exit, so release combiner */ - if (autocommit) - { - if (need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - PG_RE_THROW(); - } - PG_END_TRY(); + node->query_Done = true; - node->need_tran = need_tran; } - PG_TRY(); + if (node->tuplesortstate) { - bool have_tuple = false; - - if (node->tuplesortstate) + while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate, + true, scanslot)) { - while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate, - true, scanslot)) + have_tuple = true; + /* + * If DISTINCT is specified and current tuple matches to + * previous skip it and get next one. + * Othervise return current tuple + */ + if (step->distinct) { - have_tuple = true; /* - * If DISTINCT is specified and current tuple matches to - * previous skip it and get next one. - * Othervise return current tuple + * Always receive very first tuple and + * skip to next if scan slot match to previous (result slot) */ - if (step->distinct) + if (!TupIsNull(resultslot) && + execTuplesMatch(scanslot, + resultslot, + step->distinct->numCols, + step->distinct->uniqColIdx, + node->eqfunctions, + node->tmp_ctx)) { - /* - * Always receive very first tuple and - * skip to next if scan slot match to previous (result slot) - */ - if (!TupIsNull(resultslot) && - execTuplesMatch(scanslot, - resultslot, - step->distinct->numCols, - step->distinct->uniqColIdx, - node->eqfunctions, - node->tmp_ctx)) - { - have_tuple = false; - continue; - } + have_tuple = false; + continue; } - copy_slot(node, scanslot, resultslot); - (*node->dest->receiveSlot) (resultslot, node->dest); - break; } - if (!have_tuple) - ExecClearTuple(resultslot); + copy_slot(node, scanslot, resultslot); + (*node->dest->receiveSlot) (resultslot, node->dest); + break; } - else + if (!have_tuple) + ExecClearTuple(resultslot); + } + else + { + while (node->conn_count > 0 && !have_tuple) { - while (node->conn_count > 0 && !have_tuple) - { - int i; + int i; - /* - * If combiner already has tuple go ahead and return it - * otherwise tuple will be cleared - */ - if (FetchTuple(node, scanslot) && !TupIsNull(scanslot)) + /* + * If combiner already has tuple go ahead and return it + * otherwise tuple will be cleared + */ + if (FetchTuple(node, scanslot) && !TupIsNull(scanslot)) + { + if (node->simple_aggregates) { - if (node->simple_aggregates) - { - /* - * Advance aggregate functions and allow to read up next - * data row message and get tuple in the same slot on - * next iteration - */ - exec_simple_aggregates(node, scanslot); - } - else - { - /* - * Receive current slot and read up next data row - * message before exiting the loop. Next time when this - * function is invoked we will have either data row - * message ready or EOF - */ - copy_slot(node, scanslot, resultslot); - (*node->dest->receiveSlot) (resultslot, node->dest); - have_tuple = true; - } + /* + * Advance aggregate functions and allow to read up next + * data row message and get tuple in the same slot on + * next iteration + */ + exec_simple_aggregates(node, scanslot); } - - /* - * Handle input to get next row or ensure command is completed, - * starting from connection next after current. If connection - * does not - */ - if ((i = node->current_conn + 1) == node->conn_count) - i = 0; - - for (;;) + else { - int res = handle_response(node->connections[i], node); - if (res == RESPONSE_EOF) - { - /* go to next connection */ - if (++i == node->conn_count) - i = 0; - /* if we cycled over all connections we need to receive more */ - if (i == node->current_conn) - data_node_receive(node->conn_count, node->connections, NULL); - } - else if (res == RESPONSE_COMPLETE) - { - if (--node->conn_count == 0) - break; - if (i == node->conn_count) - i = 0; - else - node->connections[i] = node->connections[node->conn_count]; - if (node->current_conn == node->conn_count) - node->current_conn = i; - } - else if (res == RESPONSE_DATAROW) - { - node->current_conn = i; - break; - } + /* + * Receive current slot and read up next data row + * message before exiting the loop. Next time when this + * function is invoked we will have either data row + * message ready or EOF + */ + copy_slot(node, scanslot, resultslot); + (*node->dest->receiveSlot) (resultslot, node->dest); + have_tuple = true; } } /* - * We may need to finalize aggregates + * Handle input to get next row or ensure command is completed, + * starting from connection next after current. If connection + * does not */ - if (!have_tuple && node->simple_aggregates) + if ((i = node->current_conn + 1) == node->conn_count) + i = 0; + + for (;;) { - finish_simple_aggregates(node, resultslot); - if (!TupIsNull(resultslot)) + int res = handle_response(node->connections[i], node); + if (res == RESPONSE_EOF) { - (*node->dest->receiveSlot) (resultslot, node->dest); - have_tuple = true; + /* go to next connection */ + if (++i == node->conn_count) + i = 0; + /* if we cycled over all connections we need to receive more */ + if (i == node->current_conn) + if (data_node_receive(node->conn_count, node->connections, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to read response from data nodes"))); + } + else if (res == RESPONSE_COMPLETE) + { + if (--node->conn_count == 0) + break; + if (i == node->conn_count) + i = 0; + else + node->connections[i] = node->connections[node->conn_count]; + if (node->current_conn == node->conn_count) + node->current_conn = i; + } + else if (res == RESPONSE_DATAROW) + { + node->current_conn = i; + break; } } - - if (!have_tuple) /* report end of scan */ - ExecClearTuple(resultslot); - } - if (node->errorMessage) + /* + * We may need to finalize aggregates + */ + if (!have_tuple && node->simple_aggregates) { - char *code = node->errorCode; - ereport(ERROR, - (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), - errmsg("%s", node->errorMessage))); + finish_simple_aggregates(node, resultslot); + if (!TupIsNull(resultslot)) + { + (*node->dest->receiveSlot) (resultslot, node->dest); + have_tuple = true; + } } - /* - * If command is completed we should commit work. - */ - if (node->conn_count == 0 && autocommit && node->need_tran) - DataNodeCommit(DestNone); + if (!have_tuple) /* report end of scan */ + ExecClearTuple(resultslot); + } - /* If we got an error response return immediately */ - PG_CATCH(); + + if (node->errorMessage) { - /* We are going to exit, so release combiner */ - if (autocommit) - { - if (node->need_tran) - DataNodeRollback(DestNone); - else if (!PersistentConnections) - release_handles(); - } - PG_RE_THROW(); + char *code = node->errorCode; + ereport(ERROR, + (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])), + errmsg("%s", node->errorMessage))); } - PG_END_TRY(); return resultslot; } @@ -2436,7 +2421,7 @@ DataNodeCleanAndRelease(int code, Datum arg) /* Rollback on Data Nodes */ if (IsTransactionState()) { - DataNodeRollback(DestNone); + DataNodeRollback(); /* Rollback on GTM if transaction id opened. */ RollbackTranGTM((GlobalTransactionId) GetCurrentTransactionIdIfAny()); diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c index 4625261743..7e4771cd71 100644 --- a/src/backend/pgxc/pool/poolcomm.c +++ b/src/backend/pgxc/pool/poolcomm.c @@ -22,7 +22,9 @@ #include <errno.h> #include <stddef.h> #include "c.h" +#include "postgres.h" #include "pgxc/poolcomm.h" +#include "storage/ipc.h" #include "utils/elog.h" #include "miscadmin.h" @@ -408,9 +410,16 @@ pool_flush(PoolPort *port) if (errno != last_reported_send_errno) { last_reported_send_errno = errno; - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("could not send data to client: %m"))); + + /* + * Handle a seg fault that may later occur in proc array + * when this fails when we are already shutting down + * If shutting down already, do not call. + */ + if (!proc_exit_inprogress) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("could not send data to client: %m"))); } /* diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c index 6427da391c..dbb8aed3d4 100644 --- a/src/backend/pgxc/pool/poolmgr.c +++ b/src/backend/pgxc/pool/poolmgr.c @@ -93,7 +93,7 @@ static DatabasePool *find_database_pool(const char *database); static DatabasePool *remove_database_pool(const char *database); static int *agent_acquire_connections(PoolAgent *agent, List *nodelist); static DataNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node); -static void agent_release_connections(PoolAgent *agent, bool clean); +static void agent_release_connections(PoolAgent *agent, List *discard); static void release_connection(DatabasePool *dbPool, DataNodePoolSlot *slot, int index, bool clean); static void destroy_slot(DataNodePoolSlot *slot); static void grow_pool(DatabasePool *dbPool, int index); @@ -587,7 +587,7 @@ agent_init(PoolAgent *agent, const char *database, List *nodes) /* disconnect if we still connected */ if (agent->pool) - agent_release_connections(agent, false); + agent_release_connections(agent, NULL); /* find database */ agent->pool = find_database_pool(database); @@ -612,7 +612,7 @@ agent_destroy(PoolAgent *agent) /* Discard connections if any remaining */ if (agent->pool) - agent_release_connections(agent, false); + agent_release_connections(agent, NULL); /* find agent in the list */ for (i = 0; i < agentCount; i++) @@ -700,11 +700,6 @@ static void agent_handle_input(PoolAgent * agent, StringInfo s) { int qtype; - const char *database; - int nodecount; - List *nodelist = NIL; - int *fds; - int i; qtype = pool_getbyte(&agent->port); /* @@ -712,6 +707,12 @@ agent_handle_input(PoolAgent * agent, StringInfo s) */ for (;;) { + const char *database; + int nodecount; + List *nodelist = NIL; + int *fds; + int i; + switch (qtype) { case 'c': /* CONNECT */ @@ -729,9 +730,7 @@ agent_handle_input(PoolAgent * agent, StringInfo s) pool_getmessage(&agent->port, s, 4 * NumDataNodes + 8); nodecount = pq_getmsgint(s, 4); for (i = 0; i < nodecount; i++) - { nodelist = lappend_int(nodelist, pq_getmsgint(s, 4)); - } pq_getmsgend(s); /* * In case of error agent_acquire_connections will log @@ -744,9 +743,13 @@ agent_handle_input(PoolAgent * agent, StringInfo s) pfree(fds); break; case 'r': /* RELEASE CONNECTIONS */ - pool_getmessage(&agent->port, s, 4); + pool_getmessage(&agent->port, s, 4 * NumDataNodes + 8); + nodecount = pq_getmsgint(s, 4); + for (i = 0; i < nodecount; i++) + nodelist = lappend_int(nodelist, pq_getmsgint(s, 4)); pq_getmsgend(s); - agent_release_connections(agent, true); + agent_release_connections(agent, nodelist); + list_free(nodelist); break; default: /* EOF or protocol violation */ agent_destroy(agent); @@ -831,11 +834,24 @@ agent_acquire_connections(PoolAgent *agent, List *nodelist) * Retun connections back to the pool */ void -PoolManagerReleaseConnections(void) +PoolManagerReleaseConnections(int ndisc, int* discard) { + uint32 n32; + uint32 buf[1 + ndisc]; + int i; + Assert(Handle); - pool_putmessage(&Handle->port, 'r', NULL, 0); + n32 = htonl((uint32) ndisc); + buf[0] = n32; + + for (i = 0; i < ndisc;) + { + n32 = htonl((uint32) discard[i++]); + buf[i] = n32; + } + pool_putmessage(&Handle->port, 'r', (char *) buf, + (1 + ndisc) * sizeof(uint32)); pool_flush(&Handle->port); } @@ -844,23 +860,40 @@ PoolManagerReleaseConnections(void) * Release connections */ static void -agent_release_connections(PoolAgent *agent, bool clean) +agent_release_connections(PoolAgent *agent, List *discard) { int i; + DataNodePoolSlot *slot; + if (!agent->connections) return; - /* Enumerate connections */ - for (i = 0; i < NumDataNodes; i++) + if (discard) { - DataNodePoolSlot *slot; + ListCell *lc; + foreach(lc, discard) + { + int node = lfirst_int(lc); + Assert(node > 0 && node <= NumDataNodes); + slot = agent->connections[node - 1]; + + /* Discard connection */ + if (slot) + release_connection(agent->pool, slot, node - 1, false); + agent->connections[node - 1] = NULL; + } + } + + /* Remaining connections are assumed to be clean */ + for (i = 0; i < NumDataNodes; i++) + { slot = agent->connections[i]; /* Release connection */ if (slot) - release_connection(agent->pool, slot, i, clean); + release_connection(agent->pool, slot, i, true); agent->connections[i] = NULL; } } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 24cfe42319..e59c86920e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4393,11 +4393,11 @@ pgxc_transaction_stmt (Node *parsetree) break; case TRANS_STMT_COMMIT: - DataNodeCommit(DestNone); + DataNodeCommit(); break; case TRANS_STMT_ROLLBACK: - DataNodeRollback(DestNone); + DataNodeRollback(); break; default: diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index c506ce064f..e60fdd9e02 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -2875,12 +2875,16 @@ reversedirection_heap(Tuplesortstate *state) static unsigned int getlen_datanode(Tuplesortstate *state, int tapenum, bool eofOK) { + DataNodeHandle *conn = state->combiner->connections[tapenum]; for (;;) { - switch (handle_response(state->combiner->connections[tapenum], state->combiner)) + switch (handle_response(conn, state->combiner)) { case RESPONSE_EOF: - data_node_receive(1, state->combiner->connections + tapenum, NULL); + if (data_node_receive(1, &conn, NULL)) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg(conn->error))); break; case RESPONSE_COMPLETE: if (eofOK) diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h index ab95022b5f..849d84acea 100644 --- a/src/include/pgxc/datanode.h +++ b/src/include/pgxc/datanode.h @@ -50,6 +50,9 @@ struct data_node_handle /* Connection state */ char transaction_status; DNConnectionState state; +#ifdef DN_CONNECTION_DEBUG + bool have_row_desc; +#endif char *error; /* Output buffer */ char *outBuffer; @@ -86,7 +89,7 @@ extern int data_node_send_query(DataNodeHandle * handle, const char *query); extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid); extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot); -extern void data_node_receive(const int conn_count, +extern int data_node_receive(const int conn_count, DataNodeHandle ** connections, struct timeval * timeout); extern int data_node_read_data(DataNodeHandle * conn); extern int send_some(DataNodeHandle * handle, int len); diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index d99806a01b..e9b59ccbd9 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -62,7 +62,6 @@ typedef struct RemoteQueryState char errorCode[5]; /* error code to send back to client */ char *errorMessage; /* error message to send back to client */ bool query_Done; /* query has been sent down to data nodes */ - bool need_tran; /* auto commit on nodes after completion */ char *completionTag; /* completion tag to present to caller */ char *msg; /* last data row message */ int msglen; /* length of the data row message */ @@ -81,8 +80,8 @@ typedef struct RemoteQueryState /* Multinode Executor */ extern void DataNodeBegin(void); -extern int DataNodeCommit(CommandDest dest); -extern int DataNodeRollback(CommandDest dest); +extern void DataNodeCommit(void); +extern int DataNodeRollback(void); extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from); extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections); diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h index 2c9128e7c7..b7ac3aedf0 100644 --- a/src/include/pgxc/poolmgr.h +++ b/src/include/pgxc/poolmgr.h @@ -45,7 +45,7 @@ typedef struct char *connstr; int freeSize; /* available connections */ int size; /* total pool size */ - DataNodePoolSlot **slot; + DataNodePoolSlot **slot; } DataNodePool; /* All pools for specified database */ @@ -57,7 +57,7 @@ typedef struct databasepool struct databasepool *next; } DatabasePool; -/* Agent of client session (Pool Manager side) +/* Agent of client session (Pool Manager side) * Acts as a session manager, grouping connections together */ typedef struct @@ -125,6 +125,6 @@ extern void PoolManagerConnect(PoolHandle *handle, const char *database); extern int *PoolManagerGetConnections(List *nodelist); /* Retun connections back to the pool */ -extern void PoolManagerReleaseConnections(void); +extern void PoolManagerReleaseConnections(int ndisc, int* discard); #endif |
