diff options
-rw-r--r-- | src/backend/access/common/heaptuple.c | 7 | ||||
-rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 159 | ||||
-rw-r--r-- | src/backend/pgxc/pool/pgxcnode.c | 218 | ||||
-rw-r--r-- | src/backend/pgxc/pool/poolcomm.c | 4 | ||||
-rw-r--r-- | src/backend/pgxc/pool/poolmgr.c | 154 | ||||
-rw-r--r-- | src/backend/utils/error/elog.c | 18 | ||||
-rw-r--r-- | src/include/access/xact.h | 1 | ||||
-rw-r--r-- | src/include/pgxc/execRemote.h | 7 | ||||
-rw-r--r-- | src/include/pgxc/pgxcnode.h | 11 | ||||
-rw-r--r-- | src/include/pgxc/poolmgr.h | 4 | ||||
-rw-r--r-- | src/test/regress/expected/domain_1.out | 9 | ||||
-rw-r--r-- | src/test/regress/expected/xc_distkey.out | 52 | ||||
-rw-r--r-- | src/test/regress/serial_schedule | 2 |
13 files changed, 513 insertions, 133 deletions
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c index c0103b811e..d34f00261a 100644 --- a/src/backend/access/common/heaptuple.c +++ b/src/backend/access/common/heaptuple.c @@ -1130,7 +1130,7 @@ slot_deform_tuple(TupleTableSlot *slot, int natts) static void slot_deform_datarow(TupleTableSlot *slot) { - int attnum = slot->tts_tupleDescriptor->natts; + int attnum; int i; int col_count; char *cur = slot->tts_dataRow; @@ -1138,6 +1138,11 @@ slot_deform_datarow(TupleTableSlot *slot) uint16 n16; uint32 n32; + if (slot->tts_tupleDescriptor == NULL || slot->tts_dataRow == NULL) + return; + + attnum = slot->tts_tupleDescriptor->natts; + /* fastpath: exit if values already extracted */ if (slot->tts_nvalid == attnum) return; diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index ea304533aa..2f77f5e014 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -1195,8 +1195,8 @@ pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections, int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) { - char *msg; - int msg_len; + char *msg; + int msg_len; char msg_type; bool suspended = false; @@ -1327,6 +1327,64 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner) /* + * Has the data node sent Ready For Query + */ + +bool +is_data_node_ready(PGXCNodeHandle * conn) +{ + char *msg; + int msg_len; + char msg_type; + bool suspended = false; + + for (;;) + { + /* + * If we are in the process of shutting down, we + * may be rolling back, and the buffer may contain other messages. + * We want to avoid a procarray exception + * as well as an error stack overflow. 
+ */ + if (proc_exit_inprogress) + conn->state = DN_CONNECTION_STATE_ERROR_FATAL; + + /* don't read from the connection if there is a fatal error */ + if (conn->state == DN_CONNECTION_STATE_ERROR_FATAL) + return true; + + /* No data available, exit */ + if (!HAS_MESSAGE_BUFFERED(conn)) + return false; + + msg_type = get_message(conn, &msg_len, &msg); + switch (msg_type) + { + case 's': /* PortalSuspended */ + suspended = true; + break; + + case 'Z': /* ReadyForQuery */ + { + /* + * The response kind depends on whether PortalSuspended was seen: + * if so, the coordinator wants to send down another EXECUTE to + * fetch more rows; otherwise it is done with the connection. + * NOTE(review): 'result' is computed here but never used. + */ + int result = suspended ? RESPONSE_SUSPENDED : RESPONSE_COMPLETE; + conn->transaction_status = msg[0]; + conn->state = DN_CONNECTION_STATE_IDLE; + conn->combiner = NULL; + return true; + } + } + } + /* never happen, but keep compiler quiet */ + return false; +} + +/* * Send BEGIN command to the Datanodes or Coordinators and receive responses */ static int @@ -2453,7 +2511,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** if (bytes_needed > COPY_BUFFER_SIZE) { /* First look if data node has sent a error message */ - int read_status = pgxc_node_read_data(primary_handle); + int read_status = pgxc_node_read_data(primary_handle, true); if (read_status == EOF || read_status < 0) { add_error_message(primary_handle, "failed to read data from data node"); @@ -2514,7 +2572,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** int to_send = handle->outEnd; /* First look if data node has sent a error message */ - int read_status = pgxc_node_read_data(handle); + int read_status = pgxc_node_read_data(handle, true); if (read_status == EOF || read_status < 0) { add_error_message(handle, "failed to read data from data node"); @@ -2615,7 +2673,7 @@ DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE* if 
(handle_response(handle,combiner) == RESPONSE_EOF) { /* read some extra-data */ - read_status = pgxc_node_read_data(handle); + read_status = pgxc_node_read_data(handle, true); if (read_status < 0) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -2679,30 +2737,9 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, if (primary_handle) { + error = true; if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - /* msgType + msgLen */ - if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - primary_handle->outBuffer[primary_handle->outEnd++] = 'c'; - memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); - primary_handle->outEnd += 4; - - /* We need response right away, so send immediately */ - if (pgxc_node_flush(primary_handle) < 0) - { - error = true; - } - } - else - { - error = true; - } + error = DataNodeCopyEnd(primary_handle, false); combiner = CreateResponseCombiner(conn_count + 1, combine_type); error = (pgxc_node_receive_responses(1, &primary_handle, timeout, combiner) != 0) || error; @@ -2712,30 +2749,9 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, { PGXCNodeHandle *handle = connections[i]; + error = true; if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) - { - /* msgType + msgLen */ - if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) - { - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - - handle->outBuffer[handle->outEnd++] = 'c'; - memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); - handle->outEnd += 4; - - /* We need response right away, so send immediately */ - if (pgxc_node_flush(handle) < 0) - { - error = true; - } - } - else - { - error = true; - } + error = 
DataNodeCopyEnd(handle, false); } need_tran = !autocommit || primary_handle || conn_count > 1; @@ -2750,6 +2766,36 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, errmsg("Error while running COPY"))); } +/* + * End copy process on a connection + */ +bool +DataNodeCopyEnd(PGXCNodeHandle *handle, bool is_error) +{ + int nLen = htonl(4); + + if (handle == NULL) + return true; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) + return true; + + if (is_error) + handle->outBuffer[handle->outEnd++] = 'f'; + else + handle->outBuffer[handle->outEnd++] = 'c'; + + memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); + handle->outEnd += 4; + + /* We need response right away, so send immediately */ + if (pgxc_node_flush(handle) < 0) + return true; + + return false; +} + RemoteQueryState * ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags) { @@ -3296,7 +3342,9 @@ do_query(RemoteQueryState *node) while (true) { int res; - pgxc_node_receive(1, &primaryconnection, NULL); + if (pgxc_node_receive(1, &primaryconnection, NULL)) + break; + res = handle_response(primaryconnection, node); if (res == RESPONSE_COMPLETE) break; @@ -4248,7 +4296,8 @@ ExecRemoteUtility(RemoteQuery *node) { int i = 0; - pgxc_node_receive(dn_conn_count, pgxc_connections->datanode_handles, NULL); + if (pgxc_node_receive(dn_conn_count, pgxc_connections->datanode_handles, NULL)) + break; /* * Handle input from the data nodes. 
* We do not expect data nodes returning tuples when running utility @@ -4296,7 +4345,9 @@ ExecRemoteUtility(RemoteQuery *node) { int i = 0; - pgxc_node_receive(co_conn_count, pgxc_connections->coord_handles, NULL); + if (pgxc_node_receive(co_conn_count, pgxc_connections->coord_handles, NULL)) + break; + while (i < co_conn_count) { int res = handle_response(pgxc_connections->coord_handles[i], remotestate); diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c index a0b8da48d8..a2e90cecd1 100644 --- a/src/backend/pgxc/pool/pgxcnode.c +++ b/src/backend/pgxc/pool/pgxcnode.c @@ -20,6 +20,7 @@ #include <sys/select.h> #include <sys/time.h> #include <sys/types.h> +#include <sys/ioctl.h> #include <stdlib.h> #include <string.h> #include <unistd.h> @@ -279,21 +280,35 @@ pgxc_node_init(PGXCNodeHandle *handle, int sock, int nodenum) * Wait while at least one of specified connections has data available and read * the data into the buffer */ -int +bool pgxc_node_receive(const int conn_count, PGXCNodeHandle ** connections, struct timeval * timeout) { +#define ERROR_OCCURED true +#define NO_ERROR_OCCURED false int i, res_select, nfds = 0; - fd_set readfds; + fd_set readfds; + bool is_msg_buffered; FD_ZERO(&readfds); + + is_msg_buffered = false; + for (i = 0; i < conn_count; i++) + { + /* If connection has a buffered message */ + if (HAS_MESSAGE_BUFFERED(connections[i])) + { + is_msg_buffered = true; + break; + } + } + for (i = 0; i < conn_count; i++) { /* If connection finished sending do not wait input from it */ - if (connections[i]->state == DN_CONNECTION_STATE_IDLE - || HAS_MESSAGE_BUFFERED(connections[i])) + if (connections[i]->state == DN_CONNECTION_STATE_IDLE || HAS_MESSAGE_BUFFERED(connections[i])) continue; /* prepare select params */ @@ -313,7 +328,11 @@ pgxc_node_receive(const int conn_count, * Return if we do not have connections to receive input */ if (nfds == 0) - return 0; + { + if (is_msg_buffered) + return NO_ERROR_OCCURED; + return 
ERROR_OCCURED; + } retry: res_select = select(nfds + 1, &readfds, NULL, NULL, timeout); @@ -328,14 +347,16 @@ retry: elog(WARNING, "select() bad file descriptor set"); } elog(WARNING, "select() error: %d", errno); - return errno; + if (errno) + return ERROR_OCCURED; + return NO_ERROR_OCCURED; } if (res_select == 0) { /* Handle timeout */ elog(WARNING, "timeout while waiting for response"); - return EOF; + return ERROR_OCCURED; } /* read data */ @@ -345,7 +366,7 @@ retry: if (FD_ISSET(conn->sock, &readfds)) { - int read_status = pgxc_node_read_data(conn); + int read_status = pgxc_node_read_data(conn, true); if (read_status == EOF || read_status < 0) { @@ -354,26 +375,46 @@ retry: add_error_message(conn, "unexpected EOF on datanode connection"); elog(WARNING, "unexpected EOF on datanode connection"); /* Should we read from the other connections before returning? */ - return EOF; + return ERROR_OCCURED; } } } - return 0; + return NO_ERROR_OCCURED; } +/* + * Is there any data enqueued in the TCP input buffer waiting + * to be read sent by the PGXC node connection + */ + +int +pgxc_node_is_data_enqueued(PGXCNodeHandle *conn) +{ + int ret; + int enqueued; + + if (conn->sock < 0) + return 0; + ret = ioctl(conn->sock, FIONREAD, &enqueued); + if (ret != 0) + return 0; + + return enqueued; +} /* * Read up incoming messages from the PGXC node connection */ int -pgxc_node_read_data(PGXCNodeHandle *conn) +pgxc_node_read_data(PGXCNodeHandle *conn, bool close_if_error) { int someread = 0; int nread; if (conn->sock < 0) { - add_error_message(conn, "bad socket"); + if (close_if_error) + add_error_message(conn, "bad socket"); return EOF; } @@ -412,7 +453,8 @@ pgxc_node_read_data(PGXCNodeHandle *conn) */ if (conn->inSize - conn->inEnd < 100) { - add_error_message(conn, "can not allocate buffer"); + if (close_if_error) + add_error_message(conn, "can not allocate buffer"); return -1; } } @@ -424,7 +466,8 @@ retry: if (nread < 0) { - elog(DEBUG1, "dnrd errno = %d", errno); + if 
(close_if_error) + elog(DEBUG1, "dnrd errno = %d", errno); if (errno == EINTR) goto retry; /* Some systems return EAGAIN/EWOULDBLOCK for no data */ @@ -444,19 +487,22 @@ retry: * OK, we are getting a zero read even though select() says ready. This * means the connection has been closed. Cope. */ - add_error_message(conn, - "data node closed the connection unexpectedly\n" - "\tThis probably means the data node terminated abnormally\n" - "\tbefore or while processing the request.\n"); - conn->state = DN_CONNECTION_STATE_ERROR_FATAL; /* No more connection to - * backend */ - closesocket(conn->sock); - conn->sock = NO_SOCKET; - + if (close_if_error) + { + add_error_message(conn, + "data node closed the connection unexpectedly\n" + "\tThis probably means the data node terminated abnormally\n" + "\tbefore or while processing the request.\n"); + conn->state = DN_CONNECTION_STATE_ERROR_FATAL; /* No more connection to + * backend */ + closesocket(conn->sock); + conn->sock = NO_SOCKET; + } return -1; } #endif - add_error_message(conn, "could not receive data from server"); + if (close_if_error) + add_error_message(conn, "could not receive data from server"); return -1; } @@ -488,7 +534,8 @@ retry: if (nread == 0) { - elog(DEBUG1, "nread returned 0"); + if (close_if_error) + elog(DEBUG1, "nread returned 0"); return EOF; } @@ -661,6 +708,102 @@ release_handles(void) coord_count = 0; } +/* + * cancel a running query due to error while processing rows + */ +void +cancel_query(void) +{ + int i; + int dn_cancel[NumDataNodes]; + int co_cancel[NumCoords]; + int dn_count = 0; + int co_count = 0; + + if (datanode_count == 0 && coord_count == 0) + return; + + /* Collect Data Nodes handles */ + for (i = 0; i < NumDataNodes; i++) + { + PGXCNodeHandle *handle = &dn_handles[i]; + + if (handle->sock != NO_SOCKET) + { + if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + DataNodeCopyEnd(handle, true); + } + else + { + if (handle->state != 
DN_CONNECTION_STATE_IDLE) + { + dn_cancel[dn_count++] = handle->nodenum; + } + } + } + } + + /* Collect Coordinator handles */ + for (i = 0; i < NumCoords; i++) + { + PGXCNodeHandle *handle = &co_handles[i]; + + if (handle->sock != NO_SOCKET) + { + if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + DataNodeCopyEnd(handle, true); + } + else + { + if (handle->state != DN_CONNECTION_STATE_IDLE) + { + co_cancel[co_count++] = handle->nodenum; /* coordinators are counted in co_count, not dn_count */ + } + } + } + } + + PoolManagerCancelQuery(dn_count, dn_cancel, co_count, co_cancel); +} + +/* + * This method won't return until all network buffers are empty + * To ensure all data in all network buffers is read and wasted + */ +void +clear_all_data(void) +{ + int i; + + if (datanode_count == 0 && coord_count == 0) + return; + + /* Collect Data Nodes handles */ + for (i = 0; i < NumDataNodes; i++) + { + PGXCNodeHandle *handle = &dn_handles[i]; + + if (handle->sock != NO_SOCKET && handle->state != DN_CONNECTION_STATE_IDLE) + { + pgxc_node_flush_read(handle); + handle->state = DN_CONNECTION_STATE_IDLE; + } + } + + /* Collect Coordinator handles */ + for (i = 0; i < NumCoords; i++) + { + PGXCNodeHandle *handle = &co_handles[i]; + + if (handle->sock != NO_SOCKET && handle->state != DN_CONNECTION_STATE_IDLE) + { + pgxc_node_flush_read(handle); + handle->state = DN_CONNECTION_STATE_IDLE; + } + } +} /* * Ensure specified amount of data can fit to the incoming buffer and @@ -1224,6 +1367,31 @@ pgxc_node_flush(PGXCNodeHandle *handle) } /* + * This method won't return until network buffer is empty or error occurs + * To ensure all data in network buffers is read and wasted + */ +void +pgxc_node_flush_read(PGXCNodeHandle *handle) +{ + bool is_ready; + int read_result; + + if (handle == NULL) + return; + + while(true) + { + is_ready = is_data_node_ready(handle); + if (is_ready == true) + break; + + read_result = pgxc_node_read_data(handle, false); + if (read_result < 0) + break; + } +} + 
+/* * Send specified statement down to the PGXC node */ int diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c index 79a377660a..22dc813e45 100644 --- a/src/backend/pgxc/pool/poolcomm.c +++ b/src/backend/pgxc/pool/poolcomm.c @@ -435,9 +435,7 @@ pool_flush(PoolPort *port) * If shutting down already, do not call. */ if (!proc_exit_inprogress) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("could not send data to client: %m"))); + return 0; } /* diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c index 1b2c4bfcf2..463bd5a170 100644 --- a/src/backend/pgxc/pool/poolmgr.c +++ b/src/backend/pgxc/pool/poolmgr.c @@ -104,6 +104,7 @@ static DatabasePool *find_database_pool_to_clean(const char *database, List *co_list); static DatabasePool *remove_database_pool(const char *database, const char *user_name); static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist); +static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist); static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node, char client_conn_type); static void agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard); static void agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list); @@ -878,17 +879,17 @@ agent_handle_input(PoolAgent * agent, StringInfo s) */ for (;;) { - const char *database = NULL; - const char *user_name = NULL; - const char *set_command; + const char *database = NULL; + const char *user_name = NULL; + const char *set_command; bool is_local; - int datanodecount; - int coordcount; - List *datanodelist = NIL; - List *coordlist = NIL; - int *fds; - int *pids; - int i, len, res; + int datanodecount; + int coordcount; + List *datanodelist = NIL; + List *coordlist = NIL; + int *fds; + int *pids; + int i, len, res; /* * During a pool cleaning, Abort, Connect and Get Connections messages @@ -1001,6 +1002,32 @@ 
agent_handle_input(PoolAgent * agent, StringInfo s) if (fds) pfree(fds); break; + + case 'h': /* Cancel SQL Command in progress on specified connections */ + /* + * Length of message is caused by: + * - Message header = 4bytes + * - List of datanodes = NumDataNodes * 4bytes (max) + * - List of coordinators = NumCoords * 4bytes (max) + * - Number of Datanodes sent = 4bytes + * - Number of Coordinators sent = 4bytes + */ + pool_getmessage(&agent->port, s, 4 * NumDataNodes + 4 * NumCoords + 12); + datanodecount = pq_getmsgint(s, 4); + for (i = 0; i < datanodecount; i++) + datanodelist = lappend_int(datanodelist, pq_getmsgint(s, 4)); + coordcount = pq_getmsgint(s, 4); + /* It is possible that no Coordinators are involved in the transaction */ + for (i = 0; i < coordcount; i++) + coordlist = lappend_int(coordlist, pq_getmsgint(s, 4)); + pq_getmsgend(s); + + cancel_query_on_connections(agent, datanodelist, coordlist); + list_free(datanodelist); + list_free(coordlist); + + break; + case 'r': /* RELEASE CONNECTIONS */ pool_getmessage(&agent->port, s, 4 * NumDataNodes + 4 * NumCoords + 12); datanodecount = pq_getmsgint(s, 4); @@ -1245,6 +1272,61 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist) return result; } +/* + * Cancel query + */ +static int +cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist) +{ + int i; + ListCell *nodelist_item; + char errbuf[256]; + int nCount; + bool bRet; + + nCount = 0; + + if (agent == NULL) + return nCount; + + /* Send cancel on Data nodes first */ + foreach(nodelist_item, datanodelist) + { + int node = lfirst_int(nodelist_item); + + if(node <= 0 || node > NumDataNodes) + continue; + + if (agent->dn_connections == NULL) + break; + + bRet = PQcancel((PGcancel *) agent->dn_connections[node - 1]->xc_cancelConn, errbuf, sizeof(errbuf)); + if (bRet != false) + { + nCount++; + } + } + + /* Send cancel to Coordinators too, e.g. 
if DDL was in progress */ + foreach(nodelist_item, coordlist) + { + int node = lfirst_int(nodelist_item); + + if(node <= 0 || node > NumCoords) /* coordinator numbers are bounded by NumCoords, not NumDataNodes */ + continue; + + if (agent->coord_connections == NULL) + break; + + bRet = PQcancel((PGcancel *) agent->coord_connections[node - 1]->xc_cancelConn, errbuf, sizeof(errbuf)); + if (bRet != false) + { + nCount++; + } + } + + return nCount; +} /* * Return connections back to the pool @@ -1262,6 +1344,9 @@ PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* Assert(Handle); + if (dn_ndisc == 0 && co_ndisc == 0) + return; + /* Insert the list of Datanodes in buffer */ n32 = htonl((uint32) dn_ndisc); buf[0] = n32; @@ -1290,6 +1375,52 @@ PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* pool_flush(&Handle->port); } +/* + * Cancel Query + */ +void +PoolManagerCancelQuery(int dn_count, int* dn_list, int co_count, int* co_list) +{ + uint32 n32; + /* + * Buffer contains the list of both Coordinator and Datanodes, as well + * as the number of connections + */ + uint32 buf[2 + dn_count + co_count]; + int i; + + if (Handle == NULL || dn_list == NULL || co_list == NULL) + return; + + if (dn_count == 0 && co_count == 0) + return; + + /* Insert the list of Datanodes in buffer */ + n32 = htonl((uint32) dn_count); + buf[0] = n32; + + for (i = 0; i < dn_count;) + { + n32 = htonl((uint32) dn_list[i++]); + buf[i] = n32; + } + + /* Insert the list of Coordinators in buffer */ + n32 = htonl((uint32) co_count); + buf[dn_count + 1] = n32; + + /* Not necessary to send to pooler a request if there is no Coordinator */ + if (co_count != 0) + { + for (i = dn_count + 1; i < (dn_count + co_count + 1);) + { + n32 = htonl((uint32) co_list[i - (dn_count + 1)]); + buf[++i] = n32; + } + } + pool_putmessage(&Handle->port, 'h', (char *) buf, (2 + dn_count + co_count) * sizeof(uint32)); + pool_flush(&Handle->port); +} /* * Release connections for Datanodes and Coordinators @@ -1950,6 +2081,8 @@ 
grow_pool(DatabasePool * dbPool, int index, char client_conn_type) break; } + slot->xc_cancelConn = PQgetCancel(slot->conn); + /* Insert at the end of the pool */ nodePool->slot[(nodePool->freeSize)++] = slot; @@ -1968,6 +2101,7 @@ grow_pool(DatabasePool * dbPool, int index, char client_conn_type) static void destroy_slot(PGXCNodePoolSlot *slot) { + PQfreeCancel(slot->xc_cancelConn); PGXCNodeClose(slot->conn); pfree(slot); } diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index b2fab359b8..60e9cac802 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -71,6 +71,9 @@ #include "utils/guc.h" #include "utils/memutils.h" #include "utils/ps_status.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#endif #undef _ @@ -221,6 +224,13 @@ errstart(int elevel, const char *filename, int lineno, */ if (elevel >= ERROR) { +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + cancel_query(); + clear_all_data(); + } +#endif /* * If we are inside a critical section, all errors become PANIC * errors. See miscadmin.h. @@ -1121,6 +1131,14 @@ elog_finish(int elevel, const char *fmt,...) CHECK_STACK_DEPTH(); +#ifdef PGXC + if (elevel >= ERROR && IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + cancel_query(); + clear_all_data(); + } +#endif + /* * Do errstart() to see if we actually want to report the message. 
*/ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 7cdb0f6318..d864470d7c 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -234,5 +234,4 @@ extern int xactGetCommittedChildren(TransactionId **ptr); extern void xact_redo(XLogRecPtr lsn, XLogRecord *record); extern void xact_desc(StringInfo buf, uint8 xl_info, char *rec); - #endif /* XACT_H */ diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index c2fe88422c..48d23ca399 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -141,6 +141,7 @@ extern PGXCNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Sna extern int DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections); extern uint64 DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE* copy_file); extern void DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, CombineType combine_type); +extern bool DataNodeCopyEnd(PGXCNodeHandle *handle, bool is_error); extern int DataNodeCopyInBinaryForAll(char *msg_buf, int len, PGXCNodeHandle** copy_connections); extern int ExecCountSlotsRemoteQuery(RemoteQuery *node); @@ -150,10 +151,8 @@ extern void ExecEndRemoteQuery(RemoteQueryState *step); extern void ExecRemoteUtility(RemoteQuery *node); extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner); -#ifdef PGXC -extern void HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body, - size_t len); -#endif +extern bool is_data_node_ready(PGXCNodeHandle * conn); +extern void HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body, size_t len); extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot); extern void BufferConnection(PGXCNodeHandle *conn); diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h index 8f1eb54f1a..4b66a753e3 100644 --- 
a/src/include/pgxc/pgxcnode.h +++ b/src/include/pgxc/pgxcnode.h @@ -28,6 +28,7 @@ /* Connection to data node maintained by Pool Manager */ typedef struct PGconn NODE_CONNECTION; +typedef struct PGcancel NODE_CANCEL; /* Helper structure to access data node from Session */ typedef enum @@ -105,6 +106,9 @@ extern void PGXCNodeCleanAndRelease(int code, Datum arg); extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only); extern void release_handles(void); +extern void cancel_query(void); +extern void clear_all_data(void); + extern int get_transaction_nodes(PGXCNodeHandle ** connections, char client_conn_type, @@ -130,11 +134,14 @@ extern int pgxc_node_send_gxid(PGXCNodeHandle * handle, GlobalTransactionId gxid extern int pgxc_node_send_snapshot(PGXCNodeHandle * handle, Snapshot snapshot); extern int pgxc_node_send_timestamp(PGXCNodeHandle * handle, TimestampTz timestamp); -extern int pgxc_node_receive(const int conn_count, +extern bool pgxc_node_receive(const int conn_count, PGXCNodeHandle ** connections, struct timeval * timeout); -extern int pgxc_node_read_data(PGXCNodeHandle * conn); +extern int pgxc_node_read_data(PGXCNodeHandle * conn, bool close_if_error); +extern int pgxc_node_is_data_enqueued(PGXCNodeHandle *conn); + extern int send_some(PGXCNodeHandle * handle, int len); extern int pgxc_node_flush(PGXCNodeHandle *handle); +extern void pgxc_node_flush_read(PGXCNodeHandle *handle); extern int pgxc_all_handles_send_gxid(PGXCNodeAllHandles *pgxc_handles, GlobalTransactionId gxid, bool stop_at_error); extern int pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer, bool stop_at_error); diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h index 4de9e4a339..79397687da 100644 --- a/src/include/pgxc/poolmgr.h +++ b/src/include/pgxc/poolmgr.h @@ -35,6 +35,7 @@ typedef struct { struct timeval released; NODE_CONNECTION *conn; + NODE_CANCEL *xc_cancelConn; } PGXCNodePoolSlot; /* Pool 
of connections to specified pgxc node */ @@ -149,4 +150,7 @@ extern int PoolManagerAbortTransactions(char *dbname, char *username, int **proc /* Return connections back to the pool, for both Coordinator and Datanode connections */ extern void PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* co_discard); +/* Cancel a running query on data nodes as well as on other coordinators */ +extern void PoolManagerCancelQuery(int dn_count, int* dn_list, int co_count, int* co_list); + #endif diff --git a/src/test/regress/expected/domain_1.out b/src/test/regress/expected/domain_1.out index 07808af958..02f1556462 100644 --- a/src/test/regress/expected/domain_1.out +++ b/src/test/regress/expected/domain_1.out @@ -48,8 +48,7 @@ ERROR: value too long for type character varying(5) INSERT INTO basictest values ('88', 'haha', 'short', '123.1212'); -- Truncate numeric -- Test copy COPY basictest (testvarchar) FROM stdin; -- fail -ERROR: value too long for type character varying(5) -CONTEXT: COPY basictest, line 1, column testvarchar: "notsoshorttext" +ERROR: Error while running COPY COPY basictest (testvarchar) FROM stdin; select * from basictest order by 1, 2, 3, 4; testint4 | testtext | testvarchar | testnumeric @@ -129,8 +128,7 @@ select testint4arr[1], testchar4arr[2:2] from domarrtest order by 1, 2; COPY domarrtest FROM stdin; COPY domarrtest FROM stdin; -- fail -ERROR: value too long for type character varying(4) -CONTEXT: COPY domarrtest, line 1, column testchar4arr: "{qwerty,w,e}" +ERROR: Error while running COPY select * from domarrtest order by 1, 2; testint4arr | testchar4arr ---------------+--------------------- @@ -174,8 +172,7 @@ INSERT INTO nulltest values ('a', 'b', 'c', NULL, 'd'); -- Good COPY nulltest FROM stdin; --fail ERROR: Error while running COPY COPY nulltest FROM stdin; --fail -ERROR: domain dcheck does not allow null values -CONTEXT: COPY nulltest, line 1, column col5: null input +ERROR: Error while running COPY -- Last row is 
bad COPY nulltest FROM stdin; ERROR: Error while running COPY diff --git a/src/test/regress/expected/xc_distkey.out b/src/test/regress/expected/xc_distkey.out index d050b27c33..819952a266 100644 --- a/src/test/regress/expected/xc_distkey.out +++ b/src/test/regress/expected/xc_distkey.out @@ -451,15 +451,15 @@ select * from ts_tab order by a; (2 rows) select * from ts_tab where a = 'May 10, 2011 00:01:02.03'; - a ------------------------- - 2011-05-10 00:01:02.03 + a +----------------------------- + Tue May 10 00:01:02.03 2011 (1 row) select * from ts_tab where a = 'August 14, 2001 23:59:59.99'; - a ------------------------- - 2001-08-14 23:59:59.99 + a +----------------------------- + Tue Aug 14 23:59:59.99 2001 (1 row) create table in_tab(a interval) distribute by modulo(a); @@ -517,15 +517,15 @@ select * from atim_tab order by a; (2 rows) select * from atim_tab where a = abstime('May 10, 2011 00:01:02.03'); - a ------------------------- - 2011-05-10 12:01:02+05 + a +------------------------------ + Tue May 10 00:01:02 2011 PDT (1 row) select * from atim_tab where a = abstime('Jun 23, 2001 23:59:59.99'); - a ------------------------- - 2001-06-24 11:59:59+05 + a +------------------------------ + Sat Jun 23 23:59:59 2001 PDT (1 row) create table rtim_tab(a reltime) distribute by modulo(a); @@ -563,13 +563,13 @@ select * from date_tab order by a; select * from date_tab where a = 'May 10, 2011'; a ------------ - 2011-05-10 + 05-10-2011 (1 row) select * from date_tab where a = 'August 23, 2001'; a ------------ - 2001-08-23 + 08-23-2001 (1 row) create table tstz_tab(a timestamp with time zone) distribute by modulo(a); @@ -583,15 +583,15 @@ select * from tstz_tab order by a; (2 rows) select * from tstz_tab where a = 'May 10, 2011 00:01:02.03 PST'; - a ---------------------------- - 2011-05-10 13:01:02.03+05 + a +--------------------------------- + Tue May 10 01:01:02.03 2011 PDT (1 row) select * from tstz_tab where a = 'Jun 23, 2001 23:59:59.99 PST'; - a 
---------------------------- - 2001-06-24 12:59:59.99+05 + a +--------------------------------- + Sun Jun 24 00:59:59.99 2001 PDT (1 row) create table tstz_tab_h(a timestamp with time zone) distribute by hash(a); @@ -605,14 +605,14 @@ select * from tstz_tab_h order by a; (2 rows) select * from tstz_tab_h where a = 'May 10, 2011 00:01:02.03 PST'; - a ---------------------------- - 2011-05-10 13:01:02.03+05 + a +--------------------------------- + Tue May 10 01:01:02.03 2011 PDT (1 row) select * from tstz_tab_h where a = 'Jun 23, 2001 23:59:59.99 PST'; - a ---------------------------- - 2001-06-24 12:59:59.99+05 + a +--------------------------------- + Sun Jun 24 00:59:59.99 2001 PDT (1 row) diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 658f930e22..6b58aa78d0 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -42,7 +42,7 @@ test: comments test: geometry #After supporting other data types as distribution key, this test case crashes the server #Bug ID 3306801 tracks this crash -#test: horology +test: horology test: oidjoins test: type_sanity test: opr_sanity |