summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/copy.c400
-rw-r--r--src/backend/pgxc/Makefile2
-rw-r--r--src/backend/pgxc/copy/Makefile19
-rw-r--r--src/backend/pgxc/copy/remotecopy.c384
-rw-r--r--src/backend/pgxc/locator/locator.c6
-rw-r--r--src/include/pgxc/locator.h6
-rw-r--r--src/include/pgxc/remotecopy.h75
7 files changed, 560 insertions, 332 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 26d0f553f3..074bf09b39 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -39,6 +39,7 @@
#include "pgxc/pgxc.h"
#include "pgxc/execRemote.h"
#include "pgxc/locator.h"
+#include "pgxc/remotecopy.h"
#include "nodes/nodes.h"
#include "pgxc/poolmgr.h"
#include "pgxc/postgresql_fdw.h"
@@ -199,24 +200,8 @@ typedef struct CopyStateData
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
#ifdef PGXC
- /*
- * On Coordinator we need to rewrite query.
- * While client may submit a copy command dealing with file, Datanodes
- * always send/receive data to/from the Coordinator. So we can not use
- * original statement and should rewrite statement, specifing STDIN/STDOUT
- * as copy source or destination
- */
- StringInfoData query_buf;
-
- /* Execution nodes for COPY */
- ExecNodes *exec_nodes;
-
- /* Locator information */
- RelationLocInfo *rel_loc; /* the locator key */
- int idx_dist_by_col; /* index of the distributed by column */
-
- PGXCNodeHandle **connections; /* Involved Datanode connections */
- TupleDesc tupDesc; /* for INSERT SELECT */
+ /* Remote COPY state data */
+ RemoteCopyData *remoteCopyState;
#endif
} CopyStateData;
@@ -341,8 +326,7 @@ static bool CopyGetInt16(CopyState cstate, int16 *val);
#ifdef PGXC
-static ExecNodes *build_copy_statement(CopyState cstate, List *attnamelist,
- TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull);
+static RemoteCopyOptions *GetRemoteCopyOptions(CopyState cstate);
static void append_defvals(Datum *values, CopyState cstate);
#endif
@@ -765,102 +749,6 @@ CopyLoadRawBuf(CopyState cstate)
return (inbytes > 0);
}
-#ifdef PGXC
-/*
- * CopyQuoteStr appends quoted value to the query buffer. Value is escaped
- * if needed
- *
- * When rewriting query to be sent down to nodes we should escape special
- * characters, that may present in the value. The characters are backslash(\)
- * and single quote ('). These characters are escaped by doubling. We do not
- * have to escape characters like \t, \v, \b, etc. because Datanode interprets
- * them properly.
- * We use E'...' syntax for literals containing backslashes.
- */
-static void
-CopyQuoteStr(StringInfo query_buf, char *value)
-{
- char *start = value;
- char *current = value;
- char c;
- bool has_backslash = (strchr(value, '\\') != NULL);
-
- if (has_backslash)
- appendStringInfoChar(query_buf, 'E');
-
- appendStringInfoChar(query_buf, '\'');
-
-#define APPENDSOFAR \
- if (current > start) \
- appendBinaryStringInfo(query_buf, start, current - start)
-
- while ((c = *current) != '\0')
- {
- switch (c)
- {
- case '\\':
- case '\'':
- APPENDSOFAR;
- /* Double current */
- appendStringInfoChar(query_buf, c);
- /* Second current will be appended next time */
- start = current;
- /* fallthru */
- default:
- current++;
- }
- }
- APPENDSOFAR;
- appendStringInfoChar(query_buf, '\'');
-}
-
-/*
- * CopyQuoteIdentifier determine if identifier needs to be quoted and surround
- * it with double quotes
- */
-static void
-CopyQuoteIdentifier(StringInfo query_buf, char *value)
-{
- char *start = value;
- char *current = value;
- char c;
- int len = strlen(value);
- bool need_quote = (strspn(value, "_abcdefghijklmnopqrstuvwxyz0123456789") < len)
- || (strchr("_abcdefghijklmnopqrstuvwxyz", value[0]) == NULL);
-
- if (need_quote)
- {
- appendStringInfoChar(query_buf, '"');
-
-#define APPENDSOFAR \
- if (current > start) \
- appendBinaryStringInfo(query_buf, start, current - start)
-
- while ((c = *current) != '\0')
- {
- switch (c)
- {
- case '"':
- APPENDSOFAR;
- /* Double current */
- appendStringInfoChar(query_buf, c);
- /* Second current will be appended next time */
- start = current;
- /* fallthru */
- default:
- current++;
- }
- }
- APPENDSOFAR;
- appendStringInfoChar(query_buf, '"');
- }
- else
- {
- appendBinaryStringInfo(query_buf, value, len);
- }
-}
-#endif
-
/*
* DoCopy executes the SQL COPY statement
@@ -1288,9 +1176,6 @@ BeginCopy(bool is_from,
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
-#ifdef PGXC
- ExecNodes *exec_nodes;
-#endif
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
@@ -1329,12 +1214,25 @@ BeginCopy(bool is_from,
/* Get copy statement and execution node information */
if (IS_PGXC_COORDINATOR)
{
- exec_nodes = build_copy_statement(cstate,
- attnamelist,
- tupDesc,
- is_from,
- cstate->force_quote,
- cstate->force_notnull);
+ RemoteCopyData *remoteCopyState = (RemoteCopyData *) palloc0(sizeof(RemoteCopyData));
+ List *attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
+
+ /* Setup correct COPY FROM/TO flag */
+ remoteCopyState->is_from = is_from;
+
+ /* Get execution node list */
+ RemoteCopy_GetRelationLoc(remoteCopyState,
+ cstate->rel,
+ attnums);
+ /* Build remote query */
+ RemoteCopy_BuildStatement(remoteCopyState,
+ cstate->rel,
+ GetRemoteCopyOptions(cstate),
+ attnamelist,
+ attnums);
+
+ /* Then assign built structure */
+ cstate->remoteCopyState = remoteCopyState;
}
#endif
}
@@ -1500,16 +1398,18 @@ BeginCopy(bool is_from,
*/
if (IS_PGXC_COORDINATOR)
{
+ RemoteCopyData *remoteCopyState = cstate->remoteCopyState;
+
/*
* In the case of CopyOut, it is just necessary to pick up one node randomly.
* This is done when rel_loc is found.
*/
- if (cstate->rel_loc)
+ if (remoteCopyState && remoteCopyState->rel_loc)
{
- cstate->connections = DataNodeCopyBegin(cstate->query_buf.data,
- exec_nodes->nodeList,
- GetActiveSnapshot());
- if (!cstate->connections)
+ remoteCopyState->connections = DataNodeCopyBegin(remoteCopyState->query_buf.data,
+ remoteCopyState->exec_nodes->nodeList,
+ GetActiveSnapshot());
+ if (!remoteCopyState->connections)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Failed to initialize Datanodes for COPY")));
@@ -1795,15 +1695,19 @@ CopyTo(CopyState cstate)
}
#ifdef PGXC
- if (IS_PGXC_COORDINATOR && cstate->rel_loc)
+ if (IS_PGXC_COORDINATOR &&
+ cstate->remoteCopyState &&
+ cstate->remoteCopyState->rel_loc)
{
+ RemoteCopyData *remoteCopyState = cstate->remoteCopyState;
+
/*
* We don't know the value of the distribution column value, so need to
* read from all nodes. Hence indicate that the value is NULL.
*/
processed = DataNodeCopyOut(
- GetRelationNodes(cstate->rel_loc, 0, true, UNKNOWNOID, RELATION_ACCESS_READ),
- cstate->connections,
+ GetRelationNodes(remoteCopyState->rel_loc, 0, true, UNKNOWNOID, RELATION_ACCESS_READ),
+ remoteCopyState->connections,
cstate->copy_file);
}
else
@@ -2228,19 +2132,19 @@ CopyFrom(CopyState cstate)
* Send the data row as-is to the Datanodes. If default values
* are to be inserted, append them onto the data row.
*/
-
- if (IS_PGXC_COORDINATOR && cstate->rel_loc)
+ if (IS_PGXC_COORDINATOR && cstate->remoteCopyState->rel_loc)
{
Form_pg_attribute *attr = tupDesc->attrs;
Datum dist_col_value;
bool dist_col_is_null;
Oid dist_col_type;
+ RemoteCopyData *remoteCopyState = cstate->remoteCopyState;
- if (cstate->idx_dist_by_col >= 0)
+ if (remoteCopyState->idx_dist_by_col >= 0)
{
- dist_col_value = values[cstate->idx_dist_by_col];
- dist_col_is_null = nulls[cstate->idx_dist_by_col];
- dist_col_type = attr[cstate->idx_dist_by_col]->atttypid;
+ dist_col_value = values[remoteCopyState->idx_dist_by_col];
+ dist_col_is_null = nulls[remoteCopyState->idx_dist_by_col];
+ dist_col_type = attr[remoteCopyState->idx_dist_by_col]->atttypid;
}
else
{
@@ -2252,12 +2156,12 @@ CopyFrom(CopyState cstate)
if (DataNodeCopyIn(cstate->line_buf.data,
cstate->line_buf.len,
- GetRelationNodes(cstate->rel_loc,
+ GetRelationNodes(remoteCopyState->rel_loc,
dist_col_value,
dist_col_is_null,
dist_col_type,
RELATION_ACCESS_INSERT),
- cstate->connections))
+ remoteCopyState->connections))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a Datanode")));
@@ -2578,6 +2482,8 @@ BeginCopyFrom(Relation rel,
/* This is done at the beginning of COPY FROM from Coordinator to Datanodes */
if (IS_PGXC_COORDINATOR)
{
+ RemoteCopyData *remoteCopyState = cstate->remoteCopyState;
+
/* Empty buffer info and send header to all the backends involved in COPY */
resetStringInfo(&cstate->line_buf);
@@ -2594,7 +2500,7 @@ BeginCopyFrom(Relation rel,
tmp = htonl(tmp);
appendBinaryStringInfo(&cstate->line_buf, (char *) &tmp, 4);
- if (DataNodeCopyInBinaryForAll(cstate->line_buf.data, 19, cstate->connections))
+ if (DataNodeCopyInBinaryForAll(cstate->line_buf.data, 19, remoteCopyState->connections))
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (COPY SEND)")));
@@ -3036,17 +2942,17 @@ void
EndCopyFrom(CopyState cstate)
{
#ifdef PGXC
+ RemoteCopyData *remoteCopyState = cstate->remoteCopyState;
+
/* For PGXC related COPY, free also relation location data */
- if (IS_PGXC_COORDINATOR && cstate->rel_loc)
+ if (IS_PGXC_COORDINATOR && remoteCopyState->rel_loc)
{
- bool replicated = cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED;
+ bool replicated = remoteCopyState->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED;
DataNodeCopyFinish(
- cstate->connections,
+ remoteCopyState->connections,
replicated ? PGXCNodeGetNodeId(primary_data_node, PGXC_NODE_DATANODE) : -1,
replicated ? COMBINE_TYPE_SAME : COMBINE_TYPE_SUM);
- pfree(cstate->connections);
- pfree(cstate->query_buf.data);
- FreeRelationLocInfo(cstate->rel_loc);
+ FreeRemoteCopyData(remoteCopyState);
}
#endif
/* No COPY FROM related resources except memory. */
@@ -4380,186 +4286,30 @@ CreateCopyDestReceiver(void)
}
#ifdef PGXC
-/*
- * Rebuild a COPY statement in cstate and set ExecNodes
- */
-static ExecNodes*
-build_copy_statement(CopyState cstate, List *attnamelist,
- TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull)
+static RemoteCopyOptions *
+GetRemoteCopyOptions(CopyState cstate)
{
- char *pPartByCol;
- ExecNodes *exec_nodes = makeNode(ExecNodes);
- int attnum;
- List *attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
-
-
- /*
- * If target table does not exists on nodes (e.g. system table)
- * the location info returned is NULL. This is the criteria, when
- * we need to run COPY on Coordinator
- */
- cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel));
-
- pPartByCol = GetRelationDistColumn(cstate->rel_loc);
- if (cstate->rel_loc)
- {
- /*
- * Pick up one node only
- * This case corresponds to a replicated table with COPY TO
- *
- */
- if (!is_from && cstate->rel_loc->locatorType == 'R')
- exec_nodes->nodeList = GetAnyDataNode(cstate->rel_loc->nodeList);
- else
- {
- /* All nodes necessary */
- exec_nodes->nodeList = list_concat(exec_nodes->nodeList, cstate->rel_loc->nodeList);
- }
- }
-
- cstate->idx_dist_by_col = -1;
- if (pPartByCol)
- {
- ListCell *cur;
- foreach(cur, attnums)
- {
- attnum = lfirst_int(cur);
- if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), pPartByCol) == 0)
- {
- cstate->idx_dist_by_col = attnum - 1;
- break;
- }
- }
- }
-
- /*
- * Build up query string for the Datanodes, it should match
- * to original string, but should have STDIN/STDOUT instead
- * of filename.
- */
- initStringInfo(&cstate->query_buf);
-
- appendStringInfoString(&cstate->query_buf, "COPY ");
- appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel));
-
- if (attnamelist)
- {
- ListCell *cell;
- ListCell *prev = NULL;
- appendStringInfoString(&cstate->query_buf, " (");
- foreach (cell, attnamelist)
- {
- if (prev)
- appendStringInfoString(&cstate->query_buf, ", ");
- CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
- prev = cell;
- }
-
- /*
- * For COPY FROM, we need to append unspecified attributes that have
- * default expressions associated.
- */
- if (is_from)
- {
- for (attnum = 1; attnum <= tupDesc->natts; attnum++)
- {
- /* Don't let dropped attributes go into the column list */
- if (tupDesc->attrs[attnum - 1]->attisdropped)
- continue;
-
- if (!list_member_int(attnums, attnum))
- {
- /* Append only if the default expression is not shippable. */
- Expr *defexpr = (Expr*) build_column_default(cstate->rel, attnum);
- if (defexpr &&
- !pgxc_is_expr_shippable(expression_planner(defexpr), NULL))
- {
- appendStringInfoString(&cstate->query_buf, ", ");
- CopyQuoteIdentifier(&cstate->query_buf,
- NameStr(tupDesc->attrs[attnum - 1]->attname));
- }
- }
- }
- }
-
- appendStringInfoChar(&cstate->query_buf, ')');
- }
-
- if (is_from)
- appendStringInfoString(&cstate->query_buf, " FROM STDIN");
- else
- appendStringInfoString(&cstate->query_buf, " TO STDOUT");
-
-
- if (cstate->binary)
- appendStringInfoString(&cstate->query_buf, " BINARY");
-
- if (cstate->oids)
- appendStringInfoString(&cstate->query_buf, " OIDS");
-
+ RemoteCopyOptions *res;
+ Assert(cstate);
+ res = (RemoteCopyOptions *) palloc0(sizeof(RemoteCopyOptions));
+
+ /* Then fill in structure */
+ res->rco_binary = cstate->binary;
+ res->rco_oids = cstate->oids;
+ res->rco_csv_mode = cstate->csv_mode;
if (cstate->delim)
- if ((!cstate->csv_mode && cstate->delim[0] != '\t')
- || (cstate->csv_mode && cstate->delim[0] != ','))
- {
- appendStringInfoString(&cstate->query_buf, " DELIMITER AS ");
- CopyQuoteStr(&cstate->query_buf, cstate->delim);
- }
-
+ res->rco_delim = pstrdup(cstate->delim);
if (cstate->null_print)
- if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N"))
- || (cstate->csv_mode && strcmp(cstate->null_print, "")))
- {
- appendStringInfoString(&cstate->query_buf, " NULL AS ");
- CopyQuoteStr(&cstate->query_buf, cstate->null_print);
- }
-
- if (cstate->csv_mode)
- appendStringInfoString(&cstate->query_buf, " CSV");
-
- /*
- * It is not necessary to send the HEADER part to Datanodes.
- * Sending data is sufficient.
- */
-
- if (cstate->quote && cstate->quote[0] != '"')
- {
- appendStringInfoString(&cstate->query_buf, " QUOTE AS ");
- CopyQuoteStr(&cstate->query_buf, cstate->quote);
- }
-
- if (cstate->escape && cstate->quote && cstate->escape[0] != cstate->quote[0])
- {
- appendStringInfoString(&cstate->query_buf, " ESCAPE AS ");
- CopyQuoteStr(&cstate->query_buf, cstate->escape);
- }
-
- if (force_quote)
- {
- ListCell *cell;
- ListCell *prev = NULL;
- appendStringInfoString(&cstate->query_buf, " FORCE QUOTE ");
- foreach (cell, force_quote)
- {
- if (prev)
- appendStringInfoString(&cstate->query_buf, ", ");
- CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
- prev = cell;
- }
- }
+ res->rco_null_print = pstrdup(cstate->null_print);
+ if (cstate->quote)
+ res->rco_quote = pstrdup(cstate->quote);
+ if (cstate->escape)
+ res->rco_escape = pstrdup(cstate->escape);
+ if (cstate->force_quote)
+ res->rco_force_quote = list_copy(cstate->force_quote);
+ if (cstate->force_notnull)
+ res->rco_force_notnull = list_copy(cstate->force_notnull);
- if (force_notnull)
- {
- ListCell *cell;
- ListCell *prev = NULL;
- appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL ");
- foreach (cell, force_notnull)
- {
- if (prev)
- appendStringInfoString(&cstate->query_buf, ", ");
- CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell)));
- prev = cell;
- }
- }
- return exec_nodes;
+ return res;
}
#endif
diff --git a/src/backend/pgxc/Makefile b/src/backend/pgxc/Makefile
index 7149b1680f..49a299b7c2 100644
--- a/src/backend/pgxc/Makefile
+++ b/src/backend/pgxc/Makefile
@@ -11,6 +11,6 @@ subdir = src/backend/pgxc
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-SUBDIRS = locator plan pool barrier nodemgr xc_maintenance_mode
+SUBDIRS = locator plan pool barrier nodemgr copy xc_maintenance_mode
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/pgxc/copy/Makefile b/src/backend/pgxc/copy/Makefile
new file mode 100644
index 0000000000..a8cfbd86da
--- /dev/null
+++ b/src/backend/pgxc/copy/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for remote copy management
+#
+# Copyright (c) 2010-2012 Postgres-XC Development Group
+#
+# IDENTIFICATION
+# $PostgreSQL$
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/pgxc/copy
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = remotecopy.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/pgxc/copy/remotecopy.c b/src/backend/pgxc/copy/remotecopy.c
new file mode 100644
index 0000000000..8c3eba0bff
--- /dev/null
+++ b/src/backend/pgxc/copy/remotecopy.c
@@ -0,0 +1,384 @@
+/*-------------------------------------------------------------------------
+ *
+ * remotecopy.c
+ * Implements an extension of COPY command for remote management
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2010-2012, Postgres-XC Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/pgxc/copy/remotecopy.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "miscadmin.h"
+#include "lib/stringinfo.h"
+#include "optimizer/planner.h"
+#include "pgxc/pgxcnode.h"
+#include "pgxc/postgresql_fdw.h"
+#include "pgxc/remotecopy.h"
+#include "rewrite/rewriteHandler.h"
+#include "utils/builtins.h"
+#include "utils/rel.h"
+
+static void RemoteCopy_QuoteStr(StringInfo query_buf, char *value);
+static void RemoteCopy_QuoteIdentifier(StringInfo query_buf, char *value);
+
+/*
+ * RemoteCopy_GetRelationLoc
+ * Get relation node list based on COPY data involved
+ */
+void
+RemoteCopy_GetRelationLoc(RemoteCopyData *state,
+ Relation rel,
+ List *attnums)
+{
+ ExecNodes *exec_nodes = makeNode(ExecNodes);
+
+ /*
+ * If target table does not exists on nodes (e.g. system table)
+ * the location info returned is NULL. This is the criteria, when
+ * we need to run COPY on Coordinator
+ */
+ state->rel_loc = GetRelationLocInfo(RelationGetRelid(rel));
+
+ if (state->rel_loc)
+ {
+ /*
+ * Pick up one node only
+ * This case corresponds to a replicated table with COPY TO
+ *
+ */
+ if (!state->is_from &&
+ IsLocatorReplicated(state->rel_loc->locatorType))
+ exec_nodes->nodeList = GetPreferredReplicationNode(state->rel_loc->nodeList);
+ else
+ {
+ /* All nodes necessary */
+ exec_nodes->nodeList = list_concat(exec_nodes->nodeList, state->rel_loc->nodeList);
+ }
+ }
+
+ state->idx_dist_by_col = -1;
+ if (state->rel_loc->partAttrNum != 0)
+ {
+ /*
+ * Find the column used as key for data distribution.
+ * First scan attributes of tuple descriptor with the list
+ * of attributes used in COPY if any list is specified.
+ * If no list is specified, set this value to the one of
+ * locator information.
+ */
+ if (attnums != NIL)
+ {
+ ListCell *cur;
+ foreach(cur, attnums)
+ {
+ int attnum = lfirst_int(cur);
+
+ if (state->rel_loc->partAttrNum == attnum)
+ {
+ state->idx_dist_by_col = attnum - 1;
+ break;
+ }
+ }
+ }
+ else
+ {
+ state->idx_dist_by_col = state->rel_loc->partAttrNum - 1;
+ }
+ }
+
+ /* Then save obtained result */
+ state->exec_nodes = exec_nodes;
+}
+
+/*
+ * RemoteCopy_BuildStatement
+ * Build a COPY query for remote management
+ */
+void
+RemoteCopy_BuildStatement(RemoteCopyData *state,
+ Relation rel,
+ RemoteCopyOptions *options,
+ List *attnamelist,
+ List *attnums)
+{
+ int attnum;
+ TupleDesc tupDesc = RelationGetDescr(rel);
+
+ /*
+ * Build up query string for the Datanodes, it should match
+ * to original string, but should have STDIN/STDOUT instead
+ * of filename.
+ */
+ initStringInfo(&state->query_buf);
+ appendStringInfoString(&state->query_buf, "COPY ");
+ appendStringInfo(&state->query_buf, "%s", RelationGetRelationName(rel));
+
+ if (attnamelist)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&state->query_buf, " (");
+ foreach (cell, attnamelist)
+ {
+ if (prev)
+ appendStringInfoString(&state->query_buf, ", ");
+ RemoteCopy_QuoteIdentifier(&state->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+
+ /*
+ * For COPY FROM, we need to append unspecified attributes that have
+ * default expressions associated.
+ */
+ if (state->is_from)
+ {
+ for (attnum = 1; attnum <= tupDesc->natts; attnum++)
+ {
+ /* Don't let dropped attributes go into the column list */
+ if (tupDesc->attrs[attnum - 1]->attisdropped)
+ continue;
+
+ if (!list_member_int(attnums, attnum))
+ {
+ /* Append only if the default expression is not shippable. */
+ Expr *defexpr = (Expr*) build_column_default(rel, attnum);
+ if (defexpr &&
+ !pgxc_is_expr_shippable(expression_planner(defexpr), NULL))
+ {
+ appendStringInfoString(&state->query_buf, ", ");
+ RemoteCopy_QuoteIdentifier(&state->query_buf,
+ NameStr(tupDesc->attrs[attnum - 1]->attname));
+ }
+ }
+ }
+ }
+
+ appendStringInfoChar(&state->query_buf, ')');
+ }
+
+ if (state->is_from)
+ appendStringInfoString(&state->query_buf, " FROM STDIN");
+ else
+ appendStringInfoString(&state->query_buf, " TO STDOUT");
+
+
+ if (options->rco_binary)
+ appendStringInfoString(&state->query_buf, " BINARY");
+
+ if (options->rco_oids)
+ appendStringInfoString(&state->query_buf, " OIDS");
+
+ if (options->rco_delim)
+ {
+ if ((!options->rco_csv_mode && options->rco_delim[0] != '\t')
+ || (options->rco_csv_mode && options->rco_delim[0] != ','))
+ {
+ appendStringInfoString(&state->query_buf, " DELIMITER AS ");
+ RemoteCopy_QuoteStr(&state->query_buf, options->rco_delim);
+ }
+ }
+
+ if (options->rco_null_print)
+ {
+ if ((!options->rco_csv_mode && strcmp(options->rco_null_print, "\\N"))
+ || (options->rco_csv_mode && strcmp(options->rco_null_print, "")))
+ {
+ appendStringInfoString(&state->query_buf, " NULL AS ");
+ RemoteCopy_QuoteStr(&state->query_buf, options->rco_null_print);
+ }
+ }
+
+ if (options->rco_csv_mode)
+ appendStringInfoString(&state->query_buf, " CSV");
+
+ /*
+ * It is not necessary to send the HEADER part to Datanodes.
+ * Sending data is sufficient.
+ */
+
+ if (options->rco_quote && options->rco_quote[0] != '"')
+ {
+ appendStringInfoString(&state->query_buf, " QUOTE AS ");
+ RemoteCopy_QuoteStr(&state->query_buf, options->rco_quote);
+ }
+
+ if (options->rco_escape && options->rco_quote && options->rco_escape[0] != options->rco_quote[0])
+ {
+ appendStringInfoString(&state->query_buf, " ESCAPE AS ");
+ RemoteCopy_QuoteStr(&state->query_buf, options->rco_escape);
+ }
+
+ if (options->rco_force_quote)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&state->query_buf, " FORCE QUOTE ");
+ foreach (cell, options->rco_force_quote)
+ {
+ if (prev)
+ appendStringInfoString(&state->query_buf, ", ");
+ RemoteCopy_QuoteIdentifier(&state->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+ }
+
+ if (options->rco_force_notnull)
+ {
+ ListCell *cell;
+ ListCell *prev = NULL;
+ appendStringInfoString(&state->query_buf, " FORCE NOT NULL ");
+ foreach (cell, options->rco_force_notnull)
+ {
+ if (prev)
+ appendStringInfoString(&state->query_buf, ", ");
+ RemoteCopy_QuoteIdentifier(&state->query_buf, strVal(lfirst(cell)));
+ prev = cell;
+ }
+ }
+}
+
+
+/*
+ * FreeRemoteCopyOptions
+ * Free remote COPY options structure
+ */
+void
+FreeRemoteCopyOptions(RemoteCopyOptions *options)
+{
+ /* Leave if nothing */
+ if (options == NULL)
+ return;
+
+ /* Free field by field */
+ if (options->rco_delim)
+ pfree(options->rco_delim);
+ if (options->rco_null_print)
+ pfree(options->rco_null_print);
+ if (options->rco_quote)
+ pfree(options->rco_quote);
+ if (options->rco_escape)
+ pfree(options->rco_escape);
+ if (options->rco_force_quote)
+ list_free(options->rco_force_quote);
+ if (options->rco_force_notnull)
+ list_free(options->rco_force_notnull);
+
+ /* Then finish the work */
+ pfree(options);
+}
+
+
+/*
+ * FreeRemoteCopyData
+ * Free remote COPY state data structure
+ */
+void
+FreeRemoteCopyData(RemoteCopyData *state)
+{
+ /* Leave if nothing */
+ if (state == NULL)
+ return;
+
+ if (state->connections)
+ pfree(state->connections);
+ if (state->query_buf.data)
+ pfree(state->query_buf.data);
+ FreeRelationLocInfo(state->rel_loc);
+ pfree(state);
+}
+
+
+#define APPENDSOFAR(query_buf, start, current) \
+ if (current > start) \
+ appendBinaryStringInfo(query_buf, start, current - start)
+
+/*
+ * RemoteCopy_QuoteStr
+ * Append quoted value to the query buffer. Value is escaped if needed
+ * When rewriting query to be sent down to nodes we should escape special
+ * characters, that may present in the value. The characters are backslash(\)
+ * and single quote ('). These characters are escaped by doubling. We do not
+ * have to escape characters like \t, \v, \b, etc. because Datanode interprets
+ * them properly.
+ * We use E'...' syntax for literals containing backslashes.
+ */
+static void
+RemoteCopy_QuoteStr(StringInfo query_buf, char *value)
+{
+ char *start = value;
+ char *current = value;
+ char c;
+ bool has_backslash = (strchr(value, '\\') != NULL);
+
+ if (has_backslash)
+ appendStringInfoChar(query_buf, 'E');
+
+ appendStringInfoChar(query_buf, '\'');
+
+ while ((c = *current) != '\0')
+ {
+ switch (c)
+ {
+ case '\\':
+ case '\'':
+ APPENDSOFAR(query_buf, start, current);
+ /* Double current */
+ appendStringInfoChar(query_buf, c);
+ /* Second current will be appended next time */
+ start = current;
+ /* fallthru */
+ default:
+ current++;
+ }
+ }
+ APPENDSOFAR(query_buf, start, current);
+ appendStringInfoChar(query_buf, '\'');
+}
+
+/*
+ * RemoteCopy_QuoteIdentifier
+ * Determine if identifier needs to be quoted and surround it with double quotes
+ */
+static void
+RemoteCopy_QuoteIdentifier(StringInfo query_buf, char *value)
+{
+ char *start = value;
+ char *current = value;
+ char c;
+ int len = strlen(value);
+ bool need_quote = (strspn(value, "_abcdefghijklmnopqrstuvwxyz0123456789") < len) ||
+ (strchr("_abcdefghijklmnopqrstuvwxyz", value[0]) == NULL);
+
+ if (need_quote)
+ {
+ appendStringInfoChar(query_buf, '"');
+
+ while ((c = *current) != '\0')
+ {
+ switch (c)
+ {
+ case '"':
+ APPENDSOFAR(query_buf, start, current);
+ /* Double current */
+ appendStringInfoChar(query_buf, c);
+ /* Second current will be appended next time */
+ start = current;
+ /* fallthru */
+ default:
+ current++;
+ }
+ }
+ APPENDSOFAR(query_buf, start, current);
+ appendStringInfoChar(query_buf, '"');
+ }
+ else
+ {
+ appendBinaryStringInfo(query_buf, value, len);
+ }
+}
diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c
index 905369e688..feab0a1f9e 100644
--- a/src/backend/pgxc/locator/locator.c
+++ b/src/backend/pgxc/locator/locator.c
@@ -121,11 +121,11 @@ static const unsigned int xc_mod_r[][6] =
};
/*
- * GetAnyDataNode
- * Pick any Datanode from given list, but try a preferred node
+ * GetPreferredReplicationNode
+ * Pick any Datanode from given list, however fetch a preferred node first.
*/
List *
-GetAnyDataNode(List *relNodes)
+GetPreferredReplicationNode(List *relNodes)
{
/*
* Try to find the first node in given list relNodes
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 555be50f5f..bd719911ea 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -111,15 +111,15 @@ extern int GetRoundRobinNode(Oid relid);
extern bool IsTypeHashDistributable(Oid col_type);
extern List *GetAllDataNodes(void);
extern List *GetAllCoordNodes(void);
-extern List *GetAnyDataNode(List *relNodes);
+extern List *GetPreferredReplicationNode(List *relNodes);
extern void RelationBuildLocator(Relation rel);
extern void FreeRelationLocInfo(RelationLocInfo *relationLocInfo);
extern bool IsTypeModuloDistributable(Oid col_type);
-extern char *GetRelationModuloColumn(RelationLocInfo * rel_loc_info);
+extern char *GetRelationModuloColumn(RelationLocInfo *rel_loc_info);
extern bool IsModuloColumn(RelationLocInfo *rel_loc_info, char *part_col_name);
extern bool IsModuloColumnForRelId(Oid relid, char *part_col_name);
-extern char *GetRelationDistColumn(RelationLocInfo * rel_loc_info);
+extern char *GetRelationDistColumn(RelationLocInfo *rel_loc_info);
extern bool IsDistColumnForRelId(Oid relid, char *part_col_name);
extern void FreeExecNodes(ExecNodes **exec_nodes);
diff --git a/src/include/pgxc/remotecopy.h b/src/include/pgxc/remotecopy.h
new file mode 100644
index 0000000000..77134e71f9
--- /dev/null
+++ b/src/include/pgxc/remotecopy.h
@@ -0,0 +1,75 @@
+/*-------------------------------------------------------------------------
+ *
+ * remotecopy.h
+ * Routines for extension of COPY command for cluster management
+ *
+ *
+ * Copyright (c) 2010-2012 Postgres-XC Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/include/pgxc/remotecopy.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REMOTECOPY_H
+#define REMOTECOPY_H
+
+#include "nodes/parsenodes.h"
+
+/*
+ * This contains the set of data necessary for remote COPY control.
+ */
+typedef struct RemoteCopyData {
+ /* COPY FROM/TO? */
+ bool is_from;
+
+ /*
+ * On Coordinator we need to rewrite query.
+ * While client may submit a copy command dealing with file, Datanodes
+ * always send/receive data to/from the Coordinator. So we can not use
+ * original statement and should rewrite statement, specifing STDIN/STDOUT
+ * as copy source or destination
+ */
+ StringInfoData query_buf;
+
+ /* Execution nodes for COPY */
+ ExecNodes *exec_nodes;
+
+ /* Locator information */
+ RelationLocInfo *rel_loc; /* the locator key */
+ int idx_dist_by_col; /* index of the distributed by column */
+
+ PGXCNodeHandle **connections; /* Involved Datanode connections */
+} RemoteCopyData;
+
+/*
+ * List of all the options used for query deparse step
+ * As CopyStateData stays private in copy.c and in order not to
+ * make Postgres-XC code too much intrusive in PostgreSQL code,
+ * this intermediate structure is used primarily to generate remote
+ * COPY queries based on deparsed options.
+ */
+typedef struct RemoteCopyOptions {
+ bool rco_binary; /* binary format? */
+ bool rco_oids; /* include OIDs? */
+ bool rco_csv_mode; /* Comma Separated Value format? */
+ char *rco_delim; /* column delimiter (must be 1 byte) */
+ char *rco_null_print; /* NULL marker string (server encoding!) */
+ char *rco_quote; /* CSV quote char (must be 1 byte) */
+ char *rco_escape; /* CSV escape char (must be 1 byte) */
+ List *rco_force_quote; /* list of column names */
+ List *rco_force_notnull; /* list of column names */
+} RemoteCopyOptions;
+
+extern void RemoteCopy_BuildStatement(RemoteCopyData *state,
+ Relation rel,
+ RemoteCopyOptions *options,
+ List *attnamelist,
+ List *attnums);
+extern void RemoteCopy_GetRelationLoc(RemoteCopyData *state,
+ Relation rel,
+ List *attnums);
+extern void FreeRemoteCopyData(RemoteCopyData *state);
+extern void FreeRemoteCopyOptions(RemoteCopyOptions *options);
+#endif