summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/common/heaptuple.c7
-rw-r--r--src/backend/pgxc/pool/execRemote.c159
-rw-r--r--src/backend/pgxc/pool/pgxcnode.c218
-rw-r--r--src/backend/pgxc/pool/poolcomm.c4
-rw-r--r--src/backend/pgxc/pool/poolmgr.c154
-rw-r--r--src/backend/utils/error/elog.c18
-rw-r--r--src/include/access/xact.h1
-rw-r--r--src/include/pgxc/execRemote.h7
-rw-r--r--src/include/pgxc/pgxcnode.h11
-rw-r--r--src/include/pgxc/poolmgr.h4
-rw-r--r--src/test/regress/expected/domain_1.out9
-rw-r--r--src/test/regress/expected/xc_distkey.out52
-rw-r--r--src/test/regress/serial_schedule2
13 files changed, 513 insertions, 133 deletions
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
index c0103b811e..d34f00261a 100644
--- a/src/backend/access/common/heaptuple.c
+++ b/src/backend/access/common/heaptuple.c
@@ -1130,7 +1130,7 @@ slot_deform_tuple(TupleTableSlot *slot, int natts)
static void
slot_deform_datarow(TupleTableSlot *slot)
{
- int attnum = slot->tts_tupleDescriptor->natts;
+ int attnum;
int i;
int col_count;
char *cur = slot->tts_dataRow;
@@ -1138,6 +1138,11 @@ slot_deform_datarow(TupleTableSlot *slot)
uint16 n16;
uint32 n32;
+ if (slot->tts_tupleDescriptor == NULL || slot->tts_dataRow == NULL)
+ return;
+
+ attnum = slot->tts_tupleDescriptor->natts;
+
/* fastpath: exit if values already extracted */
if (slot->tts_nvalid == attnum)
return;
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index ea304533aa..2f77f5e014 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -1195,8 +1195,8 @@ pgxc_node_receive_responses(const int conn_count, PGXCNodeHandle ** connections,
int
handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
{
- char *msg;
- int msg_len;
+ char *msg;
+ int msg_len;
char msg_type;
bool suspended = false;
@@ -1327,6 +1327,64 @@ handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner)
/*
+ * Has the data node sent Ready For Query
+ */
+
+bool
+is_data_node_ready(PGXCNodeHandle * conn)
+{
+ char *msg;
+ int msg_len;
+ char msg_type;
+ bool suspended = false;
+
+ for (;;)
+ {
+ /*
+ * If we are in the process of shutting down, we
+ * may be rolling back, and the buffer may contain other messages.
+ * We want to avoid a procarray exception
+ * as well as an error stack overflow.
+ */
+ if (proc_exit_inprogress)
+ conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
+
+ /* don't read from from the connection if there is a fatal error */
+ if (conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
+ return true;
+
+ /* No data available, exit */
+ if (!HAS_MESSAGE_BUFFERED(conn))
+ return false;
+
+ msg_type = get_message(conn, &msg_len, &msg);
+ switch (msg_type)
+ {
+ case 's': /* PortalSuspended */
+ suspended = true;
+ break;
+
+ case 'Z': /* ReadyForQuery */
+ {
+ /*
+ * Return result depends on previous connection state.
+ * If it was PORTAL_SUSPENDED coordinator want to send down
+ * another EXECUTE to fetch more rows, otherwise it is done
+ * with the connection
+ */
+ /* NOTE(review): 'suspended' is tracked but this function returns a plain bool, so the RESPONSE_SUSPENDED vs RESPONSE_COMPLETE distinction is lost here; the former 'int result = suspended ? RESPONSE_SUSPENDED : RESPONSE_COMPLETE;' was dead code and has been dropped */
+ conn->transaction_status = msg[0];
+ conn->state = DN_CONNECTION_STATE_IDLE;
+ conn->combiner = NULL;
+ return true;
+ }
+ }
+ }
+ /* never happen, but keep compiler quiet */
+ return false;
+}
+
+/*
* Send BEGIN command to the Datanodes or Coordinators and receive responses
*/
static int
@@ -2453,7 +2511,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle**
if (bytes_needed > COPY_BUFFER_SIZE)
{
/* First look if data node has sent a error message */
- int read_status = pgxc_node_read_data(primary_handle);
+ int read_status = pgxc_node_read_data(primary_handle, true);
if (read_status == EOF || read_status < 0)
{
add_error_message(primary_handle, "failed to read data from data node");
@@ -2514,7 +2572,7 @@ DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle**
int to_send = handle->outEnd;
/* First look if data node has sent a error message */
- int read_status = pgxc_node_read_data(handle);
+ int read_status = pgxc_node_read_data(handle, true);
if (read_status == EOF || read_status < 0)
{
add_error_message(handle, "failed to read data from data node");
@@ -2615,7 +2673,7 @@ DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE*
if (handle_response(handle,combiner) == RESPONSE_EOF)
{
/* read some extra-data */
- read_status = pgxc_node_read_data(handle);
+ read_status = pgxc_node_read_data(handle, true);
if (read_status < 0)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
@@ -2679,30 +2737,9 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node,
if (primary_handle)
{
+ error = true;
if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT)
- {
- /* msgType + msgLen */
- if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0)
- {
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
-
- primary_handle->outBuffer[primary_handle->outEnd++] = 'c';
- memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4);
- primary_handle->outEnd += 4;
-
- /* We need response right away, so send immediately */
- if (pgxc_node_flush(primary_handle) < 0)
- {
- error = true;
- }
- }
- else
- {
- error = true;
- }
+ error = DataNodeCopyEnd(primary_handle, false);
combiner = CreateResponseCombiner(conn_count + 1, combine_type);
error = (pgxc_node_receive_responses(1, &primary_handle, timeout, combiner) != 0) || error;
@@ -2712,30 +2749,9 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node,
{
PGXCNodeHandle *handle = connections[i];
+ if (handle->state != DN_CONNECTION_STATE_COPY_IN && handle->state != DN_CONNECTION_STATE_COPY_OUT)
+ error = true;
if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT)
- {
- /* msgType + msgLen */
- if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0)
- {
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
-
- handle->outBuffer[handle->outEnd++] = 'c';
- memcpy(handle->outBuffer + handle->outEnd, &nLen, 4);
- handle->outEnd += 4;
-
- /* We need response right away, so send immediately */
- if (pgxc_node_flush(handle) < 0)
- {
- error = true;
- }
- }
- else
- {
- error = true;
- }
+ error = DataNodeCopyEnd(handle, false) || error;
}
need_tran = !autocommit || primary_handle || conn_count > 1;
@@ -2750,6 +2766,36 @@ DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node,
errmsg("Error while running COPY")));
}
+/*
+ * End copy process on a connection
+ */
+bool
+DataNodeCopyEnd(PGXCNodeHandle *handle, bool is_error)
+{
+ int nLen = htonl(4);
+
+ if (handle == NULL)
+ return true;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0)
+ return true;
+
+ if (is_error)
+ handle->outBuffer[handle->outEnd++] = 'f';
+ else
+ handle->outBuffer[handle->outEnd++] = 'c';
+
+ memcpy(handle->outBuffer + handle->outEnd, &nLen, 4);
+ handle->outEnd += 4;
+
+ /* We need response right away, so send immediately */
+ if (pgxc_node_flush(handle) < 0)
+ return true;
+
+ return false;
+}
+
RemoteQueryState *
ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
{
@@ -3296,7 +3342,9 @@ do_query(RemoteQueryState *node)
while (true)
{
int res;
- pgxc_node_receive(1, &primaryconnection, NULL);
+ if (pgxc_node_receive(1, &primaryconnection, NULL))
+ break;
+
res = handle_response(primaryconnection, node);
if (res == RESPONSE_COMPLETE)
break;
@@ -4248,7 +4296,8 @@ ExecRemoteUtility(RemoteQuery *node)
{
int i = 0;
- pgxc_node_receive(dn_conn_count, pgxc_connections->datanode_handles, NULL);
+ if (pgxc_node_receive(dn_conn_count, pgxc_connections->datanode_handles, NULL))
+ break;
/*
* Handle input from the data nodes.
* We do not expect data nodes returning tuples when running utility
@@ -4296,7 +4345,9 @@ ExecRemoteUtility(RemoteQuery *node)
{
int i = 0;
- pgxc_node_receive(co_conn_count, pgxc_connections->coord_handles, NULL);
+ if (pgxc_node_receive(co_conn_count, pgxc_connections->coord_handles, NULL))
+ break;
+
while (i < co_conn_count)
{
int res = handle_response(pgxc_connections->coord_handles[i], remotestate);
diff --git a/src/backend/pgxc/pool/pgxcnode.c b/src/backend/pgxc/pool/pgxcnode.c
index a0b8da48d8..a2e90cecd1 100644
--- a/src/backend/pgxc/pool/pgxcnode.c
+++ b/src/backend/pgxc/pool/pgxcnode.c
@@ -20,6 +20,7 @@
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
+#include <sys/ioctl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@@ -279,21 +280,35 @@ pgxc_node_init(PGXCNodeHandle *handle, int sock, int nodenum)
* Wait while at least one of specified connections has data available and read
* the data into the buffer
*/
-int
+bool
pgxc_node_receive(const int conn_count,
PGXCNodeHandle ** connections, struct timeval * timeout)
{
+#define ERROR_OCCURED true
+#define NO_ERROR_OCCURED false
int i,
res_select,
nfds = 0;
- fd_set readfds;
+ fd_set readfds;
+ bool is_msg_buffered;
FD_ZERO(&readfds);
+
+ is_msg_buffered = false;
+ for (i = 0; i < conn_count; i++)
+ {
+ /* If connection has a buffered message */
+ if (HAS_MESSAGE_BUFFERED(connections[i]))
+ {
+ is_msg_buffered = true;
+ break;
+ }
+ }
+
for (i = 0; i < conn_count; i++)
{
/* If connection finished sending do not wait input from it */
- if (connections[i]->state == DN_CONNECTION_STATE_IDLE
- || HAS_MESSAGE_BUFFERED(connections[i]))
+ if (connections[i]->state == DN_CONNECTION_STATE_IDLE || HAS_MESSAGE_BUFFERED(connections[i]))
continue;
/* prepare select params */
@@ -313,7 +328,11 @@ pgxc_node_receive(const int conn_count,
* Return if we do not have connections to receive input
*/
if (nfds == 0)
- return 0;
+ {
+ if (is_msg_buffered)
+ return NO_ERROR_OCCURED;
+ return ERROR_OCCURED;
+ }
retry:
res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
@@ -328,14 +347,16 @@ retry:
elog(WARNING, "select() bad file descriptor set");
}
elog(WARNING, "select() error: %d", errno);
- return errno;
+ if (errno)
+ return ERROR_OCCURED;
+ return NO_ERROR_OCCURED;
}
if (res_select == 0)
{
/* Handle timeout */
elog(WARNING, "timeout while waiting for response");
- return EOF;
+ return ERROR_OCCURED;
}
/* read data */
@@ -345,7 +366,7 @@ retry:
if (FD_ISSET(conn->sock, &readfds))
{
- int read_status = pgxc_node_read_data(conn);
+ int read_status = pgxc_node_read_data(conn, true);
if (read_status == EOF || read_status < 0)
{
@@ -354,26 +375,46 @@ retry:
add_error_message(conn, "unexpected EOF on datanode connection");
elog(WARNING, "unexpected EOF on datanode connection");
/* Should we read from the other connections before returning? */
- return EOF;
+ return ERROR_OCCURED;
}
}
}
- return 0;
+ return NO_ERROR_OCCURED;
}
+/*
+ * Is there any data enqueued in the TCP input buffer waiting
+ * to be read sent by the PGXC node connection
+ */
+
+int
+pgxc_node_is_data_enqueued(PGXCNodeHandle *conn)
+{
+ int ret;
+ int enqueued;
+
+ if (conn->sock < 0)
+ return 0;
+ ret = ioctl(conn->sock, FIONREAD, &enqueued);
+ if (ret != 0)
+ return 0;
+
+ return enqueued;
+}
/*
* Read up incoming messages from the PGXC node connection
*/
int
-pgxc_node_read_data(PGXCNodeHandle *conn)
+pgxc_node_read_data(PGXCNodeHandle *conn, bool close_if_error)
{
int someread = 0;
int nread;
if (conn->sock < 0)
{
- add_error_message(conn, "bad socket");
+ if (close_if_error)
+ add_error_message(conn, "bad socket");
return EOF;
}
@@ -412,7 +453,8 @@ pgxc_node_read_data(PGXCNodeHandle *conn)
*/
if (conn->inSize - conn->inEnd < 100)
{
- add_error_message(conn, "can not allocate buffer");
+ if (close_if_error)
+ add_error_message(conn, "can not allocate buffer");
return -1;
}
}
@@ -424,7 +466,8 @@ retry:
if (nread < 0)
{
- elog(DEBUG1, "dnrd errno = %d", errno);
+ if (close_if_error)
+ elog(DEBUG1, "dnrd errno = %d", errno);
if (errno == EINTR)
goto retry;
/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -444,19 +487,22 @@ retry:
* OK, we are getting a zero read even though select() says ready. This
* means the connection has been closed. Cope.
*/
- add_error_message(conn,
- "data node closed the connection unexpectedly\n"
- "\tThis probably means the data node terminated abnormally\n"
- "\tbefore or while processing the request.\n");
- conn->state = DN_CONNECTION_STATE_ERROR_FATAL; /* No more connection to
- * backend */
- closesocket(conn->sock);
- conn->sock = NO_SOCKET;
-
+ if (close_if_error)
+ {
+ add_error_message(conn,
+ "data node closed the connection unexpectedly\n"
+ "\tThis probably means the data node terminated abnormally\n"
+ "\tbefore or while processing the request.\n");
+ conn->state = DN_CONNECTION_STATE_ERROR_FATAL; /* No more connection to
+ * backend */
+ closesocket(conn->sock);
+ conn->sock = NO_SOCKET;
+ }
return -1;
}
#endif
- add_error_message(conn, "could not receive data from server");
+ if (close_if_error)
+ add_error_message(conn, "could not receive data from server");
return -1;
}
@@ -488,7 +534,8 @@ retry:
if (nread == 0)
{
- elog(DEBUG1, "nread returned 0");
+ if (close_if_error)
+ elog(DEBUG1, "nread returned 0");
return EOF;
}
@@ -661,6 +708,102 @@ release_handles(void)
coord_count = 0;
}
+/*
+ * cancel a running query due to error while processing rows
+ */
+void
+cancel_query(void)
+{
+ int i;
+ int dn_cancel[NumDataNodes];
+ int co_cancel[NumCoords];
+ int dn_count = 0;
+ int co_count = 0;
+
+ if (datanode_count == 0 && coord_count == 0)
+ return;
+
+ /* Collect Data Nodes handles */
+ for (i = 0; i < NumDataNodes; i++)
+ {
+ PGXCNodeHandle *handle = &dn_handles[i];
+
+ if (handle->sock != NO_SOCKET)
+ {
+ if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT)
+ {
+ DataNodeCopyEnd(handle, true);
+ }
+ else
+ {
+ if (handle->state != DN_CONNECTION_STATE_IDLE)
+ {
+ dn_cancel[dn_count++] = handle->nodenum;
+ }
+ }
+ }
+ }
+
+ /* Collect Coordinator handles */
+ for (i = 0; i < NumCoords; i++)
+ {
+ PGXCNodeHandle *handle = &co_handles[i];
+
+ if (handle->sock != NO_SOCKET)
+ {
+ if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT)
+ {
+ DataNodeCopyEnd(handle, true);
+ }
+ else
+ {
+ if (handle->state != DN_CONNECTION_STATE_IDLE)
+ {
+ co_cancel[co_count++] = handle->nodenum;
+ }
+ }
+ }
+ }
+
+ PoolManagerCancelQuery(dn_count, dn_cancel, co_count, co_cancel);
+}
+
+/*
+ * This method won't return until all network buffers are empty
+ * To ensure all data in all network buffers is read and wasted
+ */
+void
+clear_all_data(void)
+{
+ int i;
+
+ if (datanode_count == 0 && coord_count == 0)
+ return;
+
+ /* Collect Data Nodes handles */
+ for (i = 0; i < NumDataNodes; i++)
+ {
+ PGXCNodeHandle *handle = &dn_handles[i];
+
+ if (handle->sock != NO_SOCKET && handle->state != DN_CONNECTION_STATE_IDLE)
+ {
+ pgxc_node_flush_read(handle);
+ handle->state = DN_CONNECTION_STATE_IDLE;
+ }
+ }
+
+ /* Collect Coordinator handles */
+ for (i = 0; i < NumCoords; i++)
+ {
+ PGXCNodeHandle *handle = &co_handles[i];
+
+ if (handle->sock != NO_SOCKET && handle->state != DN_CONNECTION_STATE_IDLE)
+ {
+ pgxc_node_flush_read(handle);
+ handle->state = DN_CONNECTION_STATE_IDLE;
+ }
+ }
+}
/*
* Ensure specified amount of data can fit to the incoming buffer and
@@ -1224,6 +1367,31 @@ pgxc_node_flush(PGXCNodeHandle *handle)
}
/*
+ * This method won't return until network buffer is empty or error occurs
+ * To ensure all data in network buffers is read and wasted
+ */
+void
+pgxc_node_flush_read(PGXCNodeHandle *handle)
+{
+ bool is_ready;
+ int read_result;
+
+ if (handle == NULL)
+ return;
+
+ while(true)
+ {
+ is_ready = is_data_node_ready(handle);
+ if (is_ready == true)
+ break;
+
+ read_result = pgxc_node_read_data(handle, false);
+ if (read_result < 0)
+ break;
+ }
+}
+
+/*
* Send specified statement down to the PGXC node
*/
int
diff --git a/src/backend/pgxc/pool/poolcomm.c b/src/backend/pgxc/pool/poolcomm.c
index 79a377660a..22dc813e45 100644
--- a/src/backend/pgxc/pool/poolcomm.c
+++ b/src/backend/pgxc/pool/poolcomm.c
@@ -435,9 +435,7 @@ pool_flush(PoolPort *port)
* If shutting down already, do not call.
*/
if (!proc_exit_inprogress)
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("could not send data to client: %m")));
+ return 0;
}
/*
diff --git a/src/backend/pgxc/pool/poolmgr.c b/src/backend/pgxc/pool/poolmgr.c
index 1b2c4bfcf2..463bd5a170 100644
--- a/src/backend/pgxc/pool/poolmgr.c
+++ b/src/backend/pgxc/pool/poolmgr.c
@@ -104,6 +104,7 @@ static DatabasePool *find_database_pool_to_clean(const char *database,
List *co_list);
static DatabasePool *remove_database_pool(const char *database, const char *user_name);
static int *agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist);
+static int cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist);
static PGXCNodePoolSlot *acquire_connection(DatabasePool *dbPool, int node, char client_conn_type);
static void agent_release_connections(PoolAgent *agent, List *dn_discard, List *co_discard);
static void agent_reset_params(PoolAgent *agent, List *dn_list, List *co_list);
@@ -878,17 +879,17 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
*/
for (;;)
{
- const char *database = NULL;
- const char *user_name = NULL;
- const char *set_command;
+ const char *database = NULL;
+ const char *user_name = NULL;
+ const char *set_command;
bool is_local;
- int datanodecount;
- int coordcount;
- List *datanodelist = NIL;
- List *coordlist = NIL;
- int *fds;
- int *pids;
- int i, len, res;
+ int datanodecount;
+ int coordcount;
+ List *datanodelist = NIL;
+ List *coordlist = NIL;
+ int *fds;
+ int *pids;
+ int i, len, res;
/*
* During a pool cleaning, Abort, Connect and Get Connections messages
@@ -1001,6 +1002,32 @@ agent_handle_input(PoolAgent * agent, StringInfo s)
if (fds)
pfree(fds);
break;
+
+ case 'h': /* Cancel SQL Command in progress on specified connections */
+ /*
+ * Length of message is caused by:
+ * - Message header = 4bytes
+ * - List of datanodes = NumDataNodes * 4bytes (max)
+ * - List of coordinators = NumCoords * 4bytes (max)
+ * - Number of Datanodes sent = 4bytes
+ * - Number of Coordinators sent = 4bytes
+ */
+ pool_getmessage(&agent->port, s, 4 * NumDataNodes + 4 * NumCoords + 12);
+ datanodecount = pq_getmsgint(s, 4);
+ for (i = 0; i < datanodecount; i++)
+ datanodelist = lappend_int(datanodelist, pq_getmsgint(s, 4));
+ coordcount = pq_getmsgint(s, 4);
+ /* It is possible that no Coordinators are involved in the transaction */
+ for (i = 0; i < coordcount; i++)
+ coordlist = lappend_int(coordlist, pq_getmsgint(s, 4));
+ pq_getmsgend(s);
+
+ cancel_query_on_connections(agent, datanodelist, coordlist);
+ list_free(datanodelist);
+ list_free(coordlist);
+
+ break;
+
case 'r': /* RELEASE CONNECTIONS */
pool_getmessage(&agent->port, s, 4 * NumDataNodes + 4 * NumCoords + 12);
datanodecount = pq_getmsgint(s, 4);
@@ -1245,6 +1272,61 @@ agent_acquire_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
return result;
}
+/*
+ * Cancel query
+ */
+static int
+cancel_query_on_connections(PoolAgent *agent, List *datanodelist, List *coordlist)
+{
+ int i;
+ ListCell *nodelist_item;
+ char errbuf[256];
+ int nCount;
+ bool bRet;
+
+ nCount = 0;
+
+ if (agent == NULL)
+ return nCount;
+
+ /* Send cancel on Data nodes first */
+ foreach(nodelist_item, datanodelist)
+ {
+ int node = lfirst_int(nodelist_item);
+
+ if(node <= 0 || node > NumDataNodes)
+ continue;
+
+ if (agent->dn_connections == NULL)
+ break;
+
+ bRet = PQcancel((PGcancel *) agent->dn_connections[node - 1]->xc_cancelConn, errbuf, sizeof(errbuf));
+ if (bRet != false)
+ {
+ nCount++;
+ }
+ }
+
+ /* Send cancel to Coordinators too, e.g. if DDL was in progress */
+ foreach(nodelist_item, coordlist)
+ {
+ int node = lfirst_int(nodelist_item);
+
+ if(node <= 0 || node > NumCoords)
+ continue;
+
+ if (agent->coord_connections == NULL)
+ break;
+
+ bRet = PQcancel((PGcancel *) agent->coord_connections[node - 1]->xc_cancelConn, errbuf, sizeof(errbuf));
+ if (bRet != false)
+ {
+ nCount++;
+ }
+ }
+
+ return nCount;
+}
/*
* Return connections back to the pool
@@ -1262,6 +1344,9 @@ PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int*
Assert(Handle);
+ if (dn_ndisc == 0 && co_ndisc == 0)
+ return;
+
/* Insert the list of Datanodes in buffer */
n32 = htonl((uint32) dn_ndisc);
buf[0] = n32;
@@ -1290,6 +1375,52 @@ PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int*
pool_flush(&Handle->port);
}
+/*
+ * Cancel Query
+ */
+void
+PoolManagerCancelQuery(int dn_count, int* dn_list, int co_count, int* co_list)
+{
+ uint32 n32;
+ /*
+ * Buffer contains the list of both Coordinator and Datanodes, as well
+ * as the number of connections
+ */
+ uint32 buf[2 + dn_count + co_count];
+ int i;
+
+ if (Handle == NULL || dn_list == NULL || co_list == NULL)
+ return;
+
+ if (dn_count == 0 && co_count == 0)
+ return;
+
+ /* Insert the list of Datanodes in buffer */
+ n32 = htonl((uint32) dn_count);
+ buf[0] = n32;
+
+ for (i = 0; i < dn_count;)
+ {
+ n32 = htonl((uint32) dn_list[i++]);
+ buf[i] = n32;
+ }
+
+ /* Insert the list of Coordinators in buffer */
+ n32 = htonl((uint32) co_count);
+ buf[dn_count + 1] = n32;
+
+ /* Not necessary to send to pooler a request if there is no Coordinator */
+ if (co_count != 0)
+ {
+ for (i = dn_count + 1; i < (dn_count + co_count + 1);)
+ {
+ n32 = htonl((uint32) co_list[i - (dn_count + 1)]);
+ buf[++i] = n32;
+ }
+ }
+ pool_putmessage(&Handle->port, 'h', (char *) buf, (2 + dn_count + co_count) * sizeof(uint32));
+ pool_flush(&Handle->port);
+}
/*
* Release connections for Datanodes and Coordinators
@@ -1950,6 +2081,8 @@ grow_pool(DatabasePool * dbPool, int index, char client_conn_type)
break;
}
+ slot->xc_cancelConn = PQgetCancel(slot->conn);
+
/* Insert at the end of the pool */
nodePool->slot[(nodePool->freeSize)++] = slot;
@@ -1968,6 +2101,7 @@ grow_pool(DatabasePool * dbPool, int index, char client_conn_type)
static void
destroy_slot(PGXCNodePoolSlot *slot)
{
+ PQfreeCancel(slot->xc_cancelConn);
PGXCNodeClose(slot->conn);
pfree(slot);
}
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index b2fab359b8..60e9cac802 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -71,6 +71,9 @@
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
#undef _
@@ -221,6 +224,13 @@ errstart(int elevel, const char *filename, int lineno,
*/
if (elevel >= ERROR)
{
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ cancel_query();
+ clear_all_data();
+ }
+#endif
/*
* If we are inside a critical section, all errors become PANIC
* errors. See miscadmin.h.
@@ -1121,6 +1131,14 @@ elog_finish(int elevel, const char *fmt,...)
CHECK_STACK_DEPTH();
+#ifdef PGXC
+ if (elevel >= ERROR && IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ cancel_query();
+ clear_all_data();
+ }
+#endif
+
/*
* Do errstart() to see if we actually want to report the message.
*/
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7cdb0f6318..d864470d7c 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -234,5 +234,4 @@ extern int xactGetCommittedChildren(TransactionId **ptr);
extern void xact_redo(XLogRecPtr lsn, XLogRecord *record);
extern void xact_desc(StringInfo buf, uint8 xl_info, char *rec);
-
#endif /* XACT_H */
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index c2fe88422c..48d23ca399 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -141,6 +141,7 @@ extern PGXCNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Sna
extern int DataNodeCopyIn(char *data_row, int len, ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections);
extern uint64 DataNodeCopyOut(ExecNodes *exec_nodes, PGXCNodeHandle** copy_connections, FILE* copy_file);
extern void DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int primary_data_node, CombineType combine_type);
+extern bool DataNodeCopyEnd(PGXCNodeHandle *handle, bool is_error);
extern int DataNodeCopyInBinaryForAll(char *msg_buf, int len, PGXCNodeHandle** copy_connections);
extern int ExecCountSlotsRemoteQuery(RemoteQuery *node);
@@ -150,10 +151,8 @@ extern void ExecEndRemoteQuery(RemoteQueryState *step);
extern void ExecRemoteUtility(RemoteQuery *node);
extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner);
-#ifdef PGXC
-extern void HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body,
- size_t len);
-#endif
+extern bool is_data_node_ready(PGXCNodeHandle * conn);
+extern void HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body, size_t len);
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
extern void BufferConnection(PGXCNodeHandle *conn);
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
index 8f1eb54f1a..4b66a753e3 100644
--- a/src/include/pgxc/pgxcnode.h
+++ b/src/include/pgxc/pgxcnode.h
@@ -28,6 +28,7 @@
/* Connection to data node maintained by Pool Manager */
typedef struct PGconn NODE_CONNECTION;
+typedef struct PGcancel NODE_CANCEL;
/* Helper structure to access data node from Session */
typedef enum
@@ -105,6 +106,9 @@ extern void PGXCNodeCleanAndRelease(int code, Datum arg);
extern PGXCNodeAllHandles *get_handles(List *datanodelist, List *coordlist, bool is_query_coord_only);
extern void release_handles(void);
+extern void cancel_query(void);
+extern void clear_all_data(void);
+
extern int get_transaction_nodes(PGXCNodeHandle ** connections,
char client_conn_type,
@@ -130,11 +134,14 @@ extern int pgxc_node_send_gxid(PGXCNodeHandle * handle, GlobalTransactionId gxid
extern int pgxc_node_send_snapshot(PGXCNodeHandle * handle, Snapshot snapshot);
extern int pgxc_node_send_timestamp(PGXCNodeHandle * handle, TimestampTz timestamp);
-extern int pgxc_node_receive(const int conn_count,
+extern bool pgxc_node_receive(const int conn_count,
PGXCNodeHandle ** connections, struct timeval * timeout);
-extern int pgxc_node_read_data(PGXCNodeHandle * conn);
+extern int pgxc_node_read_data(PGXCNodeHandle * conn, bool close_if_error);
+extern int pgxc_node_is_data_enqueued(PGXCNodeHandle *conn);
+
extern int send_some(PGXCNodeHandle * handle, int len);
extern int pgxc_node_flush(PGXCNodeHandle *handle);
+extern void pgxc_node_flush_read(PGXCNodeHandle *handle);
extern int pgxc_all_handles_send_gxid(PGXCNodeAllHandles *pgxc_handles, GlobalTransactionId gxid, bool stop_at_error);
extern int pgxc_all_handles_send_query(PGXCNodeAllHandles *pgxc_handles, const char *buffer, bool stop_at_error);
diff --git a/src/include/pgxc/poolmgr.h b/src/include/pgxc/poolmgr.h
index 4de9e4a339..79397687da 100644
--- a/src/include/pgxc/poolmgr.h
+++ b/src/include/pgxc/poolmgr.h
@@ -35,6 +35,7 @@ typedef struct
{
struct timeval released;
NODE_CONNECTION *conn;
+ NODE_CANCEL *xc_cancelConn;
} PGXCNodePoolSlot;
/* Pool of connections to specified pgxc node */
@@ -149,4 +150,7 @@ extern int PoolManagerAbortTransactions(char *dbname, char *username, int **proc
/* Return connections back to the pool, for both Coordinator and Datanode connections */
extern void PoolManagerReleaseConnections(int dn_ndisc, int* dn_discard, int co_ndisc, int* co_discard);
+/* Cancel a running query on data nodes as well as on other coordinators */
+extern void PoolManagerCancelQuery(int dn_count, int* dn_list, int co_count, int* co_list);
+
#endif
diff --git a/src/test/regress/expected/domain_1.out b/src/test/regress/expected/domain_1.out
index 07808af958..02f1556462 100644
--- a/src/test/regress/expected/domain_1.out
+++ b/src/test/regress/expected/domain_1.out
@@ -48,8 +48,7 @@ ERROR: value too long for type character varying(5)
INSERT INTO basictest values ('88', 'haha', 'short', '123.1212'); -- Truncate numeric
-- Test copy
COPY basictest (testvarchar) FROM stdin; -- fail
-ERROR: value too long for type character varying(5)
-CONTEXT: COPY basictest, line 1, column testvarchar: "notsoshorttext"
+ERROR: Error while running COPY
COPY basictest (testvarchar) FROM stdin;
select * from basictest order by 1, 2, 3, 4;
testint4 | testtext | testvarchar | testnumeric
@@ -129,8 +128,7 @@ select testint4arr[1], testchar4arr[2:2] from domarrtest order by 1, 2;
COPY domarrtest FROM stdin;
COPY domarrtest FROM stdin; -- fail
-ERROR: value too long for type character varying(4)
-CONTEXT: COPY domarrtest, line 1, column testchar4arr: "{qwerty,w,e}"
+ERROR: Error while running COPY
select * from domarrtest order by 1, 2;
testint4arr | testchar4arr
---------------+---------------------
@@ -174,8 +172,7 @@ INSERT INTO nulltest values ('a', 'b', 'c', NULL, 'd'); -- Good
COPY nulltest FROM stdin; --fail
ERROR: Error while running COPY
COPY nulltest FROM stdin; --fail
-ERROR: domain dcheck does not allow null values
-CONTEXT: COPY nulltest, line 1, column col5: null input
+ERROR: Error while running COPY
-- Last row is bad
COPY nulltest FROM stdin;
ERROR: Error while running COPY
diff --git a/src/test/regress/expected/xc_distkey.out b/src/test/regress/expected/xc_distkey.out
index d050b27c33..819952a266 100644
--- a/src/test/regress/expected/xc_distkey.out
+++ b/src/test/regress/expected/xc_distkey.out
@@ -451,15 +451,15 @@ select * from ts_tab order by a;
(2 rows)
select * from ts_tab where a = 'May 10, 2011 00:01:02.03';
- a
-------------------------
- 2011-05-10 00:01:02.03
+ a
+-----------------------------
+ Tue May 10 00:01:02.03 2011
(1 row)
select * from ts_tab where a = 'August 14, 2001 23:59:59.99';
- a
-------------------------
- 2001-08-14 23:59:59.99
+ a
+-----------------------------
+ Tue Aug 14 23:59:59.99 2001
(1 row)
create table in_tab(a interval) distribute by modulo(a);
@@ -517,15 +517,15 @@ select * from atim_tab order by a;
(2 rows)
select * from atim_tab where a = abstime('May 10, 2011 00:01:02.03');
- a
-------------------------
- 2011-05-10 12:01:02+05
+ a
+------------------------------
+ Tue May 10 00:01:02 2011 PDT
(1 row)
select * from atim_tab where a = abstime('Jun 23, 2001 23:59:59.99');
- a
-------------------------
- 2001-06-24 11:59:59+05
+ a
+------------------------------
+ Sat Jun 23 23:59:59 2001 PDT
(1 row)
create table rtim_tab(a reltime) distribute by modulo(a);
@@ -563,13 +563,13 @@ select * from date_tab order by a;
select * from date_tab where a = 'May 10, 2011';
a
------------
- 2011-05-10
+ 05-10-2011
(1 row)
select * from date_tab where a = 'August 23, 2001';
a
------------
- 2001-08-23
+ 08-23-2001
(1 row)
create table tstz_tab(a timestamp with time zone) distribute by modulo(a);
@@ -583,15 +583,15 @@ select * from tstz_tab order by a;
(2 rows)
select * from tstz_tab where a = 'May 10, 2011 00:01:02.03 PST';
- a
----------------------------
- 2011-05-10 13:01:02.03+05
+ a
+---------------------------------
+ Tue May 10 01:01:02.03 2011 PDT
(1 row)
select * from tstz_tab where a = 'Jun 23, 2001 23:59:59.99 PST';
- a
----------------------------
- 2001-06-24 12:59:59.99+05
+ a
+---------------------------------
+ Sun Jun 24 00:59:59.99 2001 PDT
(1 row)
create table tstz_tab_h(a timestamp with time zone) distribute by hash(a);
@@ -605,14 +605,14 @@ select * from tstz_tab_h order by a;
(2 rows)
select * from tstz_tab_h where a = 'May 10, 2011 00:01:02.03 PST';
- a
----------------------------
- 2011-05-10 13:01:02.03+05
+ a
+---------------------------------
+ Tue May 10 01:01:02.03 2011 PDT
(1 row)
select * from tstz_tab_h where a = 'Jun 23, 2001 23:59:59.99 PST';
- a
----------------------------
- 2001-06-24 12:59:59.99+05
+ a
+---------------------------------
+ Sun Jun 24 00:59:59.99 2001 PDT
(1 row)
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 658f930e22..6b58aa78d0 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -42,7 +42,7 @@ test: comments
test: geometry
#After supporting other data types as distribution key, this test case crashes the server
#Bug ID 3306801 tracks this crash
-#test: horology
+test: horology
test: oidjoins
test: type_sanity
test: opr_sanity