diff options
author | Abbas | 2011-06-24 17:59:57 +0000 |
---|---|---|
committer | Abbas | 2011-06-24 17:59:57 +0000 |
commit | ff7be6e332b36fc7aad99876bf107e258264a7f1 (patch) | |
tree | e72854c2ad18eade7812ab2146c54512e0a31617 /src | |
parent | d56caa5e2ac517b83595586987794337c9dea357 (diff) |
This patch adds a system in XC to cancel a running query and to flush from the network buffers any result data the data nodes might have sent before the query was cancelled.
This was required to fix certain issues where coordinator encounters an error while processing rows from data nodes and quits row processing.
It then issues a new query and finds an old row description in the network buffer. This could crash — and in fact was crashing — the server.
To cancel a query a new pooler command 'h' is added.
This command is sent to the pooler by the coordinator and the pooler issues PQcancel to the respective data nodes.
A cancel request is sent every time the coordinator raises an error of level ERROR or higher.
This commit fixes bug 3306801
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/common/heaptuple.c | 7 | ||||
-rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 159 | ||||
-rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 218 | ||||
-rw-r--r-- | src/backend/pgxc/pool/poolcomm.c | 4 | ||||
-rw-r--r-- | src/backend/pgxc/pool/poolmgr.c | 154 | ||||
-rw-r--r-- | src/backend/utils/error/elog.c | 18 | ||||
-rw-r--r-- | src/include/access/xact.h | 1 | ||||
-rw-r--r-- | src/include/pgxc/execRemote.h | 7 | ||||
-rw-r--r-- | src/include/pgxc/pgxcnode.h | 11 | ||||
-rw-r--r-- | src/include/pgxc/poolmgr.h | 4 | ||||
-rw-r--r-- | src/test/regress/expected/domain_1.out | 9 | ||||
-rw-r--r-- | src/test/regress/expected/xc_distkey.out | 52 | ||||
-rw-r--r-- | src/test/regress/serial_schedule | 2 |
13 files changed, 513 insertions, 133 deletions
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c index c0103b811e..d34f00261a 100644 --- a/src/backend/access/common/heaptuple.c +++ b/src/backend/access/common/heaptuple.c @@ -1130,7 +1130,7 @@ slot_deform_tuple(TupleTableSlot *slot, int natts) static void slot_deform_datarow(TupleTableSlot *slot) { - int attnum = slot->tts_tupleDescriptor->natts; + int attnum; int i; int col_count; char *cur = slot->tts_dataRow; @@ -1138,6 +1138,11 @@ slot_deform_datarow(TupleTableSlot *slot) uint16 n16; uint32 n32; + if (slot->tts_tupleDescriptor == NULL || slot->tts_dataRow == NULL) + return; + + attnum = slot->tts_tupleDescriptor->natts; + /* fastpath: exit if values already extracted */ if (slot->tts_nvalid == attnum) return; diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index ea304533aa..2f77f5e014 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -1195,8 +1195,8 @@ pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections, int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) { - char *msg; - int msg_len; + char *msg; + int msg_len; char msg_type; bool suspended = false; @@ -1327,6 +1327,64 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) /* + * Has the data node sent Ready For Query + */ + +bool +is_data_node_ready(PGXCNodeHandle * conn) +{ + char *msg; + int msg_len; + char msg_type; + bool suspended = false; + + for (;;) + { + /* + * If we are in the process of shutting down, we + * may be rolling back, and the buffer may contain other messages. + * We want to avoid a procarray exception + * as well as an error stack overflow. 
+ */ + if (proc_exit_inprogress) + conn->state = DN_CONNECTION_STATE_ERROR_FATAL; + + /* don't read from from the connection if there is a fatal error */ + if (conn->state == DN_CONNECTION_STATE_ERROR_FATAL) + return true; + + /* No data available, exit */ + if (!HAS_MESSAGE_BUFFERED(conn)) + return false; + + msg_type = get_message(conn, &msg_len, &msg); + switch (msg_type) + { + case 's': /* PortalSuspended */ + suspended = true; + break; + + case 'Z': /* ReadyForQuery */ + { + /* + * Return result depends on previous connection state. + * If it was PORTAL_SUSPENDED coordinator want to send down + * another EXECUTE to fetch more rows, otherwise it is done + * with the connection + */ + int result = suspended ? RESPONSE_SUSPENDED : RESPONSE_COMPLETE; + conn->transaction_status = msg[0]; + conn->state = DN_CONNECTION_STATE_IDLE; + conn->combiner = NULL; + return true; + } + } + } + /* never happen, but keep compiler quiet */ + return false; +} + +/* * Send BEGIN command to the Datanodes or Coordinators and receive responses */ static int @@ -2453,7 +2511,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** if (bytes_needed > COPY_BUFFER_SIZE) { /* First look if data node has sent a error message */ - int read_status = pgxc_node_read_data(primary_handle); + int read_status = pgxc_node_read_data(primary_handle, true); if (read_status == EOF || read_status < 0) { add_error_message(primary_handle, "failed to read data from data node"); @@ -2514,7 +2572,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** int to_send = handle->outEnd; /* First look if data node has sent a error message */ - int read_status = pgxc_node_read_data(handle); + int read_status = pgxc_node_read_data(handle, true); if (read_status == EOF || read_status < 0) { add_error_message(handle, "failed to read data from data node"); @@ -2615,7 +2673,7 @@ DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE* if 
(handle_response(handle,combiner) == RESPONSE_EOF) { /* read some extra-data */ - read_status = pgxc_node_read_data(handle); + read_status = pgxc_node_read_data(handle, true); if (read_status < 0) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -2679,30 +2737,9 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, if (primary_handle) { + error = true; if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - /* msgType + msgLen */ - if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - primary_handle->outBuffer[primary_handle->outEnd++] = 'c'; - memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); - primary_handle->outEnd += 4; - - /* We need response right away, so send immediately */ - if (pgxc_node_flush(primary_handle) < 0) - { - error = true; - } - } - else - { - error = true; - } + error = DataNodeCopyEnd(primary_handle, false); combiner = CreateResponseCombiner(conn_count + 1, combine_type); error = (pgxc_node_receive_responses(1, &primary_handle, timeout, combiner) != 0) || error; @@ -2712,30 +2749,9 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, { PGXCNodeHandle *handle = connections[i]; + error = true; if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - /* msgType + msgLen */ - if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - handle->outBuffer[handle->outEnd++] = 'c'; - memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); - handle->outEnd += 4; - - /* We need response right away, so send immediately */ - if (pgxc_node_flush(handle) < 0) - { - error = true; - } - } - else - { - error = true; - } + error = 
DataNodeCopyEnd(handle, false); } need_tran = !autocommit || primary_handle || conn_count > 1; @@ -2750,6 +2766,36 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, errmsg("Error while running COPY"))); } +/* + * End copy process on a connection + */ +bool +DataNodeCopyEnd(PGXCNodeHandle *handle, bool is_error) +{ + int nLen = htonl(4); + + if (handle == NULL) + return true; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) + return true; + + if (is_error) + handle->outBuffer[handle->outEnd++] = 'f'; + else + handle->outBuffer[handle->outEnd++] = 'c'; + + memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); + handle->outEnd += 4; + + /* We need response right away, so send immediately */ + if (pgxc_node_flush(handle) < 0) + return true; + + return false; +} + RemoteQueryState * ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) { @@ -3296,7 +3342,9 @@ do_query(RemoteQueryState *node) while (true) { int res; - pgxc_node_receive(1, &primaryconnection, NULL); + if (pgxc_node_receive(1, &primaryconnection, NULL)) + break; + res = handle_response(primaryconnection, node); if (res == RESPONSE_COMPLETE) break; @@ -4248,7 +4296,8 @@ ExecRemoteUtility(RemoteQuery *node) { int i = 0; - pgxc_node_receive(dn_conn_count, pgxc_connections->datanode_handles, NULL); + if (pgxc_node_receive(dn_conn_count, pgxc_connections->datanode_handles, NULL)) + break; /* * Handle input from the data nodes. 
* We do not expect data nodes returning tuples when running utility @@ -4296,7 +4345,9 @@ ExecRemoteUtility(RemoteQuery *node) { int i = 0; - pgxc_node_receive(co_conn_count, pgxc_connections->coord_handles, NULL); + if (pgxc_node_receive(co_conn_count, pgxc_connections->coord_handles, NULL)) + break; + while (i < co_conn_count) { int res = handle_response(pgxc_connections->coord_handles[i], remotestate); diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index a0b8da48d8..a2e90cecd1 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -20,6 +20,7 @@ #include <sys/select.h> #include <sys/time.h> #include <sys/types.h> +#include <sys/ioctl.h> #include <stdlib.h> #include <string.h> #include <unistd.h> @@ -279,21 +280,35 @@ pgxc_node_init(PGXCNodeHandle *handle, int sock, int nodenum) * Wait while at least one of specified connections has data available and read * the data into the buffer */ -int +bool pgxc_node_receive(const int conn_count, PGXCNodeHandle ** connections, struct timeval * timeout) { +#define ERROR_OCCURED true +#define NO_ERROR_OCCURED false int i, res_select, nfds = 0; - fd_set readfds; + fd_set readfds; + bool is_msg_buffered; FD_ZERO(&readfds); + + is_msg_buffered = false; + for (i = 0; i < conn_count; i++) + { + /* If connection has a buffered message */ + if (HAS_MESSAGE_BUFFERED(connections[i])) + { + is_msg_buffered = true; + break; + } + } + for (i = 0; i < conn_count; i++) { /* If connection finished sending do not wait input from it */ - if (connections[i]->state == DN_CONNECTION_STATE_IDLE - || HAS_MESSAGE_BUFFERED(connections[i])) + if (connections[i]->state == DN_CONNECTION_STATE_IDLE || HAS_MESSAGE_BUFFERED(connections[i])) continue; /* prepare select params */ @@ -313,7 +328,11 @@ pgxc_node_receive(const int conn_count, * Return if we do not have connections to receive input */ if (nfds == 0) - return 0; + { + if (is_msg_buffered) + return NO_ERROR_OCCURED; + return 
ERROR_OCCURED; + } retry: res_select = select(nfds + 1, &readfds, NULL, NULL, timeout); @@ -328,14 +347,16 @@ retry: elog(WARNING, "select() bad file descriptor set"); } elog(WARNING, "select() error: %d", errno); - return errno; + if (errno) + return ERROR_OCCURED; + return NO_ERROR_OCCURED; } if (res_select == 0) { /* Handle timeout */ elog(WARNING, "timeout while waiting for response"); - return EOF; + return ERROR_OCCURED; } /* read data */ @@ -345,7 +366,7 @@ retry: if (FD_ISSET(conn->sock, &readfds)) { - int read_status = pgxc_node_read_data(conn); + int read_status = pgxc_node_read_data(conn, true); if (read_status == EOF || read_status < 0) { @@ -354,26 +375,46 @@ retry: add_error_message(conn, "unexpected EOF on datanode connection"); elog(WARNING, "unexpected EOF on datanode connection"); /* Should we read from the other connections before returning? */ - return EOF; + return ERROR_OCCURED; } } } - return 0; + return NO_ERROR_OCCURED; } +/* + * Is there any data enqueued in the TCP input buffer waiting + * to be read sent by the PGXC node connection + */ + +int +pgxc_node_is_data_enqueued(PGXCNodeHandle *conn) +{ + int ret; + int enqueued; + + if (conn->sock < 0) + return 0; + ret = ioctl(conn->sock, FIONREAD, &enqueued); + if (ret != 0) + return 0; + + return enqueued; +} /* * Read up incoming messages from the PGXC node connection */ int -pgxc_node_read_data(PGXCNodeHandle *conn) +pgxc_node_read_data(PGXCNodeHandle *conn, bool close_if_error) { int someread = 0; int nread; if (conn->sock < 0) { - add_error_message(conn, "bad socket"); + if (close_if_error) + add_error_message(conn, "bad socket"); return EOF; } @@ -412,7 +453,8 @@ pgxc_node_read_data(PGXCNodeHandle *conn) */ if (conn->inSize - conn->inEnd < 100) { - add_error_message(conn, "can not allocate buffer"); + if (close_if_error) + add_error_message(conn, "can not allocate buffer"); return -1; } } @@ -424,7 +466,8 @@ retry: if (nread < 0) { - elog(DEBUG1, "dnrd errno = %d", errno); + if 
(close_if_error) + elog(DEBUG1, "dnrd errno = %d", errno); if (errno == EINTR) goto retry; /* Some systems return EAGAIN/EWOULDBLOCK for no data */ @@ -444,19 +487,22 @@ retry: * OK, we are getting a zero read even though select() says ready. This * means the connection has been closed. Cope. */ - add_error_message(conn, - "data node closed the connection unexpectedly\n" - "\tThis probably means the data node terminated abnormally\n" - "\tbefore or while processing the request.\n"); - conn->state = DN_CONNECTION_STATE_ERROR_FATAL; /* No more connection to - * backend */ - closesocket(conn->sock); - conn->sock = NO_SOCKET; - + if (close_if_error) + { + add_error_message(conn, + "data node closed the connection unexpectedly\n" + "\tThis probably means the data node terminated abnormally\n" + "\tbefore or while processing the request.\n"); + conn->state = DN_CONNECTION_STATE_ERROR_FATAL; /* No more connection to + * backend */ + closesocket(conn->sock); + conn->sock = NO_SOCKET; + } return -1; } #endif - add_error_message(conn, "could not receive data from server"); + if (close_if_error) + add_error_message(conn, "could not receive data from server"); return -1; } @@ -488,7 +534,8 @@ retry: if (nread == 0) { - elog(DEBUG1, "nread returned 0"); + if (close_if_error) + elog(DEBUG1, "nread returned 0"); return EOF; } @@ -661,6 +708,102 @@ release_handles(void) coord_count = 0; } +/* + * cancel a running query due to error while processing rows + */ +void +cancel_query(void) +{ + int i; + int dn_cancel[NumDataNodes]; + int co_cancel[NumCoords]; + int dn_count = 0; + int co_count = 0; + + if (datanode_count == 0 && coord_count == 0) + return; + + /* Collect Data Nodes handles */ + for (i = 0; i < NumDataNodes; i++) + { + PGXCNodeHandle *handle = &dn_handles[i]; + + if (handle->sock != NO_SOCKET) + { + if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + DataNodeCopyEnd(handle, true); + } + else + { + if (handle->state != 
DN_CONNECTION_STATE_IDLE) + { + dn_cancel[dn_count++] = handle->nodenum; + } + } + } + } + + /* Collect Coordinator handles */ + for (i = 0; i < NumCoords; i++) + { + PGXCNodeHandle *handle = &co_handles[i]; + + if (handle->sock != NO_SOCKET) + { + if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + DataNodeCopyEnd(handle, true); + } + else + { + if (handle->state != DN_CONNECTION_STATE_IDLE) + { + co_cancel[dn_count++] = handle->nodenum; + } + } + } + } + + PoolManagerCancelQuery(dn_count, dn_cancel, co_count, co_cancel); +} + +/* + * This method won't return until all network buffers are empty + * To ensure all data in all network buffers is read and wasted + */ +void +clear_all_data(void) +{ + int i; + + if (datanode_count == 0 && coord_count == 0) + return; + + /* Collect Data Nodes handles */ + for (i = 0; i < NumDataNodes; i++) + { + PGXCNodeHandle *handle = &dn_handles[i]; + + if (handle->sock != NO_SOCKET && handle->state != DN_CONNECTION_STATE_IDLE) + { + pgxc_node_flush_read(handle); + handle->state = DN_CONNECTION_STATE_IDLE; + } + } + + /* Collect Coordinator handles */ + for (i = 0; i < NumCoords; i++) + { + PGXCNodeHandle *handle = &co_handles[i]; + + if (handle->sock != NO_SOCKET && handle->state != DN_CONNECTION_STATE_IDLE) + { + pgxc_node_flush_read(handle); + handle->state = DN_CONNECTION_STATE_IDLE; + } + } +} /* * Ensure specified amount of data can fit to the incoming buffer and @@ -1224,6 +1367,31 @@ pgxc_node_flush(PGXCNodeHandle *handle) } /* + * This method won't return until network buffer is empty or error occurs + * To ensure all data in network buffers is read and wasted + */ +void +pgxc_node_flush_read(PGXCNodeHandle *handle) +{ + bool is_ready; + int read_result; + + if (handle == NULL) + return; + + while(true) + { + is_ready = is_data_node_ready(handle); + if (is_ready == true) + break; + + read_result = pgxc_node_read_data(handle, false); + if (read_result < 0) + break; + } +} + 
+/* * Send specified statement down to the PGXC node */ int diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c index 79a377660a..22dc813e45 100644 --- a/src/backend/pgxc/pool/poolcomm.c +++ b/src/backend/pgxc/pool/poolcomm.c @@ -435,9 +435,7 @@ pool_flush(PoolPort *port) * If shutting down already, do not call. */ if (!proc_exit_inprogress) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("could not send data to client: %m"))); + return 0; } /* diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c index 1b2c4bfcf2..463bd5a170 100644 --- a/src/backend/pgxc/pool/poolmgr.c +++ b/src/backend/pgxc/pool/poolmgr.c @@ -104,6 +104,7 @@ static DatabasePool *find_database_pool_to_clean(const char *database, List *co_list); static DatabasePool *remove_database_pool(const char *database, const char *user_name); static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist); +static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist); static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node, char client_conn_type); static void agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard); static void agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list); @@ -878,17 +879,17 @@ agent_handle_input(PoolAgent * agent, StringInfo s) */ for (;;) { - const char *database = NULL; - const char *user_name = NULL; - const char *set_command; + const char *database = NULL; + const char *user_name = NULL; + const char *set_command; bool is_local; - int datanodecount; - int coordcount; - List *datanodelist = NIL; - List *coordlist = NIL; - int *fds; - int *pids; - int i, len, res; + int datanodecount; + int coordcount; + List *datanodelist = NIL; + List *coordlist = NIL; + int *fds; + int *pids; + int i, len, res; /* * During a pool cleaning, Abort, Connect and Get Connections messages @@ -1001,6 +1002,32 @@ 
agent_handle_input(PoolAgent * agent, StringInfo s) if (fds) pfree(fds); break; + + case 'h': /* Cancel SQL Command in progress on specified connections */ + /* + * Length of message is caused by: + * - Message header = 4bytes + * - List of datanodes = NumDataNodes * 4bytes (max) + * - List of coordinators = NumCoords * 4bytes (max) + * - Number of Datanodes sent = 4bytes + * - Number of Coordinators sent = 4bytes + */ + pool_getmessage(&agent->port, s, 4 * NumDataNodes + 4 * NumCoords + 12); + datanodecount = pq_getmsgint(s, 4); + for (i = 0; i < datanodecount; i++) + datanodelist = lappend_int(datanodelist, pq_getmsgint(s, 4)); + coordcount = pq_getmsgint(s, 4); + /* It is possible that no Coordinators are involved in the transaction */ + for (i = 0; i < coordcount; i++) + coordlist = lappend_int(coordlist, pq_getmsgint(s, 4)); + pq_getmsgend(s); + + cancel_query_on_connections(agent, datanodelist, coordlist); + list_free(datanodelist); + list_free(coordlist); + + break; + case 'r': /* RELEASE CONNECTIONS */ pool_getmessage(&agent->port, s, 4 * NumDataNodes + 4 * NumCoords + 12); datanodecount = pq_getmsgint(s, 4); @@ -1245,6 +1272,61 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist) return result; } +/* + * Cancel query + */ +static int +cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist) +{ + int i; + ListCell *nodelist_item; + char errbuf[256]; + int nCount; + bool bRet; + + nCount = 0; + + if (agent == NULL) + return nCount; + + /* Send cancel on Data nodes first */ + foreach(nodelist_item, datanodelist) + { + int node = lfirst_int(nodelist_item); + + if(node <= 0 || node > NumDataNodes) + continue; + + if (agent->dn_connections == NULL) + break; + + bRet = PQcancel((PGcancel *) agent->dn_connections[node - 1]->xc_cancelConn, errbuf, sizeof(errbuf)); + if (bRet != false) + { + nCount++; + } + } + + /* Send cancel to Coordinators too, e.g. 
if DDL was in progress */ + foreach(nodelist_item, coordlist) + { + int node = lfirst_int(nodelist_item); + + if(node <= 0 || node > NumDataNodes) + continue; + + if (agent->coord_connections == NULL) + break; + + bRet = PQcancel((PGcancel *) agent->coord_connections[node - 1]->xc_cancelConn, errbuf, sizeof(errbuf)); + if (bRet != false) + { + nCount++; + } + } + + return nCount; +} /* * Return connections back to the pool @@ -1262,6 +1344,9 @@ PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* Assert(Handle); + if (dn_ndisc == 0 && co_ndisc == 0) + return; + /* Insert the list of Datanodes in buffer */ n32 = htonl((uint32) dn_ndisc); buf[0] = n32; @@ -1290,6 +1375,52 @@ PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* pool_flush(&Handle->port); } +/* + * Cancel Query + */ +void +PoolManagerCancelQuery(int dn_count, int* dn_list, int co_count, int* co_list) +{ + uint32 n32; + /* + * Buffer contains the list of both Coordinator and Datanodes, as well + * as the number of connections + */ + uint32 buf[2 + dn_count + co_count]; + int i; + + if (Handle == NULL || dn_list == NULL || co_list == NULL) + return; + + if (dn_count == 0 && co_count == 0) + return; + + /* Insert the list of Datanodes in buffer */ + n32 = htonl((uint32) dn_count); + buf[0] = n32; + + for (i = 0; i < dn_count;) + { + n32 = htonl((uint32) dn_list[i++]); + buf[i] = n32; + } + + /* Insert the list of Coordinators in buffer */ + n32 = htonl((uint32) co_count); + buf[dn_count + 1] = n32; + + /* Not necessary to send to pooler a request if there is no Coordinator */ + if (co_count != 0) + { + for (i = dn_count + 1; i < (dn_count + co_count + 1);) + { + n32 = htonl((uint32) co_list[i - (dn_count + 1)]); + buf[++i] = n32; + } + } + pool_putmessage(&Handle->port, 'h', (char *) buf, (2 + dn_count + co_count) * sizeof(uint32)); + pool_flush(&Handle->port); +} /* * Release connections for Datanodes and Coordinators @@ -1950,6 +2081,8 @@ 
grow_pool(DatabasePool * dbPool, int index, char client_conn_type) break; } + slot->xc_cancelConn = PQgetCancel(slot->conn); + /* Insert at the end of the pool */ nodePool->slot[(nodePool->freeSize)++] = slot; @@ -1968,6 +2101,7 @@ grow_pool(DatabasePool * dbPool, int index, char client_conn_type) static void destroy_slot(PGXCNodePoolSlot *slot) { + PQfreeCancel(slot->xc_cancelConn); PGXCNodeClose(slot->conn); pfree(slot); } diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index b2fab359b8..60e9cac802 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -71,6 +71,9 @@ #include "utils/guc.h" #include "utils/memutils.h" #include "utils/ps_status.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#endif #undef _ @@ -221,6 +224,13 @@ errstart(int elevel, const char *filename, int lineno, */ if (elevel >= ERROR) { +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + cancel_query(); + clear_all_data(); + } +#endif /* * If we are inside a critical section, all errors become PANIC * errors. See miscadmin.h. @@ -1121,6 +1131,14 @@ elog_finish(int elevel, const char *fmt,...) CHECK_STACK_DEPTH(); +#ifdef PGXC + if (elevel >= ERROR && IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + cancel_query(); + clear_all_data(); + } +#endif + /* * Do errstart() to see if we actually want to report the message. 
*/ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 7cdb0f6318..d864470d7c 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -234,5 +234,4 @@ extern int xactGetCommittedChildren(TransactionId **ptr); extern void xact_redo(XLogRecPtr lsn, XLogRecord *record); extern void xact_desc(StringInfo buf, uint8 xl_info, char *rec); - #endif /* XACT_H */ diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index c2fe88422c..48d23ca399 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -141,6 +141,7 @@ extern PGXCNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Sna extern int DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections); extern uint64 DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE* copy_file); extern void DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, CombineType combine_type); +extern bool DataNodeCopyEnd(PGXCNodeHandle *handle, bool is_error); extern int DataNodeCopyInBinaryForAll(char *msg_buf, int len, PGXCNodeHandle** copy_connections); extern int ExecCountSlotsRemoteQuery(RemoteQuery *node); @@ -150,10 +151,8 @@ extern void ExecEndRemoteQuery(RemoteQueryState *step); extern void ExecRemoteUtility(RemoteQuery *node); extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner); -#ifdef PGXC -extern void HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body, - size_t len); -#endif +extern bool is_data_node_ready(PGXCNodeHandle * conn); +extern void HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body, size_t len); extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot); extern void BufferConnection(PGXCNodeHandle *conn); diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h index 8f1eb54f1a..4b66a753e3 100644 --- 
a/src/include/pgxc/pgxcnode.h +++ b/src/include/pgxc/pgxcnode.h @@ -28,6 +28,7 @@ /* Connection to data node maintained by Pool Manager */ typedef struct PGconn NODE_CONNECTION; +typedef struct PGcancel NODE_CANCEL; /* Helper structure to access data node from Session */ typedef enum @@ -105,6 +106,9 @@ extern void PGXCNodeCleanAndRelease(int code, Datum arg); extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only); extern void release_handles(void); +extern void cancel_query(void); +extern void clear_all_data(void); + extern int get_transaction_nodes(PGXCNodeHandle ** connections, char client_conn_type, @@ -130,11 +134,14 @@ extern int pgxc_node_send_gxid(PGXCNodeHandle * handle, GlobalTransactionId gxid extern int pgxc_node_send_snapshot(PGXCNodeHandle * handle, Snapshot snapshot); extern int pgxc_node_send_timestamp(PGXCNodeHandle * handle, TimestampTz timestamp); -extern int pgxc_node_receive(const int conn_count, +extern bool pgxc_node_receive(const int conn_count, PGXCNodeHandle ** connections, struct timeval * timeout); -extern int pgxc_node_read_data(PGXCNodeHandle * conn); +extern int pgxc_node_read_data(PGXCNodeHandle * conn, bool close_if_error); +extern int pgxc_node_is_data_enqueued(PGXCNodeHandle *conn); + extern int send_some(PGXCNodeHandle * handle, int len); extern int pgxc_node_flush(PGXCNodeHandle *handle); +extern void pgxc_node_flush_read(PGXCNodeHandle *handle); extern int pgxc_all_handles_send_gxid(PGXCNodeAllHandles *pgxc_handles, GlobalTransactionId gxid, bool stop_at_error); extern int pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer, bool stop_at_error); diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h index 4de9e4a339..79397687da 100644 --- a/src/include/pgxc/poolmgr.h +++ b/src/include/pgxc/poolmgr.h @@ -35,6 +35,7 @@ typedef struct { struct timeval released; NODE_CONNECTION *conn; + NODE_CANCEL *xc_cancelConn; } PGXCNodePoolSlot; /* Pool 
of connections to specified pgxc node */ @@ -149,4 +150,7 @@ extern int PoolManagerAbortTransactions(char *dbname, char *username, int **proc /* Return connections back to the pool, for both Coordinator and Datanode connections */ extern void PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* co_discard); +/* Cancel a running query on data nodes as well as on other coordinators */ +extern void PoolManagerCancelQuery(int dn_count, int* dn_list, int co_count, int* co_list); + #endif diff --git a/src/test/regress/expected/domain_1.out b/src/test/regress/expected/domain_1.out index 07808af958..02f1556462 100644 --- a/src/test/regress/expected/domain_1.out +++ b/src/test/regress/expected/domain_1.out @@ -48,8 +48,7 @@ ERROR: value too long for type character varying(5) INSERT INTO basictest values ('88', 'haha', 'short', '123.1212'); -- Truncate numeric -- Test copy COPY basictest (testvarchar) FROM stdin; -- fail -ERROR: value too long for type character varying(5) -CONTEXT: COPY basictest, line 1, column testvarchar: "notsoshorttext" +ERROR: Error while running COPY COPY basictest (testvarchar) FROM stdin; select * from basictest order by 1, 2, 3, 4; testint4 | testtext | testvarchar | testnumeric @@ -129,8 +128,7 @@ select testint4arr[1], testchar4arr[2:2] from domarrtest order by 1, 2; COPY domarrtest FROM stdin; COPY domarrtest FROM stdin; -- fail -ERROR: value too long for type character varying(4) -CONTEXT: COPY domarrtest, line 1, column testchar4arr: "{qwerty,w,e}" +ERROR: Error while running COPY select * from domarrtest order by 1, 2; testint4arr | testchar4arr ---------------+--------------------- @@ -174,8 +172,7 @@ INSERT INTO nulltest values ('a', 'b', 'c', NULL, 'd'); -- Good COPY nulltest FROM stdin; --fail ERROR: Error while running COPY COPY nulltest FROM stdin; --fail -ERROR: domain dcheck does not allow null values -CONTEXT: COPY nulltest, line 1, column col5: null input +ERROR: Error while running COPY -- Last row is 
bad COPY nulltest FROM stdin; ERROR: Error while running COPY diff --git a/src/test/regress/expected/xc_distkey.out b/src/test/regress/expected/xc_distkey.out index d050b27c33..819952a266 100644 --- a/src/test/regress/expected/xc_distkey.out +++ b/src/test/regress/expected/xc_distkey.out @@ -451,15 +451,15 @@ select * from ts_tab order by a; (2 rows) select * from ts_tab where a = 'May 10, 2011 00:01:02.03'; - a ------------------------- - 2011-05-10 00:01:02.03 + a +----------------------------- + Tue May 10 00:01:02.03 2011 (1 row) select * from ts_tab where a = 'August 14, 2001 23:59:59.99'; - a ------------------------- - 2001-08-14 23:59:59.99 + a +----------------------------- + Tue Aug 14 23:59:59.99 2001 (1 row) create table in_tab(a interval) distribute by modulo(a); @@ -517,15 +517,15 @@ select * from atim_tab order by a; (2 rows) select * from atim_tab where a = abstime('May 10, 2011 00:01:02.03'); - a ------------------------- - 2011-05-10 12:01:02+05 + a +------------------------------ + Tue May 10 00:01:02 2011 PDT (1 row) select * from atim_tab where a = abstime('Jun 23, 2001 23:59:59.99'); - a ------------------------- - 2001-06-24 11:59:59+05 + a +------------------------------ + Sat Jun 23 23:59:59 2001 PDT (1 row) create table rtim_tab(a reltime) distribute by modulo(a); @@ -563,13 +563,13 @@ select * from date_tab order by a; select * from date_tab where a = 'May 10, 2011'; a ------------ - 2011-05-10 + 05-10-2011 (1 row) select * from date_tab where a = 'August 23, 2001'; a ------------ - 2001-08-23 + 08-23-2001 (1 row) create table tstz_tab(a timestamp with time zone) distribute by modulo(a); @@ -583,15 +583,15 @@ select * from tstz_tab order by a; (2 rows) select * from tstz_tab where a = 'May 10, 2011 00:01:02.03 PST'; - a ---------------------------- - 2011-05-10 13:01:02.03+05 + a +--------------------------------- + Tue May 10 01:01:02.03 2011 PDT (1 row) select * from tstz_tab where a = 'Jun 23, 2001 23:59:59.99 PST'; - a 
---------------------------- - 2001-06-24 12:59:59.99+05 + a +--------------------------------- + Sun Jun 24 00:59:59.99 2001 PDT (1 row) create table tstz_tab_h(a timestamp with time zone) distribute by hash(a); @@ -605,14 +605,14 @@ select * from tstz_tab_h order by a; (2 rows) select * from tstz_tab_h where a = 'May 10, 2011 00:01:02.03 PST'; - a ---------------------------- - 2011-05-10 13:01:02.03+05 + a +--------------------------------- + Tue May 10 01:01:02.03 2011 PDT (1 row) select * from tstz_tab_h where a = 'Jun 23, 2001 23:59:59.99 PST'; - a ---------------------------- - 2001-06-24 12:59:59.99+05 + a +--------------------------------- + Sun Jun 24 00:59:59.99 2001 PDT (1 row) diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 658f930e22..6b58aa78d0 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -42,7 +42,7 @@ test: comments test: geometry #After supporting other data types as distribution key, this test case crashes the server #Bug ID 3306801 tracks this crash -#test: horology +test: horology test: oidjoins test: type_sanity test: opr_sanity |