diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/commands/copy.c | 400 | ||||
| -rw-r--r-- | src/backend/pgxc/Makefile | 2 | ||||
| -rw-r--r-- | src/backend/pgxc/copy/Makefile | 19 | ||||
| -rw-r--r-- | src/backend/pgxc/copy/remotecopy.c | 384 | ||||
| -rw-r--r-- | src/backend/pgxc/locator/locator.c | 6 | ||||
| -rw-r--r-- | src/include/pgxc/locator.h | 6 | ||||
| -rw-r--r-- | src/include/pgxc/remotecopy.h | 75 |
7 files changed, 560 insertions, 332 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 26d0f553f3..074bf09b39 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -39,6 +39,7 @@ #include "pgxc/pgxc.h" #include "pgxc/execRemote.h" #include "pgxc/locator.h" +#include "pgxc/remotecopy.h" #include "nodes/nodes.h" #include "pgxc/poolmgr.h" #include "pgxc/postgresql_fdw.h" @@ -199,24 +200,8 @@ typedef struct CopyStateData int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ #ifdef PGXC - /* - * On Coordinator we need to rewrite query. - * While client may submit a copy command dealing with file, Datanodes - * always send/receive data to/from the Coordinator. So we can not use - * original statement and should rewrite statement, specifing STDIN/STDOUT - * as copy source or destination - */ - StringInfoData query_buf; - - /* Execution nodes for COPY */ - ExecNodes *exec_nodes; - - /* Locator information */ - RelationLocInfo *rel_loc; /* the locator key */ - int idx_dist_by_col; /* index of the distributed by column */ - - PGXCNodeHandle **connections; /* Involved Datanode connections */ - TupleDesc tupDesc; /* for INSERT SELECT */ + /* Remote COPY state data */ + RemoteCopyData *remoteCopyState; #endif } CopyStateData; @@ -341,8 +326,7 @@ static bool CopyGetInt16(CopyState cstate, int16 *val); #ifdef PGXC -static ExecNodes *build_copy_statement(CopyState cstate, List *attnamelist, - TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull); +static RemoteCopyOptions *GetRemoteCopyOptions(CopyState cstate); static void append_defvals(Datum *values, CopyState cstate); #endif @@ -765,102 +749,6 @@ CopyLoadRawBuf(CopyState cstate) return (inbytes > 0); } -#ifdef PGXC -/* - * CopyQuoteStr appends quoted value to the query buffer. Value is escaped - * if needed - * - * When rewriting query to be sent down to nodes we should escape special - * characters, that may present in the value. The characters are backslash(\) - * and single quote ('). These characters are escaped by doubling. We do not - * have to escape characters like \t, \v, \b, etc. because Datanode interprets - * them properly. - * We use E'...' syntax for literals containing backslashes. - */ -static void -CopyQuoteStr(StringInfo query_buf, char *value) -{ - char *start = value; - char *current = value; - char c; - bool has_backslash = (strchr(value, '\\') != NULL); - - if (has_backslash) - appendStringInfoChar(query_buf, 'E'); - - appendStringInfoChar(query_buf, '\''); - -#define APPENDSOFAR \ - if (current > start) \ - appendBinaryStringInfo(query_buf, start, current - start) - - while ((c = *current) != '\0') - { - switch (c) - { - case '\\': - case '\'': - APPENDSOFAR; - /* Double current */ - appendStringInfoChar(query_buf, c); - /* Second current will be appended next time */ - start = current; - /* fallthru */ - default: - current++; - } - } - APPENDSOFAR; - appendStringInfoChar(query_buf, '\''); -} - -/* - * CopyQuoteIdentifier determine if identifier needs to be quoted and surround - * it with double quotes - */ -static void -CopyQuoteIdentifier(StringInfo query_buf, char *value) -{ - char *start = value; - char *current = value; - char c; - int len = strlen(value); - bool need_quote = (strspn(value, "_abcdefghijklmnopqrstuvwxyz0123456789") < len) - || (strchr("_abcdefghijklmnopqrstuvwxyz", value[0]) == NULL); - - if (need_quote) - { - appendStringInfoChar(query_buf, '"'); - -#define APPENDSOFAR \ - if (current > start) \ - appendBinaryStringInfo(query_buf, start, current - start) - - while ((c = *current) != '\0') - { - switch (c) - { - case '"': - APPENDSOFAR; - /* Double current */ - appendStringInfoChar(query_buf, c); - /* Second current will be appended next time */ - start = current; - /* fallthru */ - default: - current++; - } - } - APPENDSOFAR; - appendStringInfoChar(query_buf, '"'); - } - else - { - appendBinaryStringInfo(query_buf, value, len); - } -} -#endif - /* * DoCopy executes the SQL COPY statement @@ -1288,9 +1176,6 @@ BeginCopy(bool is_from, TupleDesc tupDesc; int num_phys_attrs; MemoryContext oldcontext; -#ifdef PGXC - ExecNodes *exec_nodes; -#endif /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); @@ -1329,12 +1214,25 @@ BeginCopy(bool is_from, /* Get copy statement and execution node information */ if (IS_PGXC_COORDINATOR) { - exec_nodes = build_copy_statement(cstate, - attnamelist, - tupDesc, - is_from, - cstate->force_quote, - cstate->force_notnull); + RemoteCopyData *remoteCopyState = (RemoteCopyData *) palloc0(sizeof(RemoteCopyData)); + List *attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); + + /* Setup correct COPY FROM/TO flag */ + remoteCopyState->is_from = is_from; + + /* Get execution node list */ + RemoteCopy_GetRelationLoc(remoteCopyState, + cstate->rel, + attnums); + /* Build remote query */ + RemoteCopy_BuildStatement(remoteCopyState, + cstate->rel, + GetRemoteCopyOptions(cstate), + attnamelist, + attnums); + + /* Then assign built structure */ + cstate->remoteCopyState = remoteCopyState; } #endif } @@ -1500,16 +1398,18 @@ BeginCopy(bool is_from, */ if (IS_PGXC_COORDINATOR) { + RemoteCopyData *remoteCopyState = cstate->remoteCopyState; + /* * In the case of CopyOut, it is just necessary to pick up one node randomly. * This is done when rel_loc is found. */ - if (cstate->rel_loc) + if (remoteCopyState && remoteCopyState->rel_loc) { - cstate->connections = DataNodeCopyBegin(cstate->query_buf.data, - exec_nodes->nodeList, - GetActiveSnapshot()); - if (!cstate->connections) + remoteCopyState->connections = DataNodeCopyBegin(remoteCopyState->query_buf.data, + remoteCopyState->exec_nodes->nodeList, + GetActiveSnapshot()); + if (!remoteCopyState->connections) ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("Failed to initialize Datanodes for COPY"))); @@ -1795,15 +1695,19 @@ CopyTo(CopyState cstate) } #ifdef PGXC - if (IS_PGXC_COORDINATOR && cstate->rel_loc) + if (IS_PGXC_COORDINATOR && + cstate->remoteCopyState && + cstate->remoteCopyState->rel_loc) { + RemoteCopyData *remoteCopyState = cstate->remoteCopyState; + /* * We don't know the value of the distribution column value, so need to * read from all nodes. Hence indicate that the value is NULL. */ processed = DataNodeCopyOut( - GetRelationNodes(cstate->rel_loc, 0, true, UNKNOWNOID, RELATION_ACCESS_READ), - cstate->connections, + GetRelationNodes(remoteCopyState->rel_loc, 0, true, UNKNOWNOID, RELATION_ACCESS_READ), + remoteCopyState->connections, cstate->copy_file); } else @@ -2228,19 +2132,19 @@ CopyFrom(CopyState cstate) * Send the data row as-is to the Datanodes. If default values * are to be inserted, append them onto the data row. */ - - if (IS_PGXC_COORDINATOR && cstate->rel_loc) + if (IS_PGXC_COORDINATOR && cstate->remoteCopyState->rel_loc) { Form_pg_attribute *attr = tupDesc->attrs; Datum dist_col_value; bool dist_col_is_null; Oid dist_col_type; + RemoteCopyData *remoteCopyState = cstate->remoteCopyState; - if (cstate->idx_dist_by_col >= 0) + if (remoteCopyState->idx_dist_by_col >= 0) { - dist_col_value = values[cstate->idx_dist_by_col]; - dist_col_is_null = nulls[cstate->idx_dist_by_col]; - dist_col_type = attr[cstate->idx_dist_by_col]->atttypid; + dist_col_value = values[remoteCopyState->idx_dist_by_col]; + dist_col_is_null = nulls[remoteCopyState->idx_dist_by_col]; + dist_col_type = attr[remoteCopyState->idx_dist_by_col]->atttypid; } else { @@ -2252,12 +2156,12 @@ CopyFrom(CopyState cstate) if (DataNodeCopyIn(cstate->line_buf.data, cstate->line_buf.len, - GetRelationNodes(cstate->rel_loc, + GetRelationNodes(remoteCopyState->rel_loc, dist_col_value, dist_col_is_null, dist_col_type, RELATION_ACCESS_INSERT), - cstate->connections)) + remoteCopyState->connections)) ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("Copy failed on a Datanode"))); @@ -2578,6 +2482,8 @@ BeginCopyFrom(Relation rel, /* This is done at the beginning of COPY FROM from Coordinator to Datanodes */ if (IS_PGXC_COORDINATOR) { + RemoteCopyData *remoteCopyState = cstate->remoteCopyState; + /* Empty buffer info and send header to all the backends involved in COPY */ resetStringInfo(&cstate->line_buf); @@ -2594,7 +2500,7 @@ BeginCopyFrom(Relation rel, tmp = htonl(tmp); appendBinaryStringInfo(&cstate->line_buf, (char *) &tmp, 4); - if (DataNodeCopyInBinaryForAll(cstate->line_buf.data, 19, cstate->connections)) + if (DataNodeCopyInBinaryForAll(cstate->line_buf.data, 19, remoteCopyState->connections)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (COPY SEND)"))); @@ -3036,17 +2942,17 @@ void EndCopyFrom(CopyState cstate) { #ifdef PGXC + RemoteCopyData *remoteCopyState = cstate->remoteCopyState; + /* For PGXC related COPY, free also relation location data */ - if (IS_PGXC_COORDINATOR && cstate->rel_loc) + if (IS_PGXC_COORDINATOR && remoteCopyState->rel_loc) { - bool replicated = cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED; + bool replicated = remoteCopyState->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED; DataNodeCopyFinish( - cstate->connections, + remoteCopyState->connections, replicated ? PGXCNodeGetNodeId(primary_data_node, PGXC_NODE_DATANODE) : -1, replicated ? COMBINE_TYPE_SAME : COMBINE_TYPE_SUM); - pfree(cstate->connections); - pfree(cstate->query_buf.data); - FreeRelationLocInfo(cstate->rel_loc); + FreeRemoteCopyData(remoteCopyState); } #endif /* No COPY FROM related resources except memory. */ @@ -4380,186 +4286,30 @@ CreateCopyDestReceiver(void) } #ifdef PGXC -/* - * Rebuild a COPY statement in cstate and set ExecNodes - */ -static ExecNodes* -build_copy_statement(CopyState cstate, List *attnamelist, - TupleDesc tupDesc, bool is_from, List *force_quote, List *force_notnull) +static RemoteCopyOptions * +GetRemoteCopyOptions(CopyState cstate) { - char *pPartByCol; - ExecNodes *exec_nodes = makeNode(ExecNodes); - int attnum; - List *attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); - - - /* - * If target table does not exists on nodes (e.g. system table) - * the location info returned is NULL. This is the criteria, when - * we need to run COPY on Coordinator - */ - cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel)); - - pPartByCol = GetRelationDistColumn(cstate->rel_loc); - if (cstate->rel_loc) - { - /* - * Pick up one node only - * This case corresponds to a replicated table with COPY TO - * - */ - if (!is_from && cstate->rel_loc->locatorType == 'R') - exec_nodes->nodeList = GetAnyDataNode(cstate->rel_loc->nodeList); - else - { - /* All nodes necessary */ - exec_nodes->nodeList = list_concat(exec_nodes->nodeList, cstate->rel_loc->nodeList); - } - } - - cstate->idx_dist_by_col = -1; - if (pPartByCol) - { - ListCell *cur; - foreach(cur, attnums) - { - attnum = lfirst_int(cur); - if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), pPartByCol) == 0) - { - cstate->idx_dist_by_col = attnum - 1; - break; - } - } - } - - /* - * Build up query string for the Datanodes, it should match - * to original string, but should have STDIN/STDOUT instead - * of filename. - */ - initStringInfo(&cstate->query_buf); - - appendStringInfoString(&cstate->query_buf, "COPY "); - appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel)); - - if (attnamelist) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " ("); - foreach (cell, attnamelist) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - - /* - * For COPY FROM, we need to append unspecified attributes that have - * default expressions associated. - */ - if (is_from) - { - for (attnum = 1; attnum <= tupDesc->natts; attnum++) - { - /* Don't let dropped attributes go into the column list */ - if (tupDesc->attrs[attnum - 1]->attisdropped) - continue; - - if (!list_member_int(attnums, attnum)) - { - /* Append only if the default expression is not shippable. */ - Expr *defexpr = (Expr*) build_column_default(cstate->rel, attnum); - if (defexpr && - !pgxc_is_expr_shippable(expression_planner(defexpr), NULL)) - { - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, - NameStr(tupDesc->attrs[attnum - 1]->attname)); - } - } - } - } - - appendStringInfoChar(&cstate->query_buf, ')'); - } - - if (is_from) - appendStringInfoString(&cstate->query_buf, " FROM STDIN"); - else - appendStringInfoString(&cstate->query_buf, " TO STDOUT"); - - - if (cstate->binary) - appendStringInfoString(&cstate->query_buf, " BINARY"); - - if (cstate->oids) - appendStringInfoString(&cstate->query_buf, " OIDS"); - + RemoteCopyOptions *res; + Assert(cstate); + res = (RemoteCopyOptions *) palloc0(sizeof(RemoteCopyOptions)); + + /* Then fill in structure */ + res->rco_binary = cstate->binary; + res->rco_oids = cstate->oids; + res->rco_csv_mode = cstate->csv_mode; if (cstate->delim) - if ((!cstate->csv_mode && cstate->delim[0] != '\t') - || (cstate->csv_mode && cstate->delim[0] != ',')) - { - appendStringInfoString(&cstate->query_buf, " DELIMITER AS "); - CopyQuoteStr(&cstate->query_buf, cstate->delim); - } - + res->rco_delim = pstrdup(cstate->delim); if (cstate->null_print) - if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N")) - || (cstate->csv_mode && strcmp(cstate->null_print, ""))) - { - appendStringInfoString(&cstate->query_buf, " NULL AS "); - CopyQuoteStr(&cstate->query_buf, cstate->null_print); - } - - if (cstate->csv_mode) - appendStringInfoString(&cstate->query_buf, " CSV"); - - /* - * It is not necessary to send the HEADER part to Datanodes. - * Sending data is sufficient. - */ - - if (cstate->quote && cstate->quote[0] != '"') - { - appendStringInfoString(&cstate->query_buf, " QUOTE AS "); - CopyQuoteStr(&cstate->query_buf, cstate->quote); - } - - if (cstate->escape && cstate->quote && cstate->escape[0] != cstate->quote[0]) - { - appendStringInfoString(&cstate->query_buf, " ESCAPE AS "); - CopyQuoteStr(&cstate->query_buf, cstate->escape); - } - - if (force_quote) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " FORCE QUOTE "); - foreach (cell, force_quote) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - } + res->rco_null_print = pstrdup(cstate->null_print); + if (cstate->quote) + res->rco_quote = pstrdup(cstate->quote); + if (cstate->escape) + res->rco_escape = pstrdup(cstate->escape); + if (cstate->force_quote) + res->rco_force_quote = list_copy(cstate->force_quote); + if (cstate->force_notnull) + res->rco_force_notnull = list_copy(cstate->force_notnull); - if (force_notnull) - { - ListCell *cell; - ListCell *prev = NULL; - appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL "); - foreach (cell, force_notnull) - { - if (prev) - appendStringInfoString(&cstate->query_buf, ", "); - CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); - prev = cell; - } - } - return exec_nodes; + return res; } #endif diff --git a/src/backend/pgxc/Makefile b/src/backend/pgxc/Makefile index 7149b1680f..49a299b7c2 100644 --- a/src/backend/pgxc/Makefile +++ b/src/backend/pgxc/Makefile @@ -11,6 +11,6 @@ subdir = src/backend/pgxc top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = locator plan pool barrier nodemgr xc_maintenance_mode +SUBDIRS = locator plan pool barrier nodemgr copy xc_maintenance_mode include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/pgxc/copy/Makefile b/src/backend/pgxc/copy/Makefile new file mode 100644 index 0000000000..a8cfbd86da --- /dev/null +++ b/src/backend/pgxc/copy/Makefile @@ -0,0 +1,19 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for remote copy management +# +# Copyright (c) 2010-2012 Postgres-XC Development Group +# +# IDENTIFICATION +# $PostgreSQL$ +# +#------------------------------------------------------------------------- + +subdir = src/backend/pgxc/copy +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = remotecopy.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/pgxc/copy/remotecopy.c b/src/backend/pgxc/copy/remotecopy.c new file mode 100644 index 0000000000..8c3eba0bff --- /dev/null +++ b/src/backend/pgxc/copy/remotecopy.c @@ -0,0 +1,384 @@ +/*------------------------------------------------------------------------- + * + * remotecopy.c + * Implements an extension of COPY command for remote management + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 2010-2012, Postgres-XC Development Group + * + * + * IDENTIFICATION + * src/backend/pgxc/copy/remotecopy.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "lib/stringinfo.h" +#include "optimizer/planner.h" +#include "pgxc/pgxcnode.h" +#include "pgxc/postgresql_fdw.h" +#include "pgxc/remotecopy.h" +#include "rewrite/rewriteHandler.h" +#include "utils/builtins.h" +#include "utils/rel.h" + +static void RemoteCopy_QuoteStr(StringInfo query_buf, char *value); +static void RemoteCopy_QuoteIdentifier(StringInfo query_buf, char *value); + +/* + * RemoteCopy_GetRelationLoc + * Get relation node list based on COPY data involved + */ +void +RemoteCopy_GetRelationLoc(RemoteCopyData *state, + Relation rel, + List *attnums) +{ + ExecNodes *exec_nodes = makeNode(ExecNodes); + + /* + * If target table does not exists on nodes (e.g. system table) + * the location info returned is NULL. This is the criteria, when + * we need to run COPY on Coordinator + */ + state->rel_loc = GetRelationLocInfo(RelationGetRelid(rel)); + + if (state->rel_loc) + { + /* + * Pick up one node only + * This case corresponds to a replicated table with COPY TO + * + */ + if (!state->is_from && + IsLocatorReplicated(state->rel_loc->locatorType)) + exec_nodes->nodeList = GetPreferredReplicationNode(state->rel_loc->nodeList); + else + { + /* All nodes necessary */ + exec_nodes->nodeList = list_concat(exec_nodes->nodeList, state->rel_loc->nodeList); + } + } + + state->idx_dist_by_col = -1; + if (state->rel_loc->partAttrNum != 0) + { + /* + * Find the column used as key for data distribution. + * First scan attributes of tuple descriptor with the list + * of attributes used in COPY if any list is specified. + * If no list is specified, set this value to the one of + * locator information. + */ + if (attnums != NIL) + { + ListCell *cur; + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + + if (state->rel_loc->partAttrNum == attnum) + { + state->idx_dist_by_col = attnum - 1; + break; + } + } + } + else + { + state->idx_dist_by_col = state->rel_loc->partAttrNum - 1; + } + } + + /* Then save obtained result */ + state->exec_nodes = exec_nodes; +} + +/* + * RemoteCopy_BuildStatement + * Build a COPY query for remote management + */ +void +RemoteCopy_BuildStatement(RemoteCopyData *state, + Relation rel, + RemoteCopyOptions *options, + List *attnamelist, + List *attnums) +{ + int attnum; + TupleDesc tupDesc = RelationGetDescr(rel); + + /* + * Build up query string for the Datanodes, it should match + * to original string, but should have STDIN/STDOUT instead + * of filename. + */ + initStringInfo(&state->query_buf); + appendStringInfoString(&state->query_buf, "COPY "); + appendStringInfo(&state->query_buf, "%s", RelationGetRelationName(rel)); + + if (attnamelist) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&state->query_buf, " ("); + foreach (cell, attnamelist) + { + if (prev) + appendStringInfoString(&state->query_buf, ", "); + RemoteCopy_QuoteIdentifier(&state->query_buf, strVal(lfirst(cell))); + prev = cell; + } + + /* + * For COPY FROM, we need to append unspecified attributes that have + * default expressions associated. + */ + if (state->is_from) + { + for (attnum = 1; attnum <= tupDesc->natts; attnum++) + { + /* Don't let dropped attributes go into the column list */ + if (tupDesc->attrs[attnum - 1]->attisdropped) + continue; + + if (!list_member_int(attnums, attnum)) + { + /* Append only if the default expression is not shippable. */ + Expr *defexpr = (Expr*) build_column_default(rel, attnum); + if (defexpr && + !pgxc_is_expr_shippable(expression_planner(defexpr), NULL)) + { + appendStringInfoString(&state->query_buf, ", "); + RemoteCopy_QuoteIdentifier(&state->query_buf, + NameStr(tupDesc->attrs[attnum - 1]->attname)); + } + } + } + } + + appendStringInfoChar(&state->query_buf, ')'); + } + + if (state->is_from) + appendStringInfoString(&state->query_buf, " FROM STDIN"); + else + appendStringInfoString(&state->query_buf, " TO STDOUT"); + + + if (options->rco_binary) + appendStringInfoString(&state->query_buf, " BINARY"); + + if (options->rco_oids) + appendStringInfoString(&state->query_buf, " OIDS"); + + if (options->rco_delim) + { + if ((!options->rco_csv_mode && options->rco_delim[0] != '\t') + || (options->rco_csv_mode && options->rco_delim[0] != ',')) + { + appendStringInfoString(&state->query_buf, " DELIMITER AS "); + RemoteCopy_QuoteStr(&state->query_buf, options->rco_delim); + } + } + + if (options->rco_null_print) + { + if ((!options->rco_csv_mode && strcmp(options->rco_null_print, "\\N")) + || (options->rco_csv_mode && strcmp(options->rco_null_print, ""))) + { + appendStringInfoString(&state->query_buf, " NULL AS "); + RemoteCopy_QuoteStr(&state->query_buf, options->rco_null_print); + } + } + + if (options->rco_csv_mode) + appendStringInfoString(&state->query_buf, " CSV"); + + /* + * It is not necessary to send the HEADER part to Datanodes. + * Sending data is sufficient. + */ + + if (options->rco_quote && options->rco_quote[0] != '"') + { + appendStringInfoString(&state->query_buf, " QUOTE AS "); + RemoteCopy_QuoteStr(&state->query_buf, options->rco_quote); + } + + if (options->rco_escape && options->rco_quote && options->rco_escape[0] != options->rco_quote[0]) + { + appendStringInfoString(&state->query_buf, " ESCAPE AS "); + RemoteCopy_QuoteStr(&state->query_buf, options->rco_escape); + } + + if (options->rco_force_quote) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&state->query_buf, " FORCE QUOTE "); + foreach (cell, options->rco_force_quote) + { + if (prev) + appendStringInfoString(&state->query_buf, ", "); + RemoteCopy_QuoteIdentifier(&state->query_buf, strVal(lfirst(cell))); + prev = cell; + } + } + + if (options->rco_force_notnull) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&state->query_buf, " FORCE NOT NULL "); + foreach (cell, options->rco_force_notnull) + { + if (prev) + appendStringInfoString(&state->query_buf, ", "); + RemoteCopy_QuoteIdentifier(&state->query_buf, strVal(lfirst(cell))); + prev = cell; + } + } +} + + +/* + * FreeRemoteCopyOptions + * Free remote COPY options structure + */ +void +FreeRemoteCopyOptions(RemoteCopyOptions *options) +{ + /* Leave if nothing */ + if (options == NULL) + return; + + /* Free field by field */ + if (options->rco_delim) + pfree(options->rco_delim); + if (options->rco_null_print) + pfree(options->rco_null_print); + if (options->rco_quote) + pfree(options->rco_quote); + if (options->rco_escape) + pfree(options->rco_escape); + if (options->rco_force_quote) + list_free(options->rco_force_quote); + if (options->rco_force_notnull) + list_free(options->rco_force_notnull); + + /* Then finish the work */ + pfree(options); +} + + +/* + * FreeRemoteCopyData + * Free remote COPY state data structure + */ +void +FreeRemoteCopyData(RemoteCopyData *state) +{ + /* Leave if nothing */ + if (state == NULL) + return; + + if (state->connections) + pfree(state->connections); + if (state->query_buf.data) + pfree(state->query_buf.data); + FreeRelationLocInfo(state->rel_loc); + pfree(state); +} + + +#define APPENDSOFAR(query_buf, start, current) \ + if (current > start) \ + appendBinaryStringInfo(query_buf, start, current - start) + +/* + * RemoteCopy_QuoteStr + * Append quoted value to the query buffer. Value is escaped if needed + * When rewriting query to be sent down to nodes we should escape special + * characters, that may present in the value. The characters are backslash(\) + * and single quote ('). These characters are escaped by doubling. We do not + * have to escape characters like \t, \v, \b, etc. because Datanode interprets + * them properly. + * We use E'...' syntax for literals containing backslashes. + */ +static void +RemoteCopy_QuoteStr(StringInfo query_buf, char *value) +{ + char *start = value; + char *current = value; + char c; + bool has_backslash = (strchr(value, '\\') != NULL); + + if (has_backslash) + appendStringInfoChar(query_buf, 'E'); + + appendStringInfoChar(query_buf, '\''); + + while ((c = *current) != '\0') + { + switch (c) + { + case '\\': + case '\'': + APPENDSOFAR(query_buf, start, current); + /* Double current */ + appendStringInfoChar(query_buf, c); + /* Second current will be appended next time */ + start = current; + /* fallthru */ + default: + current++; + } + } + APPENDSOFAR(query_buf, start, current); + appendStringInfoChar(query_buf, '\''); +} + +/* + * RemoteCopy_QuoteIdentifier + * Determine if identifier needs to be quoted and surround it with double quotes + */ +static void +RemoteCopy_QuoteIdentifier(StringInfo query_buf, char *value) +{ + char *start = value; + char *current = value; + char c; + int len = strlen(value); + bool need_quote = (strspn(value, "_abcdefghijklmnopqrstuvwxyz0123456789") < len) || + (strchr("_abcdefghijklmnopqrstuvwxyz", value[0]) == NULL); + + if (need_quote) + { + appendStringInfoChar(query_buf, '"'); + + while ((c = *current) != '\0') + { + switch (c) + { + case '"': + APPENDSOFAR(query_buf, start, current); + /* Double current */ + appendStringInfoChar(query_buf, c); + /* Second current will be appended next time */ + start = current; + /* fallthru */ + default: + current++; + } + } + APPENDSOFAR(query_buf, start, current); + appendStringInfoChar(query_buf, '"'); + } + else + { + appendBinaryStringInfo(query_buf, value, len); + } +} diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c index 905369e688..feab0a1f9e 100644 --- a/src/backend/pgxc/locator/locator.c +++ b/src/backend/pgxc/locator/locator.c @@ -121,11 +121,11 @@ static const unsigned int xc_mod_r[][6] = }; /* - * GetAnyDataNode - * Pick any Datanode from given list, but try a preferred node + * GetPreferredReplicationNode + * Pick any Datanode from given list, however fetch a preferred node first. */ List * -GetAnyDataNode(List *relNodes) +GetPreferredReplicationNode(List *relNodes) { /* * Try to find the first node in given list relNodes diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h index 555be50f5f..bd719911ea 100644 --- a/src/include/pgxc/locator.h +++ b/src/include/pgxc/locator.h @@ -111,15 +111,15 @@ extern int GetRoundRobinNode(Oid relid); extern bool IsTypeHashDistributable(Oid col_type); extern List *GetAllDataNodes(void); extern List *GetAllCoordNodes(void); -extern List *GetAnyDataNode(List *relNodes); +extern List *GetPreferredReplicationNode(List *relNodes); extern void RelationBuildLocator(Relation rel); extern void FreeRelationLocInfo(RelationLocInfo *relationLocInfo); extern bool IsTypeModuloDistributable(Oid col_type); -extern char *GetRelationModuloColumn(RelationLocInfo * rel_loc_info); +extern char *GetRelationModuloColumn(RelationLocInfo *rel_loc_info); extern bool IsModuloColumn(RelationLocInfo *rel_loc_info, char *part_col_name); extern bool IsModuloColumnForRelId(Oid relid, char *part_col_name); -extern char *GetRelationDistColumn(RelationLocInfo * rel_loc_info); +extern char *GetRelationDistColumn(RelationLocInfo *rel_loc_info); extern bool IsDistColumnForRelId(Oid relid, char *part_col_name); extern void FreeExecNodes(ExecNodes **exec_nodes); diff --git a/src/include/pgxc/remotecopy.h b/src/include/pgxc/remotecopy.h new file mode 100644 index 0000000000..77134e71f9 --- /dev/null +++ b/src/include/pgxc/remotecopy.h @@ -0,0 +1,75 @@ +/*------------------------------------------------------------------------- + * + * remotecopy.h + * Routines for extension of COPY command for cluster management + * + * + * Copyright (c) 2010-2012 Postgres-XC Development Group + * + * + * IDENTIFICATION + * src/include/pgxc/remotecopy.h + * + *------------------------------------------------------------------------- + */ +#ifndef REMOTECOPY_H +#define REMOTECOPY_H + +#include "nodes/parsenodes.h" + +/* + * This contains the set of data necessary for remote COPY control. + */ +typedef struct RemoteCopyData { + /* COPY FROM/TO? */ + bool is_from; + + /* + * On Coordinator we need to rewrite query. + * While client may submit a copy command dealing with file, Datanodes + * always send/receive data to/from the Coordinator. So we can not use + * original statement and should rewrite statement, specifing STDIN/STDOUT + * as copy source or destination + */ + StringInfoData query_buf; + + /* Execution nodes for COPY */ + ExecNodes *exec_nodes; + + /* Locator information */ + RelationLocInfo *rel_loc; /* the locator key */ + int idx_dist_by_col; /* index of the distributed by column */ + + PGXCNodeHandle **connections; /* Involved Datanode connections */ +} RemoteCopyData; + +/* + * List of all the options used for query deparse step + * As CopyStateData stays private in copy.c and in order not to + * make Postgres-XC code too much intrusive in PostgreSQL code, + * this intermediate structure is used primarily to generate remote + * COPY queries based on deparsed options. + */ +typedef struct RemoteCopyOptions { + bool rco_binary; /* binary format? */ + bool rco_oids; /* include OIDs? */ + bool rco_csv_mode; /* Comma Separated Value format? */ + char *rco_delim; /* column delimiter (must be 1 byte) */ + char *rco_null_print; /* NULL marker string (server encoding!) */ + char *rco_quote; /* CSV quote char (must be 1 byte) */ + char *rco_escape; /* CSV escape char (must be 1 byte) */ + List *rco_force_quote; /* list of column names */ + List *rco_force_notnull; /* list of column names */ +} RemoteCopyOptions; + +extern void RemoteCopy_BuildStatement(RemoteCopyData *state, + Relation rel, + RemoteCopyOptions *options, + List *attnamelist, + List *attnums); +extern void RemoteCopy_GetRelationLoc(RemoteCopyData *state, + Relation rel, + List *attnums); +extern void FreeRemoteCopyData(RemoteCopyData *state); +extern void FreeRemoteCopyOptions(RemoteCopyOptions *options); +#endif |
