path: root/src
author     Mason S         2010-05-03 20:38:46 +0000
committer  Pavan Deolasee  2011-05-19 16:38:45 +0000
commit     09b657378bf282096f9b690b8d7155377aedff44 (patch)
tree       3fdc27fce3b7ce7e866248a5eecfb0c0aaa058fd /src
parent     f54ce7729b567f5a3d6f8183f8c09185adf337ab (diff)
Added support for COPY TO a file or STDOUT.
It currently only supports copying from a single table; COPY with a SELECT query is not yet supported. This was written by Michael Paquier.
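As a usage illustration (not part of the patch), a libpq client can now consume COPY TO STDOUT from a coordinator connection as sketched below; the connection string and the table name my_table are placeholders.

/*
 * Minimal libpq sketch: stream a table from the coordinator with the new
 * COPY TO STDOUT support.  Placeholders: conninfo string and "my_table".
 */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");	/* placeholder conninfo */
	PGresult   *res;
	char	   *row;
	int			len;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* COPY ... TO STDOUT switches the connection into copy-out mode */
	res = PQexec(conn, "COPY my_table TO STDOUT");
	if (PQresultStatus(res) != PGRES_COPY_OUT)
	{
		fprintf(stderr, "COPY failed: %s", PQerrorMessage(conn));
		PQclear(res);
		PQfinish(conn);
		return 1;
	}
	PQclear(res);

	/* Each call returns one data row; -1 marks the end of the copy stream */
	while ((len = PQgetCopyData(conn, &row, 0)) > 0)
	{
		fwrite(row, 1, len, stdout);
		PQfreemem(row);
	}

	/* Collect the final command status after copy-out completes */
	res = PQgetResult(conn);
	PQclear(res);
	PQfinish(conn);
	return 0;
}

COPY my_table TO '/path/on/coordinator/out.csv' CSV (path being a placeholder) would instead write to a server-side file, which the patch routes through the combiner's copy_file handle.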
Diffstat (limited to 'src')
-rw-r--r--  src/backend/commands/copy.c         |  105
-rw-r--r--  src/backend/pgxc/locator/locator.c  |   11
-rw-r--r--  src/backend/pgxc/pool/combiner.c    |   61
-rw-r--r--  src/backend/pgxc/pool/datanode.c    |  100
-rw-r--r--  src/backend/tcop/postgres.c         |   17
-rw-r--r--  src/backend/tcop/utility.c          |   12
-rw-r--r--  src/include/commands/copy.h         |    5
-rw-r--r--  src/include/pgxc/combiner.h         |    1
-rw-r--r--  src/include/pgxc/datanode.h         |    3
-rw-r--r--  src/include/pgxc/locator.h          |    2
10 files changed, 279 insertions, 38 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 35622074f5..223e5ffa66 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -179,6 +179,7 @@ typedef struct CopyStateData
/* Locator information */
RelationLocInfo *rel_loc; /* the locator key */
int hash_idx; /* index of the hash column */
+ bool on_coord;
DataNodeHandle **connections; /* Involved data node connections */
#endif
@@ -830,7 +831,11 @@ CopyQuoteIdentifier(StringInfo query_buf, char *value)
* the table or the specifically requested columns.
*/
uint64
+#ifdef PGXC
+DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal, bool *executed)
+#else
DoCopy(const CopyStmt *stmt, const char *queryString)
+#endif
{
CopyState cstate;
bool is_from = stmt->is_from;
@@ -845,10 +850,23 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
TupleDesc tupDesc;
int num_phys_attrs;
uint64 processed;
+#ifdef PGXC
+ Exec_Nodes *exec_nodes = NULL;
+#endif
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
+#ifdef PGXC
+ /*
+ * COPY TO/FROM is initialized as being launched on the datanodes.
+ * Running it on the coordinator instead is particularly useful for
+ * tables that have no locator information, such as the catalog tables
+ * pg_class and pg_attribute.
+ */
+ cstate->on_coord = false;
+#endif
+
/* Extract options from the statement node tree */
foreach(option, stmt->options)
{
@@ -1134,8 +1152,42 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
{
char *hash_att;
+ exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
+
cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel));
+ /*
+ * If no locator info is available, COPY TO/FROM is run through a portal on the coordinator.
+ * This happens for pg_catalog tables (not user-defined ones)
+ * such as pg_class, pg_attribute, etc.
+ * This code runs before the portal is activated, so first check whether there is
+ * locator data for this relid; if not, return and let the portal be launched.
+ */
+ if (!cstate->rel_loc && !exec_on_coord_portal)
+ {
+ /* close the relation before leaving */
+ if (cstate->rel)
+ heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
+ *executed = false;
+ return 0;
+ }
+
+ if (exec_on_coord_portal)
+ cstate->on_coord = true;
+
hash_att = GetRelationHashColumn(cstate->rel_loc);
+ if (!cstate->on_coord)
+ {
+ if (is_from || hash_att)
+ exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList);
+ else
+ {
+ /*
+ * Pick up one node only
+ * This case corresponds to a replicated table with COPY TO
+ */
+ exec_nodes->nodelist = GetAnyDataNode();
+ }
+ }
cstate->hash_idx = -1;
if (hash_att)
@@ -1211,7 +1263,11 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
if (cstate->csv_mode)
appendStringInfoString(&cstate->query_buf, " CSV");
- if (cstate->header_line)
+ /*
+ * Only include the HEADER option for COPY FROM;
+ * doing so for COPY TO would result in multiple headers in the output.
+ */
+ if (cstate->header_line && stmt->is_from)
appendStringInfoString(&cstate->query_buf, " HEADER");
if (cstate->quote && cstate->quote[0] == '"')
@@ -1410,13 +1466,21 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
{
- cstate->connections = DataNodeCopyBegin(cstate->query_buf.data,
- cstate->rel_loc->nodeList,
- GetActiveSnapshot());
- if (!cstate->connections)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_EXCEPTION),
- errmsg("Failed to initialize data nodes for COPY")));
+ /*
+ * For COPY TO (CopyOut), it is enough to pick one node at random;
+ * this was already done above when rel_loc was looked up.
+ */
+ if (!cstate->on_coord)
+ {
+ cstate->connections = DataNodeCopyBegin(cstate->query_buf.data,
+ exec_nodes->nodelist,
+ GetActiveSnapshot(),
+ is_from);
+ if (!cstate->connections)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_EXCEPTION),
+ errmsg("Failed to initialize data nodes for COPY")));
+ }
}
PG_TRY();
{
@@ -1431,7 +1495,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
}
PG_CATCH();
{
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord)
{
DataNodeCopyFinish(
cstate->connections,
@@ -1445,7 +1509,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
PG_RE_THROW();
}
PG_END_TRY();
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord)
{
if (cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED)
cstate->processed = DataNodeCopyFinish(
@@ -1488,6 +1552,9 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
pfree(cstate->raw_buf);
pfree(cstate);
+#ifdef PGXC
+ *executed = true;
+#endif
return processed;
}
@@ -1697,6 +1764,18 @@ CopyTo(CopyState cstate)
}
}
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !cstate->on_coord)
+ {
+ DataNodeCopyOut(GetRelationNodes(cstate->rel_loc, NULL, true),
+ cstate->connections,
+ whereToSendOutput,
+ cstate->copy_file);
+ }
+ else
+ {
+#endif
+
if (cstate->rel)
{
Datum *values;
@@ -1728,6 +1807,10 @@ CopyTo(CopyState cstate)
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
}
+#ifdef PGXC
+ }
+#endif
+
if (cstate->binary)
{
/* Generate trailer for a binary copy */
@@ -2392,7 +2475,7 @@ CopyFrom(CopyState cstate)
}
#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
+ if (IS_PGXC_COORDINATOR && !cstate->on_coord)
{
Datum *hash_value = NULL;
diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c
index 99ef5e6ee3..2ab16b5c25 100644
--- a/src/backend/pgxc/locator/locator.c
+++ b/src/backend/pgxc/locator/locator.c
@@ -82,14 +82,16 @@ init_mapping_table(int nodeCount, int mapTable[])
* Pick any data node, but try a preferred node
*
*/
-int
+List *
GetAnyDataNode(void)
{
+ List *destList = NULL;
+
/* try and pick from the preferred list */
if (globalPreferredNodes != NULL)
- return linitial_int(globalPreferredNodes);
+ return destList = lappend_int(NULL, linitial_int(globalPreferredNodes));
- return 1;
+ return destList = lappend_int(NULL, 1);
}
@@ -311,7 +313,8 @@ GetRelationNodes(RelationLocInfo * rel_loc_info, long *partValue, int isRead)
exec_nodes->primarynodelist = lappend_int(NULL, primary_data_node);
list_delete_int(exec_nodes->nodelist, primary_data_node);
}
- } else
+ }
+ else
{
if (globalPreferredNodes != NULL)
{
diff --git a/src/backend/pgxc/pool/combiner.c b/src/backend/pgxc/pool/combiner.c
index ddb9482f97..8493125491 100644
--- a/src/backend/pgxc/pool/combiner.c
+++ b/src/backend/pgxc/pool/combiner.c
@@ -52,6 +52,7 @@ CreateResponseCombiner(int node_count, CombineType combine_type,
combiner->copy_out_count = 0;
combiner->inErrorState = false;
combiner->simple_aggregates = NULL;
+ combiner->copy_file = NULL;
return combiner;
}
@@ -168,6 +169,17 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
switch (msg_type)
{
+ case 'c': /* CopyOutCommandComplete */
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_COPY_OUT;
+ if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+ /* Inconsistent responses */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+ /* Do nothing here; the close message is handled by the coordinator */
+ combiner->copy_out_count++;
+ break;
case 'C': /* CommandComplete */
/*
* If we did not receive description we are having rowcount or OK
@@ -270,12 +282,50 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("Unexpected response from the data nodes")));
}
- /* Proxy first */
- if (combiner->copy_out_count++ == 0)
+ /*
+ * The normal PG code will output an H message when it runs in the
+ * coordinator, so do not proxy the message here; just count it.
+ */
+ combiner->copy_out_count++;
+ break;
+ case 'd': /* CopyOutDataRow */
+ if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED)
+ combiner->request_type = REQUEST_TYPE_COPY_OUT;
+
+ /* Inconsistent responses */
+ if (combiner->request_type != REQUEST_TYPE_COPY_OUT)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("Unexpected response from the data nodes")));
+
+ /* If there is a copy file, data has to be sent to the local file */
+ if (combiner->copy_file)
{
- if (combiner->dest == DestRemote
- || combiner->dest == DestRemoteExecute)
- pq_putmessage(msg_type, msg_body, len);
+ /* write data to the copy file */
+ char *data_row;
+ data_row = (char *) palloc0(len);
+ memcpy(data_row, msg_body, len);
+
+ fwrite(data_row, 1, len, combiner->copy_file);
+ break;
+ }
+ /*
+ * In this case data is sent back to the client
+ */
+ if (combiner->dest == DestRemote
+ || combiner->dest == DestRemoteExecute)
+ {
+ StringInfo data_buffer;
+
+ data_buffer = makeStringInfo();
+
+ pq_sendtext(data_buffer, msg_body, len);
+ pq_putmessage(msg_type,
+ data_buffer->data,
+ data_buffer->len);
+
+ pfree(data_buffer->data);
+ pfree(data_buffer);
}
break;
case 'D': /* DataRow */
@@ -423,6 +473,7 @@ ValidateAndResetCombiner(ResponseCombiner combiner)
combiner->copy_out_count = 0;
combiner->inErrorState = false;
combiner->simple_aggregates = NULL;
+ combiner->copy_file = NULL;
return valid;
}
diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c
index 2b997a528b..04d096625b 100644
--- a/src/backend/pgxc/pool/datanode.c
+++ b/src/backend/pgxc/pool/datanode.c
@@ -39,6 +39,13 @@
#define NO_SOCKET -1
+/*
+ * Buffer size does not affect performance significantly, just do not allow
+ * connection buffer grows infinitely
+ */
+#define COPY_BUFFER_SIZE 8192
+#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024
+
static int node_count = 0;
static DataNodeHandle *handles = NULL;
static bool autocommit = true;
@@ -653,6 +660,11 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner)
{
/* try to read the message, return if not enough data */
conn->inCursor = conn->inStart;
+
+ /*
+ * Make sure we receive a complete message, otherwise, return EOF, and
+ * we will be called again after more data has been read.
+ */
if (conn->inEnd - conn->inCursor < 5)
return EOF;
@@ -673,6 +685,14 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner)
/* TODO handle other possible responses */
switch (msg_type)
{
+ case 'c': /* CopyToCommandComplete */
+ /* no need to parse, just move cursor */
+ conn->inCursor += msg_len;
+ conn->state = DN_CONNECTION_STATE_COMPLETED;
+ CombineResponse(combiner, msg_type,
+ conn->inBuffer + conn->inStart + 5,
+ conn->inCursor - conn->inStart - 5);
+ break;
case 'C': /* CommandComplete */
/* no need to parse, just move cursor */
conn->inCursor += msg_len;
@@ -680,7 +700,6 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner)
CombineResponse(combiner, msg_type,
conn->inBuffer + conn->inStart + 5,
conn->inCursor - conn->inStart - 5);
-
break;
case 'T': /* RowDescription */
case 'D': /* DataRow */
@@ -708,6 +727,16 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner)
CombineResponse(combiner, msg_type,
conn->inBuffer + conn->inStart + 5,
conn->inCursor - conn->inStart - 5);
+ conn->inStart = conn->inCursor;
+ conn->state = DN_CONNECTION_STATE_COPY_OUT;
+ return 0;
+ case 'd': /* CopyOutDataRow */
+ /* No need to parse, just send a row back to client */
+ conn->inCursor += msg_len;
+ conn->state = DN_CONNECTION_STATE_COPY_OUT;
+ CombineResponse(combiner, msg_type,
+ conn->inBuffer + conn->inStart + 5,
+ conn->inCursor - conn->inStart - 5);
break;
case 'E': /* ErrorResponse */
/* no need to parse, just move cursor */
@@ -1821,7 +1850,7 @@ DataNodeCleanAndRelease(int code, Datum arg)
* The copy_connections array must have room for NumDataNodes items
*/
DataNodeHandle**
-DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot)
+DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from)
{
int i, j;
int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist);
@@ -1883,8 +1912,12 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot)
}
if (!found)
{
- /* Add to transaction wide-list */
- write_node_list[write_node_count++] = connections[i];
+ /*
+ * Add to the transaction-wide list only for COPY FROM;
+ * CopyOut (COPY TO) is not a write operation, so there is no need to track it
+ */
+ if (is_from)
+ write_node_list[write_node_count++] = connections[i];
/* Add to current statement list */
newConnections[new_count++] = connections[i];
}
@@ -1969,13 +2002,6 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot)
}
/*
- * Buffer size does not affect performance significantly, just do not allow
- * connection buffer grows infinitely
- */
-#define COPY_BUFFER_SIZE 8192
-#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024
-
-/*
* Send a data row to the specified nodes
*/
int
@@ -2131,6 +2157,58 @@ DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle**
return 0;
}
+int
+DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, CommandDest dest, FILE* copy_file)
+{
+ ResponseCombiner combiner;
+ int conn_count = list_length(exec_nodes->nodelist) == 0 ? NumDataNodes : list_length(exec_nodes->nodelist);
+ int count = 0;
+ bool need_tran;
+ List *nodelist;
+ ListCell *nodeitem;
+
+ nodelist = exec_nodes->nodelist;
+ need_tran = !autocommit || conn_count > 1;
+
+ combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM, dest);
+ /* If there is a file to copy the data to, pass it to the combiner */
+ if (copy_file)
+ combiner->copy_file = copy_file;
+
+ foreach(nodeitem, exec_nodes->nodelist)
+ {
+ DataNodeHandle *handle = copy_connections[count];
+ count++;
+
+ if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT)
+ {
+ int read_status = 0;
+ /* The H message has been consumed; continue handling data row messages */
+ while (read_status >= 0 && handle->state == DN_CONNECTION_STATE_COPY_OUT) /* continue to read as long as there is data */
+ {
+ if (handle_response(handle,combiner) == EOF)
+ {
+ /* read more data from the connection */
+ read_status = data_node_read_data(handle);
+ if (read_status < 0)
+ return EOF;
+ }
+ /* There is no more data to be read from the connection */
+ }
+ }
+ }
+
+ if (!ValidateAndCloseCombiner(combiner))
+ {
+ if (autocommit && !PersistentConnections)
+ release_handles();
+ pfree(copy_connections);
+ return EOF;
+ }
+
+ return 0;
+}
+
/*
* Finish copy process on all connections
*/
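As a side note on the loop inside DataNodeCopyOut() above: handle_response() returns EOF whenever only a partial protocol message is buffered, and the caller then reads more bytes before retrying, until a 'c' (CopyDone) message ends COPY OUT mode. The stand-alone sketch below models that pattern with stub functions and a canned message stream; it is an illustration, not the real connection code.

#include <stdio.h>

/* Stub model of the COPY OUT pumping loop used by DataNodeCopyOut(). */
enum conn_state { COPY_OUT, COMPLETED };

static const char msgs[] = {'d', 'd', 'd', 'c'};	/* canned datanode replies */
static int	next_msg = 0;
static int	buffered = 0;		/* complete messages available to the parser */
static enum conn_state state = COPY_OUT;

/* Mimics handle_response(): EOF means "need more data before parsing". */
static int
stub_handle_response(void)
{
	char		type;

	if (buffered == 0)
		return EOF;
	buffered--;
	type = msgs[next_msg++];
	if (type == 'c')
		state = COMPLETED;		/* CopyDone: leave COPY OUT mode */
	else
		printf("forward CopyData row to the client or the copy file\n");
	return 0;
}

/* Mimics data_node_read_data(): pull one more message "off the wire". */
static int
stub_read_data(void)
{
	if (next_msg + buffered >= (int) sizeof(msgs))
		return -1;				/* connection closed */
	buffered++;
	return 1;
}

int
main(void)
{
	int			read_status = 0;

	while (read_status >= 0 && state == COPY_OUT)
	{
		if (stub_handle_response() == EOF)
			read_status = stub_read_data();
	}
	return 0;
}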
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7bdd5b8d38..99e88ea52c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -982,14 +982,27 @@ exec_simple_query(const char *query_string)
else if (IsA(parsetree, CopyStmt))
{
CopyStmt *copy = (CopyStmt *) parsetree;
+ bool done;
/* Snapshot is needed for the Copy */
if (!snapshot_set)
{
PushActiveSnapshot(GetTransactionSnapshot());
snapshot_set = true;
}
- DoCopy(copy, query_string);
- exec_on_coord = false;
+ /*
+ * A check on the locator info is made in DoCopy to determine whether the copy
+ * should run on the Datanodes or on the Coordinator.
+ * If the table has no locator data (e.g. pg_catalog tables), done is set to
+ * false and the copy is run on the Coordinator instead, through the portal.
+ * If the table has locator data (user tables), the copy has already been
+ * executed normally on the Datanodes.
+ */
+ DoCopy(copy, query_string, false, &done);
+
+ if (!done)
+ exec_on_coord = true;
+ else
+ exec_on_coord = false;
}
else
{
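To make the new calling convention explicit: exec_simple_query() above first calls DoCopy() with exec_on_coord_portal = false; when the relation has no locator info, the done flag comes back false and the statement is re-run through the regular portal, where ProcessUtility() calls DoCopy() again with exec_on_coord_portal = true so the copy executes locally on the coordinator. The stand-alone sketch below models that contract with a stub function and dummy data; it is an illustration, not the patch code.

#include <stdbool.h>
#include <stdio.h>

static bool has_locator_info = false;	/* pretend the target is a catalog table */

/* Stub with the same contract as the PGXC DoCopy() signature in this patch. */
static unsigned long
stub_DoCopy(bool exec_on_coord_portal, bool *executed)
{
	if (!has_locator_info && !exec_on_coord_portal)
	{
		*executed = false;		/* caller must retry through the portal */
		return 0;
	}
	*executed = true;			/* datanode or coordinator-local execution */
	return 42;					/* dummy row count */
}

int
main(void)
{
	bool		done;
	unsigned long rows;

	rows = stub_DoCopy(false, &done);		/* first pass: try the datanodes */
	if (!done)
		rows = stub_DoCopy(true, &done);	/* portal path on the coordinator */
	printf("processed %lu rows\n", rows);
	return 0;
}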
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 28041c6305..bcdfd34125 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,6 +57,10 @@
#include "utils/guc.h"
#include "utils/syscache.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
+
/*
* Verify user has ownership of specified relation, else ereport.
@@ -576,8 +580,12 @@ ProcessUtility(Node *parsetree,
case T_CopyStmt:
{
uint64 processed;
-
- processed = DoCopy((CopyStmt *) parsetree, queryString);
+#ifdef PGXC
+ bool done;
+ processed = DoCopy((CopyStmt *) parsetree, queryString, true, &done);
+#else
+ processed = DoCopy((CopyStmt *) parsetree, queryString);
+#endif
if (completionTag)
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processed);
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index a397b78633..e6eaf5e39e 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -17,8 +17,11 @@
#include "nodes/parsenodes.h"
#include "tcop/dest.h"
-
+#ifdef PGXC
+extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal, bool *executed);
+#else
extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
+#endif
extern DestReceiver *CreateCopyDestReceiver(void);
diff --git a/src/include/pgxc/combiner.h b/src/include/pgxc/combiner.h
index 7926cbd9ff..8d8b161ac9 100644
--- a/src/include/pgxc/combiner.h
+++ b/src/include/pgxc/combiner.h
@@ -50,6 +50,7 @@ typedef struct
uint64 copy_out_count;
bool inErrorState;
List *simple_aggregates;
+ FILE *copy_file; /* used if copy_dest == COPY_FILE */
} ResponseCombinerData;
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
index 4f75ba24af..28c5d8748e 100644
--- a/src/include/pgxc/datanode.h
+++ b/src/include/pgxc/datanode.h
@@ -82,8 +82,9 @@ extern int DataNodeRollback(CommandDest dest);
extern int DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only);
-extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot);
+extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from);
extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections);
+extern int DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, CommandDest dest, FILE* copy_file);
extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type, CommandDest dest);
extern int primary_data_node;
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 1d5280c765..335701c282 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -73,7 +73,7 @@ extern int GetRoundRobinNode(Oid relid);
extern bool IsHashDistributable(Oid col_type);
extern List *GetAllNodes(void);
-extern int GetAnyDataNode(void);
+extern List *GetAnyDataNode(void);
extern void RelationBuildLocator(Relation rel);
extern void FreeRelationLocInfo(RelationLocInfo * relationLocInfo);