summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMason S2010-04-18 04:43:57 +0000
committerPavan Deolasee2011-05-19 16:38:45 +0000
commitf54ce7729b567f5a3d6f8183f8c09185adf337ab (patch)
treea1755765e411e1ddebe55646240d3b1ad713c426
parent8f4735f6b2b1676a49af3b1c184475831853a5a3 (diff)
Added support for COPY FROM, for loading tables.
Some additional work was done related to the combiner and error handling to make this code a little cleaner. This was written by Andrei Martsinchyk.
-rw-r--r--src/backend/commands/copy.c330
-rw-r--r--src/backend/pgxc/locator/locator.c1
-rw-r--r--src/backend/pgxc/plan/planner.c61
-rw-r--r--src/backend/pgxc/pool/combiner.c103
-rw-r--r--src/backend/pgxc/pool/datanode.c749
-rw-r--r--src/backend/tcop/postgres.c91
-rw-r--r--src/include/pgxc/combiner.h4
-rw-r--r--src/include/pgxc/datanode.h16
-rw-r--r--src/include/pgxc/locator.h1
-rw-r--r--src/include/pgxc/planner.h2
10 files changed, 1107 insertions, 251 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index c464ed7f6c..35622074f5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,12 @@
#include "miscadmin.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#include "pgxc/datanode.h"
+#include "pgxc/locator.h"
+#include "pgxc/poolmgr.h"
+#endif
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
@@ -160,6 +166,22 @@ typedef struct CopyStateData
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
+#ifdef PGXC
+ /*
+ * On coordinator we need to rewrite query.
+ * While client may submit a copy command dealing with file, data nodes
+ * always send/receive data to/from the coordinator. So we can not use
+ * original statement and should rewrite statement, specifying STDIN/STDOUT
+ * as copy source or destination
+ */
+ StringInfoData query_buf;
+
+ /* Locator information */
+ RelationLocInfo *rel_loc; /* the locator key */
+ int hash_idx; /* index of the hash column */
+
+ DataNodeHandle **connections; /* Involved data node connections */
+#endif
} CopyStateData;
typedef CopyStateData *CopyState;
@@ -646,7 +668,6 @@ CopyGetInt16(CopyState cstate, int16 *val)
return true;
}
-
/*
* CopyLoadRawBuf loads some more data into raw_buf
*
@@ -682,6 +703,101 @@ CopyLoadRawBuf(CopyState cstate)
return (inbytes > 0);
}
+#ifdef PGXC
+/*
+ * CopyQuoteStr appends quoted value to the query buffer. Value is escaped
+ * if needed
+ *
+ * When rewriting query to be sent down to nodes we should escape special
+ * characters, that may present in the value. The characters are backslash(\)
+ * and single quote ('). These characters are escaped by doubling. We do not
+ * have to escape characters like \t, \v, \b, etc. because datanode interprets
+ * them properly.
+ * We use E'...' syntax for literals containing backslashes.
+ */
+static void
+CopyQuoteStr(StringInfo query_buf, char *value)
+{
+ char *start = value;
+ char *current = value;
+ char c;
+ bool has_backslash = (strchr(value, '\\') != NULL);
+
+ if (has_backslash)
+ appendStringInfoChar(query_buf, 'E');
+
+ appendStringInfoChar(query_buf, '\'');
+
+#define APPENDSOFAR \
+ if (current > start) \
+ appendBinaryStringInfo(query_buf, start, current - start)
+
+ while ((c = *current) != '\0')
+ {
+ switch (c)
+ {
+ case '\\':
+ case '\'':
+ APPENDSOFAR;
+ // Double current
+ appendStringInfoChar(query_buf, c);
+ // Second current will be appended next time
+ start = current;
+ // fallthru
+ default:
+ current++;
+ }
+ }
+ APPENDSOFAR;
+ appendStringInfoChar(query_buf, '\'');
+}
+
+/*
+ * CopyQuoteIdentifier determines if identifier needs to be quoted and surrounds
+ * it with double quotes
+ */
+static void
+CopyQuoteIdentifier(StringInfo query_buf, char *value)
+{
+ char *start = value;
+ char *current = value;
+ char c;
+ int len = strlen(value);
+ bool need_quote = (strspn(value, "_abcdefghijklmnopqrstuvwxyz0123456789") < len)
+ || (strchr("_abcdefghijklmnopqrstuvwxyz", value[0]) == NULL);
+
+ if (need_quote)
+ {
+ appendStringInfoChar(query_buf, '"');
+
+#define APPENDSOFAR \
+ if (current > start) \
+ appendBinaryStringInfo(query_buf, start, current - start)
+
+ while ((c = *current) != '\0')
+ {
+ switch (c)
+ {
+ case '"':
+ APPENDSOFAR;
+ // Double current
+ appendStringInfoChar(query_buf, c);
+ // Second current will be appended next time
+ start = current;
+ // fallthru
+ default:
+ current++;
+ }
+ }
+ APPENDSOFAR;
+ appendStringInfoChar(query_buf, '"');
+ }
+ else
+ {
+ appendBinaryStringInfo(query_buf, value, len);
+ }
+}
+#endif
/*
* DoCopy executes the SQL COPY statement
@@ -1012,6 +1128,133 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(cstate->rel))));
+#ifdef PGXC
+ /* Get locator information */
+ if (IS_PGXC_COORDINATOR)
+ {
+ char *hash_att;
+
+ cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel));
+ hash_att = GetRelationHashColumn(cstate->rel_loc);
+
+ cstate->hash_idx = -1;
+ if (hash_att)
+ {
+ List *attnums;
+ ListCell *cur;
+
+ attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
+ foreach(cur, attnums)
+ {
+ int attnum = lfirst_int(cur);
+ if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), hash_att) == 0)
+ {
+ cstate->hash_idx = attnum - 1;
+ break;
+ }
+ }
+ }
+
+ /*
+ * Build up query string for the data nodes, it should match
+ * to original string, but should have STDIN/STDOUT instead
+ * of filename.
+ */
+ initStringInfo(&cstate->query_buf);
+
+ appendStringInfoString(&cstate->query_buf, "COPY ");
+ appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel));
+
+ if (attnamelist)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&cstate->query_buf, " (");
+ foreach (cell, attnamelist)
+ {
+ if (prev)
+ appendStringInfoString(&cstate->query_buf, ", ");
+ CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+ appendStringInfoChar(&cstate->query_buf, ')');
+ }
+
+ if (stmt->is_from)
+ appendStringInfoString(&cstate->query_buf, " FROM STDIN");
+ else
+ appendStringInfoString(&cstate->query_buf, " TO STDOUT");
+
+
+ if (cstate->binary)
+ appendStringInfoString(&cstate->query_buf, " BINARY");
+
+ if (cstate->oids)
+ appendStringInfoString(&cstate->query_buf, " OIDS");
+
+ if (cstate->delim)
+ if ((!cstate->csv_mode && cstate->delim[0] != '\t')
+ || (cstate->csv_mode && cstate->delim[0] != ','))
+ {
+ appendStringInfoString(&cstate->query_buf, " DELIMITER AS ");
+ CopyQuoteStr(&cstate->query_buf, cstate->delim);
+ }
+
+ if (cstate->null_print)
+ if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N"))
+ || (cstate->csv_mode && strcmp(cstate->null_print, "")))
+ {
+ appendStringInfoString(&cstate->query_buf, " NULL AS ");
+ CopyQuoteStr(&cstate->query_buf, cstate->null_print);
+ }
+
+ if (cstate->csv_mode)
+ appendStringInfoString(&cstate->query_buf, " CSV");
+
+ if (cstate->header_line)
+ appendStringInfoString(&cstate->query_buf, " HEADER");
+
+ if (cstate->quote && cstate->quote[0] == '"')
+ {
+ appendStringInfoString(&cstate->query_buf, " QUOTE AS ");
+ CopyQuoteStr(&cstate->query_buf, cstate->quote);
+ }
+
+ if (cstate->escape && cstate->quote && cstate->escape[0] == cstate->quote[0])
+ {
+ appendStringInfoString(&cstate->query_buf, " ESCAPE AS ");
+ CopyQuoteStr(&cstate->query_buf, cstate->escape);
+ }
+
+ if (force_quote)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&cstate->query_buf, " FORCE QUOTE ");
+ foreach (cell, force_quote)
+ {
+ if (prev)
+ appendStringInfoString(&cstate->query_buf, ", ");
+ CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+ }
+
+ if (force_notnull)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL ");
+ foreach (cell, force_notnull)
+ {
+ if (prev)
+ appendStringInfoString(&cstate->query_buf, ", ");
+ CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+ }
+ }
+#endif
}
else
{
@@ -1023,6 +1266,13 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
Assert(!is_from);
cstate->rel = NULL;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY (SELECT) is not supported in PGXC")));
+#endif
+
/* Don't allow COPY w/ OIDs from a select */
if (cstate->oids)
ereport(ERROR,
@@ -1157,11 +1407,64 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
cstate->copy_dest = COPY_FILE; /* default */
cstate->filename = stmt->filename;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ cstate->connections = DataNodeCopyBegin(cstate->query_buf.data,
+ cstate->rel_loc->nodeList,
+ GetActiveSnapshot());
+ if (!cstate->connections)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_EXCEPTION),
+ errmsg("Failed to initialize data nodes for COPY")));
+ }
+ PG_TRY();
+ {
+#endif
+
if (is_from)
CopyFrom(cstate); /* copy from file to database */
else
DoCopyTo(cstate); /* copy from database to file */
+#ifdef PGXC
+ }
+ PG_CATCH();
+ {
+ if (IS_PGXC_COORDINATOR)
+ {
+ DataNodeCopyFinish(
+ cstate->connections,
+ primary_data_node,
+ COMBINE_TYPE_NONE,
+ whereToSendOutput);
+ pfree(cstate->connections);
+ pfree(cstate->query_buf.data);
+ FreeRelationLocInfo(cstate->rel_loc);
+ }
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ if (IS_PGXC_COORDINATOR)
+ {
+ if (cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED)
+ cstate->processed = DataNodeCopyFinish(
+ cstate->connections,
+ primary_data_node,
+ COMBINE_TYPE_SAME,
+ whereToSendOutput);
+ else
+ cstate->processed = DataNodeCopyFinish(
+ cstate->connections,
+ 0,
+ COMBINE_TYPE_SUM,
+ whereToSendOutput);
+ pfree(cstate->connections);
+ pfree(cstate->query_buf.data);
+ FreeRelationLocInfo(cstate->rel_loc);
+ }
+#endif
+
/*
* Close the relation or query. If reading, we can release the
* AccessShareLock we got; if writing, we should hold the lock until end
@@ -1178,9 +1481,8 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
PopActiveSnapshot();
}
- /* Clean up storage (probably not really necessary) */
processed = cstate->processed;
-
+ /* Clean up storage (probably not really necessary) */
pfree(cstate->attribute_buf.data);
pfree(cstate->line_buf.data);
pfree(cstate->raw_buf);
@@ -2089,6 +2391,25 @@ CopyFrom(CopyState cstate)
&nulls[defmap[i]], NULL);
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ Datum *hash_value = NULL;
+
+ if (cstate->hash_idx >= 0 && !nulls[cstate->hash_idx])
+ hash_value = &values[cstate->hash_idx];
+
+ if (DataNodeCopyIn(cstate->line_buf.data,
+ cstate->line_buf.len,
+ GetRelationNodes(cstate->rel_loc, (long *)hash_value, false),
+ cstate->connections))
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_EXCEPTION),
+ errmsg("Copy failed on a data node")));
+ }
+ else
+ {
+#endif
/* And now we can form the input tuple. */
tuple = heap_form_tuple(tupDesc, values, nulls);
@@ -2142,6 +2463,9 @@ CopyFrom(CopyState cstate)
*/
cstate->processed++;
}
+#ifdef PGXC
+ }
+#endif
}
/* Done, clean up */
diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c
index 1ce8299fce..99ef5e6ee3 100644
--- a/src/backend/pgxc/locator/locator.c
+++ b/src/backend/pgxc/locator/locator.c
@@ -290,6 +290,7 @@ GetRelationNodes(RelationLocInfo * rel_loc_info, long *partValue, int isRead)
return NULL;
exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
+ exec_nodes->baselocatortype = rel_loc_info->locatorType;
switch (rel_loc_info->locatorType)
{
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 7f04728655..0720b0fc3d 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -4,7 +4,7 @@
*
* Functions for generating a PGXC style plan.
*
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
*
*
@@ -38,7 +38,7 @@ typedef struct
} Literal_Comparison;
/*
- * This struct helps us detect special conditions to determine what nodes
+ * This struct helps us detect special conditions to determine what nodes
* to execute on.
*/
typedef struct
@@ -84,8 +84,8 @@ bool StrictStatementChecking = true;
/* Forbid multi-node SELECT statements with an ORDER BY clause */
bool StrictSelectChecking = false;
-/*
- * Create a new join struct for tracking how relations are joined
+/*
+ * Create a new join struct for tracking how relations are joined
*/
static PGXC_Join *
new_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2)
@@ -113,8 +113,8 @@ new_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2)
}
-/*
- * Look up the join struct for a particular join
+/*
+ * Look up the join struct for a particular join
*/
static PGXC_Join *
find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2)
@@ -220,7 +220,7 @@ free_join_list()
/*
* get_numeric_constant - extract casted constant
*
- * Searches an expression to see if it is a Constant that is being cast
+ * Searches an expression to see if it is a Constant that is being cast
* to numeric. Return a pointer to the Constant, or NULL.
* We need this because of casting.
*/
@@ -297,7 +297,7 @@ get_plan_nodes_insert(Query * query)
long *part_value_ptr = NULL;
-
+
/* Looks complex (correlated?) - best to skip */
if (query->jointree != NULL && query->jointree->fromlist != NULL)
@@ -426,8 +426,8 @@ examine_conditions(Special_Conditions * conditions, List *rtables, Node *expr_no
else if (boolexpr->boolop == OR_EXPR)
{
/*
- * look at OR's as work-around for reported issue.
- * NOTE: THIS IS NOT CORRECT, BUT JUST DONE FOR THE PROTOTYPE.
+ * look at OR's as work-around for reported issue.
+ * NOTE: THIS IS NOT CORRECT, BUT JUST DONE FOR THE PROTOTYPE.
* More rigorous
* checking needs to be done. PGXCTODO: Add careful checking for
* OR'ed conditions...
@@ -832,7 +832,7 @@ get_plan_nodes(Query_Plan * query_plan, Query * query, bool isRead)
exec_nodes = test_exec_nodes;
else
{
- if ((exec_nodes && list_length(exec_nodes->nodelist) > 1)
+ if ((exec_nodes && list_length(exec_nodes->nodelist) > 1)
|| (test_exec_nodes && list_length(test_exec_nodes->nodelist) > 1))
/* there should only be one */
exec_nodes = NULL;
@@ -872,7 +872,7 @@ get_plan_nodes(Query_Plan * query_plan, Query * query, bool isRead)
}
-/*
+/*
* get_plan_nodes - determine the nodes to execute the plan on
*
* return NULL if it is not safe to be done in a single step.
@@ -904,6 +904,35 @@ get_plan_nodes_command(Query_Plan * query_plan, Query * query)
/*
+ * get_plan_combine_type - determine combine type
+ *
+ * COMBINE_TYPE_SAME - for replicated updates
+ * COMBINE_TYPE_SUM - for hash and round robin updates
+ * COMBINE_TYPE_NONE - for operations where row_count is not applicable
+ *
+ * return NULL if it is not safe to be done in a single step.
+ */
+static CombineType
+get_plan_combine_type(Query *query, char baselocatortype)
+{
+
+ switch (query->commandType)
+ {
+ case CMD_INSERT:
+ case CMD_UPDATE:
+ case CMD_DELETE:
+ return baselocatortype == LOCATOR_TYPE_REPLICATED ?
+ COMBINE_TYPE_SAME : COMBINE_TYPE_SUM;
+
+ default:
+ return COMBINE_TYPE_NONE;
+ }
+ /* quiet compiler warning */
+ return COMBINE_TYPE_NONE;
+}
+
+
+/*
* Get list of simple aggregates used.
* For now we only allow MAX in the first column, and return a list of one.
*/
@@ -972,6 +1001,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1);
strcpy(query_step->sql_statement, sql_statement);
query_step->exec_nodes = NULL;
+ query_step->combine_type = COMBINE_TYPE_NONE;
query_step->simple_aggregates = NULL;
query_plan->query_step_list = lappend(NULL, query_step);
@@ -991,6 +1021,9 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
query = (Query *) linitial(querytree_list);
query_step->exec_nodes =
get_plan_nodes_command(query_plan, query);
+ if (query_step->exec_nodes)
+ query_step->combine_type = get_plan_combine_type(
+ query, query_step->exec_nodes->baselocatortype);
query_step->simple_aggregates =
get_simple_aggregates(query, query_step->exec_nodes);
@@ -1065,7 +1098,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list)
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("UNION, INTERSECT and EXCEPT are not yet supported"))));
- if (StrictStatementChecking && query_step->exec_nodes
+ if (StrictStatementChecking && query_step->exec_nodes
&& list_length(query_step->exec_nodes->nodelist) > 1)
{
/*
@@ -1264,7 +1297,7 @@ free_query_step(Query_Step * query_step)
return;
pfree(query_step->sql_statement);
- if (query_step->exec_nodes)
+ if (query_step->exec_nodes)
{
if (query_step->exec_nodes->nodelist)
list_free(query_step->exec_nodes->nodelist);
diff --git a/src/backend/pgxc/pool/combiner.c b/src/backend/pgxc/pool/combiner.c
index ed063bc03b..ddb9482f97 100644
--- a/src/backend/pgxc/pool/combiner.c
+++ b/src/backend/pgxc/pool/combiner.c
@@ -48,6 +48,9 @@ CreateResponseCombiner(int node_count, CombineType combine_type,
combiner->row_count = 0;
combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
combiner->description_count = 0;
+ combiner->copy_in_count = 0;
+ combiner->copy_out_count = 0;
+ combiner->inErrorState = false;
combiner->simple_aggregates = NULL;
return combiner;
@@ -60,14 +63,22 @@ static int
parse_row_count(const char *message, size_t len, int *rowcount)
{
int digits = 0;
+ int pos;
*rowcount = 0;
/* skip \0 string terminator */
- len--;
- while (len-- > 0 && message[len] >= '0' && message[len] <= '9')
+ for (pos = 0; pos < len - 1; pos++)
{
- *rowcount = *rowcount * 10 + message[len] - '0';
- digits++;
+ if (message[pos] >= '0' && message[pos] <= '9')
+ {
+ *rowcount = *rowcount * 10 + message[pos] - '0';
+ digits++;
+ }
+ else
+ {
+ *rowcount = 0;
+ digits = 0;
+ }
}
return digits;
}
@@ -149,8 +160,11 @@ process_aggregate_element(List *simple_aggregates, char *msg_body, size_t len)
int
CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t len)
{
- int rowcount;
- int digits = 0;
+ int digits = 0;
+
+ /* Ignore anything if we have encountered error */
+ if (combiner->inErrorState)
+ return EOF;
switch (msg_type)
{
@@ -164,6 +178,7 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
/* Extract rowcount */
if (combiner->combine_type != COMBINE_TYPE_NONE)
{
+ int rowcount;
digits = parse_row_count(msg_body, len, &rowcount);
if (digits > 0)
{
@@ -176,13 +191,12 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
/* There is a consistency issue in the database with the replicated table */
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Write to replicated table returned different results from the data nodes"
- )));
+ errmsg("Write to replicated table returned different results from the data nodes")));
}
else
/* first result */
combiner->row_count = rowcount;
- } else
+ } else
combiner->row_count += rowcount;
}
else
@@ -202,12 +216,9 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
{
char command_complete_buffer[256];
- rowcount = combiner->combine_type == COMBINE_TYPE_SUM ?
- combiner->row_count :
- combiner->row_count / combiner->node_count;
/* Truncate msg_body to get base string */
msg_body[len - digits - 1] = '\0';
- len = sprintf(command_complete_buffer, "%s%d", msg_body, rowcount) + 1;
+ len = sprintf(command_complete_buffer, "%s%d", msg_body, combiner->row_count) + 1;
pq_putmessage(msg_type, command_complete_buffer, len);
}
}
@@ -219,7 +230,9 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
if (combiner->request_type != REQUEST_TYPE_QUERY)
{
/* Inconsistent responses */
- return EOF;
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
}
/* Proxy first */
if (combiner->description_count++ == 0)
@@ -235,10 +248,12 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
if (combiner->request_type != REQUEST_TYPE_COPY_IN)
{
/* Inconsistent responses */
- return EOF;
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
}
/* Proxy first */
- if (combiner->description_count++ == 0)
+ if (combiner->copy_in_count++ == 0)
{
if (combiner->dest == DestRemote
|| combiner->dest == DestRemoteExecute)
@@ -251,10 +266,12 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
{
/* Inconsistent responses */
- return EOF;
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
}
/* Proxy first */
- if (combiner->description_count++ == 0)
+ if (combiner->copy_out_count++ == 0)
{
if (combiner->dest == DestRemote
|| combiner->dest == DestRemoteExecute)
@@ -316,20 +333,23 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
}
break;
case 'E': /* ErrorResponse */
+ combiner->inErrorState = true;
+ /* fallthru */
case 'A': /* NotificationResponse */
case 'N': /* NoticeResponse */
- /* Proxy error message back if specified,
- * or if doing internal primary copy
+ /* Proxy error message back if specified,
+ * or if doing internal primary copy
*/
if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute
- || combiner->combine_type == COMBINE_TYPE_SAME)
+ || combiner->dest == DestRemoteExecute)
pq_putmessage(msg_type, msg_body, len);
break;
case 'I': /* EmptyQuery */
default:
/* Unexpected message */
- return EOF;
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
}
return 0;
}
@@ -341,13 +361,31 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
static bool
validate_combiner(ResponseCombiner combiner)
{
+ /* There was error message while combining */
+ if (combiner->inErrorState)
+ return false;
+ /* Check if state is defined */
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ return false;
/* Check all nodes completed */
- if (combiner->command_complete_count != combiner->node_count)
+ if ((combiner->request_type == REQUEST_TYPE_COMMAND
+ || combiner->request_type == REQUEST_TYPE_QUERY)
+ && combiner->command_complete_count != combiner->node_count)
return false;
/* Check count of description responses */
- if (combiner->request_type != REQUEST_TYPE_COMMAND
- && combiner->description_count != combiner->node_count)
+ if (combiner->request_type == REQUEST_TYPE_QUERY
+ && combiner->description_count != combiner->node_count)
+ return false;
+
+ /* Check count of copy-in responses */
+ if (combiner->request_type == REQUEST_TYPE_COPY_IN
+ && combiner->copy_in_count != combiner->node_count)
+ return false;
+
+ /* Check count of copy-out responses */
+ if (combiner->request_type == REQUEST_TYPE_COPY_OUT
+ && combiner->copy_out_count != combiner->node_count)
return false;
/* Add other checks here as needed */
@@ -381,12 +419,25 @@ ValidateAndResetCombiner(ResponseCombiner combiner)
combiner->row_count = 0;
combiner->request_type = REQUEST_TYPE_NOT_DEFINED;
combiner->description_count = 0;
+ combiner->copy_in_count = 0;
+ combiner->copy_out_count = 0;
+ combiner->inErrorState = false;
combiner->simple_aggregates = NULL;
return valid;
}
/*
+ * Close combiner and free allocated memory, if it is not needed
+ */
+void
+CloseCombiner(ResponseCombiner combiner)
+{
+ if (combiner)
+ pfree(combiner);
+}
+
+/*
* Assign combiner aggregates
*/
void
diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c
index 6cc8514641..2b997a528b 100644
--- a/src/backend/pgxc/pool/datanode.c
+++ b/src/backend/pgxc/pool/datanode.c
@@ -28,8 +28,10 @@
#include "access/xact.h"
#include "postgres.h"
#include "utils/snapmgr.h"
+#include "pgxc/pgxc.h"
#include "gtm/gtm_c.h"
#include "pgxc/datanode.h"
+#include "pgxc/locator.h"
#include "../interfaces/libpq/libpq-fe.h"
#include "utils/elog.h"
#include "utils/memutils.h"
@@ -50,9 +52,9 @@ static void release_handles(void);
static void data_node_init(DataNodeHandle * handle, int sock);
static void data_node_free(DataNodeHandle * handle);
-static int data_node_begin(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner, GlobalTransactionId gxid);
-static int data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner);
-static int data_node_rollback(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner);
+static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid);
+static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest);
+static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest);
static int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
static int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle);
@@ -64,7 +66,7 @@ static int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot);
static void add_error_message(DataNodeHandle * handle, const char *message);
static int data_node_read_data(DataNodeHandle * conn);
-static int handle_response(DataNodeHandle * conn, ResponseCombiner combiner, bool inErrorState);
+static int handle_response(DataNodeHandle * conn, ResponseCombiner combiner);
static int get_int(DataNodeHandle * conn, size_t len, int *out);
static int get_char(DataNodeHandle * conn, char *out);
@@ -220,8 +222,8 @@ InitMultinodeExecutor()
node_count = 0;
}
-/*
- * Builds up a connection string
+/*
+ * Builds up a connection string
*/
char *
DataNodeConnStr(char *host, char *port, char *dbname,
@@ -250,8 +252,8 @@ DataNodeConnStr(char *host, char *port, char *dbname,
}
-/*
- * Connect to a Data Node using a connection string
+/*
+ * Connect to a Data Node using a connection string
*/
NODE_CONNECTION *
DataNodeConnect(char *connstr)
@@ -264,8 +266,8 @@ DataNodeConnect(char *connstr)
}
-/*
- * Close specified connection
+/*
+ * Close specified connection
*/
void
DataNodeClose(NODE_CONNECTION * conn)
@@ -275,8 +277,8 @@ DataNodeClose(NODE_CONNECTION * conn)
}
-/*
- * Checks if connection active
+/*
+ * Checks if connection active
*/
int
DataNodeConnected(NODE_CONNECTION * conn)
@@ -327,17 +329,12 @@ data_node_init(DataNodeHandle * handle, int sock)
/*
- * Handle responses from the Data node connections
+ * Handle responses from the Data node connections
*/
-static int
-data_node_receive_responses(int conn_count, DataNodeHandle ** connections,
+static void
+data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
struct timeval * timeout, ResponseCombiner combiner)
{
- int result = 0;
- int retry_count;
- bool timed_out = false;
- bool inErrorState = false;
-
int count = conn_count;
DataNodeHandle *to_receive[conn_count];
@@ -362,11 +359,8 @@ data_node_receive_responses(int conn_count, DataNodeHandle ** connections,
{
/* note if a connection has error */
if (!to_receive[i]
- || to_receive[i]->state == DN_CONNECTION_STATE_ERROR_FATAL
- || to_receive[i]->sock >= 1024)
+ || to_receive[i]->state == DN_CONNECTION_STATE_ERROR_FATAL)
{
- result = EOF;
-
/* Handling is done, do not track this connection */
count--;
@@ -391,7 +385,6 @@ data_node_receive_responses(int conn_count, DataNodeHandle ** connections,
if (count == 0)
break;
- retry_count = 0;
retry:
res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
if (res_select < 0)
@@ -409,19 +402,18 @@ retry:
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("select() bad file descriptor set")));
- return EOF;
}
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("select() error: %d", errno)));
- return EOF;
}
if (res_select == 0)
{
/* Handle timeout */
- result = EOF;
- timed_out = true;
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("timeout while waiting for response")));
}
/* read data */
@@ -435,24 +427,16 @@ retry:
if (read_status == EOF || read_status < 0)
{
- count--;
- /* Move last connection in place */
- if (i < count)
- {
- to_receive[i] = to_receive[count];
- /* stay on the current position */
- i--;
- }
-
- inErrorState = true;
- result = EOF;
- continue;
+ /* PGXCTODO - we should notify the pooler to destroy the connections */
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("unexpected EOF on datanode connection")));
}
}
if (conn->inStart < conn->inEnd)
{
- if (handle_response(conn, combiner, inErrorState) == 0
+ if (handle_response(conn, combiner) == 0
|| conn->state == DN_CONNECTION_STATE_ERROR_READY
|| conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
{
@@ -466,24 +450,9 @@ retry:
i--;
}
}
-
- /*
- * See if we flagged an error on connection. Note, if
- * handle_response was not 0 above, an error occurred, we
- * still need to consume the ReadyForQuery message
- */
- if (conn->state == DN_CONNECTION_STATE_ERROR_READY
- || conn->state == DN_CONNECTION_STATE_ERROR_NOT_READY
- || conn->state == DN_CONNECTION_STATE_ERROR_FATAL)
- {
- inErrorState = true;
- result = EOF;
- }
}
}
}
-
- return result;
}
/*
@@ -675,7 +644,7 @@ get_int(DataNodeHandle * conn, size_t len, int *out)
* and closing the connections.
*/
static int
-handle_response(DataNodeHandle * conn, ResponseCombiner combiner, bool inErrorState)
+handle_response(DataNodeHandle * conn, ResponseCombiner combiner)
{
char msg_type;
int msg_len;
@@ -708,32 +677,45 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner, bool inErrorSt
/* no need to parse, just move cursor */
conn->inCursor += msg_len;
conn->state = DN_CONNECTION_STATE_COMPLETED;
- if (!inErrorState)
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
+ CombineResponse(combiner, msg_type,
+ conn->inBuffer + conn->inStart + 5,
+ conn->inCursor - conn->inStart - 5);
break;
case 'T': /* RowDescription */
- case 'G': /* CopyInResponse */
- case 'H': /* CopyOutResponse */
case 'D': /* DataRow */
/* no need to parse, just move cursor */
conn->inCursor += msg_len;
- if (!inErrorState)
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
+ CombineResponse(combiner, msg_type,
+ conn->inBuffer + conn->inStart + 5,
+ conn->inCursor - conn->inStart - 5);
+ break;
+ case 'G': /* CopyInResponse */
+ /* no need to parse, just move cursor */
+ conn->inCursor += msg_len;
+ conn->state = DN_CONNECTION_STATE_COPY_IN;
+ CombineResponse(combiner, msg_type,
+ conn->inBuffer + conn->inStart + 5,
+ conn->inCursor - conn->inStart - 5);
+ /* Done, return to caller to let it know the data can be passed in */
+ conn->inStart = conn->inCursor;
+ conn->state = DN_CONNECTION_STATE_COPY_IN;
+ return 0;
+ case 'H': /* CopyOutResponse */
+ /* no need to parse, just move cursor */
+ conn->inCursor += msg_len;
+ conn->state = DN_CONNECTION_STATE_COPY_OUT;
+ CombineResponse(combiner, msg_type,
+ conn->inBuffer + conn->inStart + 5,
+ conn->inCursor - conn->inStart - 5);
break;
case 'E': /* ErrorResponse */
/* no need to parse, just move cursor */
conn->inCursor += msg_len;
- if (!inErrorState)
- CombineResponse(combiner, msg_type,
- conn->inBuffer + conn->inStart + 5,
- conn->inCursor - conn->inStart - 5);
+ CombineResponse(combiner, msg_type,
+ conn->inBuffer + conn->inStart + 5,
+ conn->inCursor - conn->inStart - 5);
conn->inStart = conn->inCursor;
- inErrorState = true;
conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
/*
* Do not return with an error, we still need to consume Z,
@@ -756,31 +738,30 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner, bool inErrorSt
{
conn->state = DN_CONNECTION_STATE_ERROR_READY;
return EOF;
- } else
+ } else
conn->state = DN_CONNECTION_STATE_IDLE;
return 0;
case 'I': /* EmptyQuery */
default:
/* sync lost? */
conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
- inErrorState = true;
return EOF;
}
conn->inStart = conn->inCursor;
-
}
+ /* Keep compiler quiet */
return EOF;
}
-
/*
* Send BEGIN command to the Data nodes and receive responses
*/
static int
-data_node_begin(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner, GlobalTransactionId gxid)
+data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid)
{
int i;
struct timeval *timeout = NULL;
+ ResponseCombiner combiner;
/* Send BEGIN */
for (i = 0; i < conn_count; i++)
@@ -792,16 +773,15 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, ResponseCombiner
return EOF;
}
- /* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
- return EOF;
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, dest);
- /* Verify status? */
+ /* Receive responses */
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
- return 0;
+ /* Verify status */
+ return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
}
-
/* Clears the write node list */
static void
clear_write_node_list()
@@ -835,8 +815,6 @@ DataNodeCommit(CommandDest dest)
int res;
int tran_count;
DataNodeHandle *connections[node_count];
- ResponseCombiner combiner;
-
/* Quick check to make sure we have connections */
if (node_count == 0)
goto finish;
@@ -851,11 +829,7 @@ DataNodeCommit(CommandDest dest)
if (tran_count == 0)
goto finish;
- combiner = CreateResponseCombiner(tran_count,
- COMBINE_TYPE_NONE, dest);
- res = data_node_commit(tran_count, connections, combiner);
- if (!ValidateAndCloseCombiner(combiner) || res)
- return EOF;
+ res = data_node_commit(tran_count, connections, dest);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -865,7 +839,7 @@ finish:
release_handles();
autocommit = true;
clear_write_node_list();
- return 0;
+ return res;
}
@@ -873,13 +847,14 @@ finish:
* Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses
*/
static int
-data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner)
+data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest)
{
int i;
struct timeval *timeout = NULL;
char buffer[256];
GlobalTransactionId gxid = InvalidGlobalTransactionId;
int result = 0;
+ ResponseCombiner combiner = NULL;
/* can set this to false to disable temporarily */
@@ -904,9 +879,7 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner
* gxid for it. Hence GetCurrentGlobalTransactionId() just returns
* already allocated gxid
*/
-/* #ifdef PGXC_COORD */
gxid = GetCurrentGlobalTransactionId();
-/* #endif */
sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid);
/* Send PREPARE */
@@ -916,20 +889,26 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner
return EOF;
}
+ combiner = CreateResponseCombiner(conn_count,
+ COMBINE_TYPE_NONE, dest);
/* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
- return EOF;
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
/* Reset combiner */
if (!ValidateAndResetCombiner(combiner))
- return EOF;
+ {
+ result = EOF;
+ }
}
if (!do2PC)
strcpy(buffer, "COMMIT");
else
{
- sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid);
+ if (result)
+ sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid);
+ else
+ sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid);
/* We need to use a new xid, the data nodes have reset */
two_phase_xid = BeginTranGTM();
@@ -954,9 +933,12 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner
}
}
+ if (!combiner)
+ combiner = CreateResponseCombiner(conn_count,
+ COMBINE_TYPE_NONE, dest);
/* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
- result = EOF;
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+ result = ValidateAndCloseCombiner(combiner) ? result : EOF;
finish:
if (do2PC)
@@ -975,8 +957,6 @@ DataNodeRollback(CommandDest dest)
int res = 0;
int tran_count;
DataNodeHandle *connections[node_count];
- ResponseCombiner combiner;
- int i;
/* Quick check to make sure we have connections */
if (node_count == 0)
@@ -992,19 +972,7 @@ DataNodeRollback(CommandDest dest)
if (tran_count == 0)
goto finish;
- combiner = CreateResponseCombiner(tran_count,
- COMBINE_TYPE_NONE, dest);
- res = data_node_rollback(tran_count, connections, combiner);
-
- /* Assume connection got cleaned up. Reset so we can reuse without error. */
- for (i = 0; i < tran_count; i++)
- {
- connections[i]->transaction_status = 'I';
- connections[i]->state = DN_CONNECTION_STATE_IDLE;
- }
-
- if (!ValidateAndCloseCombiner(combiner) || res)
- res = EOF;
+ res = data_node_rollback(tran_count, connections, dest);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1044,11 +1012,12 @@ release_handles(void)
* Send ROLLBACK command down to the Data nodes and handle responses
*/
static int
-data_node_rollback(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner)
+data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest)
{
int i;
struct timeval *timeout = NULL;
int result = 0;
+ ResponseCombiner combiner;
/* Send ROLLBACK - */
for (i = 0; i < conn_count; i++)
@@ -1057,12 +1026,13 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections, ResponseCombin
result = EOF;
}
+ combiner = CreateResponseCombiner(conn_count,
+ COMBINE_TYPE_NONE, dest);
/* Receive responses */
- if (data_node_receive_responses(conn_count, connections, timeout, combiner))
- return EOF;
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
- /* Verify status? */
- return 0;
+ /* Verify status */
+ return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
}
@@ -1080,19 +1050,17 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections, ResponseCombin
* bool is_read_only - if this is a read-only query
*/
int
-DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapshot snapshot,
- bool force_autocommit, List *simple_aggregates, bool is_read_only)
+DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type,
+ CommandDest dest, Snapshot snapshot, bool force_autocommit,
+ List *simple_aggregates, bool is_read_only)
{
int i;
int j;
int regular_conn_count;
int total_conn_count;
struct timeval *timeout = NULL; /* wait forever */
- ResponseCombiner combiner;
- ResponseCombiner primary_combiner = NULL;
- int primary_row_count = 0;
+ ResponseCombiner combiner = NULL;
int row_count = 0;
- int res;
int new_count = 0;
bool need_tran;
bool found;
@@ -1103,8 +1071,6 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh
List *nodelist = NIL;
List *primarynode = NIL;
/* add up affected row count by default, override for replicated writes */
- CombineType combine_type = COMBINE_TYPE_SUM;
-
if (exec_nodes)
{
@@ -1212,11 +1178,8 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh
}
if (new_count > 0 && need_tran)
{
- combiner = CreateResponseCombiner(new_count, COMBINE_TYPE_NONE, DestNone);
-
/* Start transaction on connections where it is not started */
- res = data_node_begin(new_count, new_connections, combiner, gxid);
- if (!ValidateAndCloseCombiner(combiner) || res)
+ if (data_node_begin(new_count, new_connections, DestNone, gxid))
{
pfree(connections);
return EOF;
@@ -1230,16 +1193,16 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh
if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid))
{
add_error_message(primaryconnection[0], "Can not send request");
- if (connections)
+ if (connections)
pfree(connections);
- if (primaryconnection)
+ if (primaryconnection)
pfree(primaryconnection);
return EOF;
}
if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot))
{
add_error_message(primaryconnection[0], "Can not send request");
- if (connections)
+ if (connections)
pfree(connections);
pfree(primaryconnection);
return EOF;
@@ -1247,22 +1210,31 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh
if (data_node_send_query(primaryconnection[0], query) != 0)
{
add_error_message(primaryconnection[0], "Can not send request");
- if (connections)
+ if (connections)
pfree(connections);
- if (primaryconnection)
+ if (primaryconnection)
pfree(primaryconnection);
return EOF;
}
- /* Use DestNone so that a response will not be sent until run on secondary nodes */
- /* Use COMBINE_TYPE_SAME so that we get the affected row count */
- primary_combiner = CreateResponseCombiner(1, COMBINE_TYPE_SAME, DestNone);
+ Assert(combine_type == COMBINE_TYPE_SAME);
+
+ /*
+ * Create combiner.
+ * Note that we use the same combiner later with the secondary nodes,
+ * so that we do not prematurely send a response to the client
+ * until all nodes have completed execution.
+ */
+ combiner = CreateResponseCombiner(total_conn_count, combine_type, dest);
+ AssignCombinerAggregates(combiner, simple_aggregates);
/* Receive responses */
- res = data_node_receive_responses(1, primaryconnection, timeout, primary_combiner);
- primary_row_count = primary_combiner->row_count;
- if (!ValidateAndCloseCombiner(primary_combiner) || res)
+ data_node_receive_responses(1, primaryconnection, timeout, combiner);
+ /* If we got an error response return immediately */
+ if (DN_CONNECTION_STATE_ERROR(primaryconnection[0]))
{
+ /* We are going to exit, so release combiner */
+ CloseCombiner(combiner);
if (autocommit)
{
if (need_tran)
@@ -1271,18 +1243,12 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh
release_handles();
}
- if (primaryconnection)
+ if (primaryconnection)
pfree(primaryconnection);
- if (connections)
+ if (connections)
pfree(connections);
return EOF;
}
- /*
- * If we get here, the statement has executed successfully on the primary data node
- * and it is ok to try and execute on the other data nodes.
- * Set combine_type for secondary so that we do not sum up row counts.
- */
- combine_type = COMBINE_TYPE_SAME;
}
/* Send query to nodes */
@@ -1309,16 +1275,19 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh
}
}
- combiner = CreateResponseCombiner(regular_conn_count, combine_type, dest);
- AssignCombinerAggregates(combiner, simple_aggregates);
+ /* We may already have combiner if it is replicated case with primary data node */
+ if (!combiner)
+ {
+ combiner = CreateResponseCombiner(regular_conn_count, combine_type, dest);
+ AssignCombinerAggregates(combiner, simple_aggregates);
+ }
/* Receive responses */
- res = data_node_receive_responses(regular_conn_count, connections, timeout, combiner);
+ data_node_receive_responses(regular_conn_count, connections, timeout, combiner);
row_count = combiner->row_count;
/* Check for errors and, if there is a primary node, make sure primary and secondary nodes were updated the same */
- if (!ValidateAndCloseCombiner(combiner) || res
- || (primaryconnection && row_count != primary_row_count))
+ if (!ValidateAndCloseCombiner(combiner))
{
if (autocommit)
{
@@ -1330,12 +1299,6 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh
pfree(connections);
- if (primaryconnection && row_count != primary_row_count)
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("Write to replicated table returned different results between primary and secondary data nodes. Primary: %d, secondary: %d", primary_row_count, row_count
- )));
-
return EOF;
}
@@ -1558,6 +1521,24 @@ send_some(DataNodeHandle * handle, int len)
/*
+ * This method won't return until connection buffer is empty or error occurs
+ * To ensure all data are on the wire before waiting for response
+ */
+static int
+data_node_flush(DataNodeHandle *handle)
+{
+ while (handle->outEnd)
+ {
+ if (send_some(handle, handle->outEnd) < 0)
+ {
+ add_error_message(handle, "failed to send data to datanode");
+ return EOF;
+ }
+ }
+ return 0;
+}
+
+/*
* Send specified statement down to the Data node
*/
static int
@@ -1582,13 +1563,9 @@ data_node_send_query(DataNodeHandle * handle, const char *query)
memcpy(handle->outBuffer + handle->outEnd, query, strLen);
handle->outEnd += strLen;
- /* We need response right away, so send immediately */
- if (send_some(handle, handle->outEnd) < 0)
- return EOF;
-
handle->state = DN_CONNECTION_STATE_BUSY;
- return 0;
+ return data_node_flush(handle);
}
@@ -1838,3 +1815,439 @@ DataNodeCleanAndRelease(int code, Datum arg)
/* Dump collected statistics to the log */
stat_log();
}
+
+/*
+ * Begin COPY command
+ * The copy_connections array must have room for NumDataNodes items
+ */
+DataNodeHandle**
+DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot)
+{
+ int i, j;
+ int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist);
+ struct timeval *timeout = NULL;
+ DataNodeHandle **connections;
+ DataNodeHandle **copy_connections;
+ DataNodeHandle *newConnections[conn_count];
+ int new_count = 0;
+ ListCell *nodeitem;
+ bool need_tran;
+ GlobalTransactionId gxid;
+ ResponseCombiner combiner;
+
+ if (conn_count == 0)
+ return NULL;
+
+ /* Get needed datanode connections */
+ connections = get_handles(nodelist);
+ if (!connections)
+ return NULL;
+
+ need_tran = !autocommit || conn_count > 1;
+
+ elog(DEBUG1, "autocommit = %s, conn_count = %d, need_tran = %s", autocommit ? "true" : "false", conn_count, need_tran ? "true" : "false");
+
+ /*
+ * We need to be able to quickly find a connection handle for a specified
+ * node number, so store connections in an array where the index is node-1.
+ * Unused items in the array should be NULL
+ */
+ copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *));
+ i = 0;
+ foreach(nodeitem, nodelist)
+ copy_connections[lfirst_int(nodeitem) - 1] = connections[i++];
+
+ /* Gather statistics */
+ stat_statement();
+ if (autocommit)
+ stat_transaction(conn_count);
+
+ /* We normally clear for transactions, but if autocommit, clear here, too */
+ if (autocommit)
+ {
+ clear_write_node_list();
+ }
+
+ /* Check status of connections */
+ /* We want to track new "write" nodes, and new nodes in the current transaction
+ * whether or not they are write nodes. */
+ if (write_node_count < NumDataNodes)
+ {
+ for (i = 0; i < conn_count; i++)
+ {
+ bool found = false;
+ for (j=0; j<write_node_count && !found; j++)
+ {
+ if (write_node_list[j] == connections[i])
+ found = true;
+ }
+ if (!found)
+ {
+ /* Add to transaction wide-list */
+ write_node_list[write_node_count++] = connections[i];
+ /* Add to current statement list */
+ newConnections[new_count++] = connections[i];
+ }
+ }
+ /* TODO: check that connection state is DN_CONNECTION_STATE_IDLE */
+ }
+
+ gxid = GetCurrentGlobalTransactionId();
+
+ /* elog(DEBUG1, "Current gxid = %d", gxid); */
+
+ if (!GlobalTransactionIdIsValid(gxid))
+ {
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ if (new_count > 0 && need_tran)
+ {
+ /* Start transaction on connections where it is not started */
+ if (data_node_begin(new_count, newConnections, DestNone, gxid))
+ {
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ }
+
+ /* Send query to nodes */
+ for (i = 0; i < conn_count; i++)
+ {
+ /* If explicit transaction is needed gxid is already sent */
+ if (!need_tran && data_node_send_gxid(connections[i], gxid))
+ {
+ add_error_message(connections[i], "Can not send request");
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ if (snapshot && data_node_send_snapshot(connections[i], snapshot))
+ {
+ add_error_message(connections[i], "Can not send request");
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ if (data_node_send_query(connections[i], query) != 0)
+ {
+ add_error_message(connections[i], "Can not send request");
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+ }
+
+ /*
+ * We are expecting a CopyIn response, but do not want to send it to the
+ * client; the caller should take care of this, because here we do not know
+ * whether the client is running a console or file copy
+ */
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, DestNone);
+
+ /* Receive responses */
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (!ValidateAndCloseCombiner(combiner))
+ {
+ if (autocommit)
+ {
+ if (need_tran)
+ DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE, DestNone);
+ else
+ if (!PersistentConnections) release_handles();
+ }
+
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
+
+ pfree(connections);
+ return copy_connections;
+}
+
+/*
+ * Buffer size does not affect performance significantly, just do not allow
+ * connection buffer grows infinitely
+ */
+#define COPY_BUFFER_SIZE 8192
+#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024
+
+/*
+ * Send a data row to the specified nodes
+ */
+int
+DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections)
+{
+ DataNodeHandle *primary_handle = NULL;
+ ListCell *nodeitem;
+ /* size + data row + \n */
+ int msgLen = 4 + len + 1;
+ int nLen = htonl(msgLen);
+
+ if (exec_nodes->primarynodelist)
+ {
+ primary_handle = copy_connections[lfirst_int(list_head(exec_nodes->primarynodelist)) - 1];
+ }
+
+ if (primary_handle)
+ {
+ if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN)
+ {
+ /* precalculate to speed up access */
+ int bytes_needed = primary_handle->outEnd + 1 + msgLen;
+
+ /* flush buffer if it is almost full */
+ if (bytes_needed > COPY_BUFFER_SIZE)
+ {
+ /* First look if data node has sent an error message */
+ int read_status = data_node_read_data(primary_handle);
+ if (read_status == EOF || read_status < 0)
+ {
+ add_error_message(primary_handle, "failed to read data from data node");
+ return EOF;
+ }
+
+ if (primary_handle->inStart < primary_handle->inEnd)
+ {
+ ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone);
+ handle_response(primary_handle, combiner);
+ if (!ValidateAndCloseCombiner(combiner))
+ return EOF;
+ }
+
+ if (DN_CONNECTION_STATE_ERROR(primary_handle))
+ return EOF;
+
+ if (send_some(primary_handle, primary_handle->outEnd) < 0)
+ {
+ add_error_message(primary_handle, "failed to send data to data node");
+ return EOF;
+ }
+ }
+
+ if (ensure_out_buffer_capacity(bytes_needed, primary_handle) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ primary_handle->outBuffer[primary_handle->outEnd++] = 'd';
+ memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4);
+ primary_handle->outEnd += 4;
+ memcpy(primary_handle->outBuffer + primary_handle->outEnd, data_row, len);
+ primary_handle->outEnd += len;
+ primary_handle->outBuffer[primary_handle->outEnd++] = '\n';
+ }
+ else
+ {
+ add_error_message(primary_handle, "Invalid data node connection");
+ return EOF;
+ }
+ }
+
+ foreach(nodeitem, exec_nodes->nodelist)
+ {
+ DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1];
+ if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN)
+ {
+ /* precalculate to speed up access */
+ int bytes_needed = handle->outEnd + 1 + msgLen;
+
+ /* flush buffer if it is almost full */
+ if ((primary_handle && bytes_needed > PRIMARY_NODE_WRITEAHEAD)
+ || (!primary_handle && bytes_needed > COPY_BUFFER_SIZE))
+ {
+ int to_send = handle->outEnd;
+
+ /* First look if data node has sent an error message */
+ int read_status = data_node_read_data(handle);
+ if (read_status == EOF || read_status < 0)
+ {
+ add_error_message(handle, "failed to read data from data node");
+ return EOF;
+ }
+
+ if (handle->inStart < handle->inEnd)
+ {
+ ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone);
+ handle_response(handle, combiner);
+ if (!ValidateAndCloseCombiner(combiner))
+ return EOF;
+ }
+
+ if (DN_CONNECTION_STATE_ERROR(handle))
+ return EOF;
+
+ /*
+ * Allow primary node to write out data before others.
+ * If primary node was blocked it would not accept copy data.
+ * So buffer at least PRIMARY_NODE_WRITEAHEAD at the other nodes.
+ * If primary node is blocked and is buffering, other buffers will
+ * grow accordingly.
+ */
+ if (primary_handle)
+ {
+ if (primary_handle->outEnd + PRIMARY_NODE_WRITEAHEAD < handle->outEnd)
+ to_send = handle->outEnd - primary_handle->outEnd - PRIMARY_NODE_WRITEAHEAD;
+ else
+ to_send = 0;
+ }
+
+ /*
+ * Try to send down buffered data if we have
+ */
+ if (to_send && send_some(handle, to_send) < 0)
+ {
+ add_error_message(handle, "failed to send data to data node");
+ return EOF;
+ }
+ }
+
+ if (ensure_out_buffer_capacity(bytes_needed, handle) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'd';
+ memcpy(handle->outBuffer + handle->outEnd, &nLen, 4);
+ handle->outEnd += 4;
+ memcpy(handle->outBuffer + handle->outEnd, data_row, len);
+ handle->outEnd += len;
+ handle->outBuffer[handle->outEnd++] = '\n';
+ }
+ else
+ {
+ add_error_message(handle, "Invalid data node connection");
+ return EOF;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Finish copy process on all connections
+ */
+uint64
+DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
+ CombineType combine_type, CommandDest dest)
+{
+ int i;
+ int nLen = htonl(4);
+ ResponseCombiner combiner = NULL;
+ bool need_tran;
+ bool res = 0;
+ struct timeval *timeout = NULL; /* wait forever */
+ DataNodeHandle *connections[NumDataNodes];
+ DataNodeHandle *primary_handle = NULL;
+ int conn_count = 0;
+
+ for (i = 0; i < NumDataNodes; i++)
+ {
+ DataNodeHandle *handle = copy_connections[i];
+
+ if (!handle)
+ continue;
+
+ if (i == primary_data_node - 1)
+ primary_handle = handle;
+ else
+ connections[conn_count++] = handle;
+ }
+
+ if (primary_handle)
+ {
+ if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT)
+ {
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ primary_handle->outBuffer[primary_handle->outEnd++] = 'c';
+ memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4);
+ primary_handle->outEnd += 4;
+
+ /* We need response right away, so send immediately */
+ if (data_node_flush(primary_handle) < 0)
+ {
+ res = EOF;
+ }
+ }
+ else
+ {
+ res = EOF;
+ }
+
+ combiner = CreateResponseCombiner(conn_count + 1, combine_type, dest);
+ data_node_receive_responses(1, &primary_handle, timeout, combiner);
+ }
+
+ for (i = 0; i < conn_count; i++)
+ {
+ DataNodeHandle *handle = connections[i];
+
+ if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT)
+ {
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
+ handle->outBuffer[handle->outEnd++] = 'c';
+ memcpy(handle->outBuffer + handle->outEnd, &nLen, 4);
+ handle->outEnd += 4;
+
+ /* We need response right away, so send immediately */
+ if (data_node_flush(handle) < 0)
+ {
+ res = EOF;
+ }
+ }
+ else
+ {
+ res = EOF;
+ }
+ }
+
+ need_tran = !autocommit || primary_handle || conn_count > 1;
+
+ if (!combiner)
+ combiner = CreateResponseCombiner(conn_count, combine_type, dest);
+ data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (!ValidateAndCloseCombiner(combiner) || res)
+ {
+ if (autocommit)
+ {
+ if (need_tran)
+ DataNodeRollback(DestNone);
+ else
+ if (!PersistentConnections) release_handles();
+ }
+
+ return 0;
+ }
+
+ if (autocommit)
+ {
+ if (need_tran)
+ DataNodeCommit(DestNone);
+ else
+ if (!PersistentConnections) release_handles();
+ }
+
+ /* Verify status? */
+ return combiner->row_count;
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 427bc6b8c3..7bdd5b8d38 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -79,9 +79,10 @@
/* PGXC_COORD */
#include "pgxc/planner.h"
#include "pgxc/datanode.h"
+#include "commands/copy.h"
/* PGXC_DATANODE */
-#include "access/transam.h"
-#endif
+#include "access/transam.h"
+#endif
extern int optind;
extern char *optarg;
@@ -204,10 +205,10 @@ static List * pgxc_execute_direct (Node *parsetree, List *querytree_list, Comman
* ----------------------------------------------------------------
*/
-/*
- * Called when the backend is ending.
+/*
+ * Called when the backend is ending.
*/
-static void
+static void
DataNodeShutdown (int code, Datum arg)
{
/* Close connection with GTM, if active */
@@ -433,7 +434,7 @@ SocketBackend(StringInfo inBuf)
case 'g':
case 's':
break;
-#endif
+#endif
default:
@@ -898,14 +899,14 @@ exec_simple_query(const char *query_string)
Portal portal;
DestReceiver *receiver;
int16 format;
-#ifdef PGXC
+#ifdef PGXC
Query_Plan *query_plan;
Query_Step *query_step;
bool exec_on_coord;
int data_node_error = 0;
- /*
+ /*
* By default we do not want data nodes to contact GTM directly,
* it should get this information passed down to it.
*/
@@ -978,10 +979,22 @@ exec_simple_query(const char *query_string)
else if (IsA(parsetree, ExecDirectStmt))
querytree_list = pgxc_execute_direct(parsetree, querytree_list, dest, snapshot_set, &exec_on_coord);
- else
+ else if (IsA(parsetree, CopyStmt))
+ {
+ CopyStmt *copy = (CopyStmt *) parsetree;
+ /* Snapshot is needed for the Copy */
+ if (!snapshot_set)
+ {
+ PushActiveSnapshot(GetTransactionSnapshot());
+ snapshot_set = true;
+ }
+ DoCopy(copy, query_string);
+ exec_on_coord = false;
+ }
+ else
{
query_plan = GetQueryPlan(parsetree, query_string, querytree_list);
-
+
exec_on_coord = query_plan->exec_loc_type & EXEC_ON_COORD;
}
@@ -1003,7 +1016,7 @@ exec_simple_query(const char *query_string)
/* If we got a cancel signal in analysis or planning, quit */
CHECK_FOR_INTERRUPTS();
-#ifdef PGXC
+#ifdef PGXC
/* PGXC_DATANODE */
/* Force getting Xid from GTM if not autovacuum, but a vacuum */
if (IS_PGXC_DATANODE && IsA(parsetree, VacuumStmt) && IsPostmasterEnvironment)
@@ -1089,7 +1102,7 @@ exec_simple_query(const char *query_string)
PortalDrop(portal, false);
-#ifdef PGXC
+#ifdef PGXC
}
/* PGXC_COORD */
@@ -1100,9 +1113,10 @@ exec_simple_query(const char *query_string)
{
query_step = linitial(query_plan->query_step_list);
- data_node_error = DataNodeExec(query_step->sql_statement,
- query_step->exec_nodes,
- dest,
+ data_node_error = DataNodeExec(query_step->sql_statement,
+ query_step->exec_nodes,
+ query_step->combine_type,
+ dest,
snapshot_set ? GetActiveSnapshot() : GetTransactionSnapshot(),
query_plan->force_autocommit,
query_step->simple_aggregates,
@@ -1110,15 +1124,17 @@ exec_simple_query(const char *query_string)
if (data_node_error)
{
- /* Error. If it ran on the coordinator (DDL), too, make sure we abort */
- if (exec_on_coord)
- {
- AbortCurrentTransaction();
- xact_started = false;
- }
+ /* An error occurred, change status on Coordinator, too,
+ * even if no statements ran on it.
+ * We want to only allow COMMIT/ROLLBACK
+ */
+ AbortCurrentTransaction();
+ xact_started = false;
+ /* AT() clears active snapshot */
+ snapshot_set = false;
}
}
-
+
FreeQueryPlan(query_plan);
}
@@ -3111,7 +3127,7 @@ PostgresMain(int argc, char *argv[], const char *username)
case 'C':
isPGXCCoordinator = true;
break;
-#endif
+#endif
case 'D':
if (secure)
@@ -3239,7 +3255,7 @@ PostgresMain(int argc, char *argv[], const char *username)
case 'X':
isPGXCDataNode = true;
break;
-#endif
+#endif
case 'y':
/*
@@ -3749,9 +3765,9 @@ PostgresMain(int argc, char *argv[], const char *username)
ReadyForQuery(whereToSendOutput);
#ifdef PGXC
- /*
- * Helps us catch any problems where we did not send down a snapshot
- * when it was expected.
+ /*
+ * Helps us catch any problems where we did not send down a snapshot
+ * when it was expected.
*/
if (IS_PGXC_DATANODE)
UnsetGlobalSnapshotData();
@@ -4276,9 +4292,9 @@ pgxc_transaction_stmt (Node *parsetree)
switch (stmt->kind)
{
case TRANS_STMT_BEGIN:
- /*
+ /*
* This does not yet send down a BEGIN,
- * we do that "on demand" as data nodes are added
+ * we do that "on demand" as data nodes are added
*/
DataNodeBegin();
break;
@@ -4315,17 +4331,18 @@ pgxc_execute_direct (Node *parsetree, List *querytree_list, CommandDest dest, bo
Assert(IS_PGXC_COORDINATOR);
Assert(IsA(parsetree, ExecDirectStmt));
-
+
exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
foreach (node_cell, execdirect->nodes)
{
int node_int = intVal(lfirst(node_cell));
- exec_nodes->nodelist = lappend_int(exec_nodes->nodelist, node_int);
+ exec_nodes->nodelist = lappend_int(exec_nodes->nodelist, node_int);
}
if (exec_nodes->nodelist)
if (DataNodeExec(execdirect->query,
exec_nodes,
+ COMBINE_TYPE_SAME,
dest,
snapshot_set ? GetActiveSnapshot() : GetTransactionSnapshot(),
FALSE,
@@ -4337,24 +4354,24 @@ pgxc_execute_direct (Node *parsetree, List *querytree_list, CommandDest dest, bo
{
/*
* Parse inner statement, like at the begiining of the function
- * We do not have to release wrapper trees, the message context
+ * We do not have to release wrapper trees, the message context
* will be deleted later
- * Also, no need to switch context - current is already
+ * Also, no need to switch context - current is already
* the MessageContext
*/
parsetree_list = pg_parse_query(execdirect->query);
/* We do not want to log or display the inner command */
- /*
- * we do not support complex commands (expanded to multiple
+ /*
+ * we do not support complex commands (expanded to multiple
* parse trees) within EXEC DIRECT
*/
if (list_length(parsetree_list) != 1)
{
- ereport(ERROR,
+ ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("Can not execute %s with EXECUTE DIRECT",
+ errmsg("Can not execute %s with EXECUTE DIRECT",
execdirect->query)));
}
diff --git a/src/include/pgxc/combiner.h b/src/include/pgxc/combiner.h
index 2596e50677..7926cbd9ff 100644
--- a/src/include/pgxc/combiner.h
+++ b/src/include/pgxc/combiner.h
@@ -46,6 +46,9 @@ typedef struct
int row_count;
RequestType request_type;
int description_count;
+ uint64 copy_in_count;
+ uint64 copy_out_count;
+ bool inErrorState;
List *simple_aggregates;
} ResponseCombinerData;
@@ -59,5 +62,6 @@ extern int CombineResponse(ResponseCombiner combiner, char msg_type,
extern bool ValidateAndCloseCombiner(ResponseCombiner combiner);
extern bool ValidateAndResetCombiner(ResponseCombiner combiner);
extern void AssignCombinerAggregates(ResponseCombiner combiner, List *simple_aggregates);
+extern void CloseCombiner(ResponseCombiner combiner);
#endif /* COMBINER_H */
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
index 165be530d9..4f75ba24af 100644
--- a/src/include/pgxc/datanode.h
+++ b/src/include/pgxc/datanode.h
@@ -33,10 +33,16 @@ typedef enum
DN_CONNECTION_STATE_COMPLETED,
DN_CONNECTION_STATE_ERROR_NOT_READY, /* error, but need ReadyForQuery message */
DN_CONNECTION_STATE_ERROR_READY, /* error and received ReadyForQuery */
- DN_CONNECTION_STATE_ERROR_FATAL /* fatal error */
-
+ DN_CONNECTION_STATE_ERROR_FATAL, /* fatal error */
+ DN_CONNECTION_STATE_COPY_IN,
+ DN_CONNECTION_STATE_COPY_OUT
} DNConnectionState;
+#define DN_CONNECTION_STATE_ERROR(dnconn) \
+ (dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
+ || (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY \
+ || (dnconn)->state == DN_CONNECTION_STATE_ERROR_READY
+
struct data_node_handle
{
/* fd of the connection */
@@ -74,7 +80,11 @@ extern void DataNodeBegin(void);
extern int DataNodeCommit(CommandDest dest);
extern int DataNodeRollback(CommandDest dest);
-extern int DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only);
+extern int DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only);
+
+extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot);
+extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
+extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type, CommandDest dest);
extern int primary_data_node;
#endif
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 6fa32fb73b..1d5280c765 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -52,6 +52,7 @@ typedef struct
{
List *primarynodelist;
List *nodelist;
+ char baselocatortype;
} Exec_Nodes;
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index c738312ce1..04ec6fcc0e 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -16,6 +16,7 @@
#define PGXCPLANNER_H
#include "pgxc/locator.h"
+#include "pgxc/combiner.h"
/* for Query_Plan.exec_loc_type can have these OR'ed*/
@@ -30,6 +31,7 @@ typedef struct
{
char *sql_statement;
Exec_Nodes *exec_nodes;
+ CombineType combine_type;
List *simple_aggregates; /* simple aggregate to combine on this
* step */
} Query_Step;