| author | Mason S | 2010-05-03 20:38:46 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2011-05-19 16:38:45 +0000 |
| commit | 09b657378bf282096f9b690b8d7155377aedff44 (patch) | |
| tree | 3fdc27fce3b7ce7e866248a5eecfb0c0aaa058fd /src | |
| parent | f54ce7729b567f5a3d6f8183f8c09185adf337ab (diff) | |
Added support for COPY TO a file or STDOUT.
It currently supports only COPY TO from a single table; COPY with a SELECT
query is not yet supported.
This was written by Michael Paquier.
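For reference, a minimal client-side sketch (not part of this patch) of the case the commit enables: streaming a single table to the client with COPY ... TO STDOUT over libpq. The connection string "dbname=postgres" and the table name "my_table" are placeholders, and COPY (SELECT ...) TO is assumed unsupported, as noted above.

```c
/*
 * Minimal libpq sketch (illustration only, not part of the patch):
 * stream one table with COPY ... TO STDOUT, the case this commit adds.
 * "dbname=postgres" and "my_table" are placeholders.
 */
#include <stdio.h>
#include "libpq-fe.h"

int
main(void)
{
	PGconn	   *conn = PQconnectdb("dbname=postgres");
	PGresult   *res;
	char	   *row;
	int			len;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	/* COPY with a SELECT source is not yet supported by this commit */
	res = PQexec(conn, "COPY my_table TO STDOUT");
	if (PQresultStatus(res) != PGRES_COPY_OUT)
	{
		fprintf(stderr, "COPY failed: %s", PQerrorMessage(conn));
		PQclear(res);
		PQfinish(conn);
		return 1;
	}
	PQclear(res);

	/* Read data rows until the stream ends (-1) or fails (-2) */
	while ((len = PQgetCopyData(conn, &row, 0)) > 0)
	{
		fwrite(row, 1, len, stdout);
		PQfreemem(row);
	}

	PQclear(PQgetResult(conn));	/* consume the final CommandComplete */
	PQfinish(conn);
	return 0;
}
```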
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/commands/copy.c | 105 |
| -rw-r--r-- | src/backend/pgxc/locator/locator.c | 11 |
| -rw-r--r-- | src/backend/pgxc/pool/combiner.c | 61 |
| -rw-r--r-- | src/backend/pgxc/pool/datanode.c | 100 |
| -rw-r--r-- | src/backend/tcop/postgres.c | 17 |
| -rw-r--r-- | src/backend/tcop/utility.c | 12 |
| -rw-r--r-- | src/include/commands/copy.h | 5 |
| -rw-r--r-- | src/include/pgxc/combiner.h | 1 |
| -rw-r--r-- | src/include/pgxc/datanode.h | 3 |
| -rw-r--r-- | src/include/pgxc/locator.h | 2 |
10 files changed, 279 insertions, 38 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 35622074f5..223e5ffa66 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -179,6 +179,7 @@ typedef struct CopyStateData /* Locator information */ RelationLocInfo *rel_loc; /* the locator key */ int hash_idx; /* index of the hash column */ + bool on_coord; DataNodeHandle **connections; /* Involved data node connections */ #endif @@ -830,7 +831,11 @@ CopyQuoteIdentifier(StringInfo query_buf, char *value) * the table or the specifically requested columns. */ uint64 +#ifdef PGXC +DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal, bool *executed) +#else DoCopy(const CopyStmt *stmt, const char *queryString) +#endif { CopyState cstate; bool is_from = stmt->is_from; @@ -845,10 +850,23 @@ DoCopy(const CopyStmt *stmt, const char *queryString) TupleDesc tupDesc; int num_phys_attrs; uint64 processed; +#ifdef PGXC + Exec_Nodes *exec_nodes = NULL; +#endif /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); +#ifdef PGXC + /* + * Copy to/from is initialized as being launched on datanodes + * This functionnality is particularly interesting to have a result for + * tables who have no locator informations such as pg_catalog, pg_class, + * and pg_attribute. + */ + cstate->on_coord = false; +#endif + /* Extract options from the statement node tree */ foreach(option, stmt->options) { @@ -1134,8 +1152,42 @@ DoCopy(const CopyStmt *stmt, const char *queryString) { char *hash_att; + exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); + cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel)); + /* + * In case there is no locator info available, copy to/from is launched in portal on coordinator. + * This happens for pg_catalog tables (not user defined ones) + * such as pg_catalog, pg_attribute, etc. + * This part is launched before the portal is activated, so check a first time if there + * some locator data for this relid and if no, return and launch the portal. + */ + if (!cstate->rel_loc && !exec_on_coord_portal) + { + /* close lock before leaving */ + if (cstate->rel) + heap_close(cstate->rel, (is_from ? 
NoLock : AccessShareLock)); + *executed = false; + return 0; + } + + if (exec_on_coord_portal) + cstate->on_coord = true; + hash_att = GetRelationHashColumn(cstate->rel_loc); + if (!cstate->on_coord) + { + if (is_from || hash_att) + exec_nodes->nodelist = list_copy(cstate->rel_loc->nodeList); + else + { + /* + * Pick up one node only + * This case corresponds to a replicated table with COPY TO + */ + exec_nodes->nodelist = GetAnyDataNode(); + } + } cstate->hash_idx = -1; if (hash_att) @@ -1211,7 +1263,11 @@ DoCopy(const CopyStmt *stmt, const char *queryString) if (cstate->csv_mode) appendStringInfoString(&cstate->query_buf, " CSV"); - if (cstate->header_line) + /* + * Only rewrite the header part for COPY FROM, + * doing that for COPY TO results in multiple headers in output + */ + if (cstate->header_line && stmt->is_from) appendStringInfoString(&cstate->query_buf, " HEADER"); if (cstate->quote && cstate->quote[0] == '"') @@ -1410,13 +1466,21 @@ DoCopy(const CopyStmt *stmt, const char *queryString) #ifdef PGXC if (IS_PGXC_COORDINATOR) { - cstate->connections = DataNodeCopyBegin(cstate->query_buf.data, - cstate->rel_loc->nodeList, - GetActiveSnapshot()); - if (!cstate->connections) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_EXCEPTION), - errmsg("Failed to initialize data nodes for COPY"))); + /* + * In the case of CopyOut, it is just necessary to pick up one node randomly. + * This is done when rel_loc is found. + */ + if (!cstate->on_coord) + { + cstate->connections = DataNodeCopyBegin(cstate->query_buf.data, + exec_nodes->nodelist, + GetActiveSnapshot(), + is_from); + if (!cstate->connections) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("Failed to initialize data nodes for COPY"))); + } } PG_TRY(); { @@ -1431,7 +1495,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) } PG_CATCH(); { - if (IS_PGXC_COORDINATOR) + if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord) { DataNodeCopyFinish( cstate->connections, @@ -1445,7 +1509,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) PG_RE_THROW(); } PG_END_TRY(); - if (IS_PGXC_COORDINATOR) + if (IS_PGXC_COORDINATOR && is_from && !cstate->on_coord) { if (cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED) cstate->processed = DataNodeCopyFinish( @@ -1488,6 +1552,9 @@ DoCopy(const CopyStmt *stmt, const char *queryString) pfree(cstate->raw_buf); pfree(cstate); +#ifdef PGXC + *executed = true; +#endif return processed; } @@ -1697,6 +1764,18 @@ CopyTo(CopyState cstate) } } +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !cstate->on_coord) + { + DataNodeCopyOut(GetRelationNodes(cstate->rel_loc, NULL, true), + cstate->connections, + whereToSendOutput, + cstate->copy_file); + } + else + { +#endif + if (cstate->rel) { Datum *values; @@ -1728,6 +1807,10 @@ CopyTo(CopyState cstate) ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L); } +#ifdef PGXC + } +#endif + if (cstate->binary) { /* Generate trailer for a binary copy */ @@ -2392,7 +2475,7 @@ CopyFrom(CopyState cstate) } #ifdef PGXC - if (IS_PGXC_COORDINATOR) + if (IS_PGXC_COORDINATOR && !cstate->on_coord) { Datum *hash_value = NULL; diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c index 99ef5e6ee3..2ab16b5c25 100644 --- a/src/backend/pgxc/locator/locator.c +++ b/src/backend/pgxc/locator/locator.c @@ -82,14 +82,16 @@ init_mapping_table(int nodeCount, int mapTable[]) * Pick any data node, but try a preferred node * */ -int +List * GetAnyDataNode(void) { + List *destList = NULL; + /* try and pick from the preferred list */ 
if (globalPreferredNodes != NULL) - return linitial_int(globalPreferredNodes); + return destList = lappend_int(NULL, linitial_int(globalPreferredNodes)); - return 1; + return destList = lappend_int(NULL, 1); } @@ -311,7 +313,8 @@ GetRelationNodes(RelationLocInfo * rel_loc_info, long *partValue, int isRead) exec_nodes->primarynodelist = lappend_int(NULL, primary_data_node); list_delete_int(exec_nodes->nodelist, primary_data_node); } - } else + } + else { if (globalPreferredNodes != NULL) { diff --git a/src/backend/pgxc/pool/combiner.c b/src/backend/pgxc/pool/combiner.c index ddb9482f97..8493125491 100644 --- a/src/backend/pgxc/pool/combiner.c +++ b/src/backend/pgxc/pool/combiner.c @@ -52,6 +52,7 @@ CreateResponseCombiner(int node_count, CombineType combine_type, combiner->copy_out_count = 0; combiner->inErrorState = false; combiner->simple_aggregates = NULL; + combiner->copy_file = NULL; return combiner; } @@ -168,6 +169,17 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t switch (msg_type) { + case 'c': /* CopyOutCommandComplete */ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + combiner->request_type = REQUEST_TYPE_COPY_OUT; + if (combiner->request_type != REQUEST_TYPE_COPY_OUT) + /* Inconsistent responses */ + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); + /* Just do nothing, close message is managed by the coordinator */ + combiner->copy_out_count++; + break; case 'C': /* CommandComplete */ /* * If we did not receive description we are having rowcount or OK @@ -270,12 +282,50 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t (errcode(ERRCODE_DATA_CORRUPTED), errmsg("Unexpected response from the data nodes"))); } - /* Proxy first */ - if (combiner->copy_out_count++ == 0) + /* + * The normal PG code will output an H message when it runs in the + * coordinator, so do not proxy message here, just count it. 
+ */ + combiner->copy_out_count++; + break; + case 'd': /* CopyOutDataRow */ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + combiner->request_type = REQUEST_TYPE_COPY_OUT; + + /* Inconsistent responses */ + if (combiner->request_type != REQUEST_TYPE_COPY_OUT) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); + + /* If there is a copy file, data has to be sent to the local file */ + if (combiner->copy_file) { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); + /* write data to the copy file */ + char *data_row; + data_row = (char *) palloc0(len); + memcpy(data_row, msg_body, len); + + fwrite(data_row, 1, len, combiner->copy_file); + break; + } + /* + * In this case data is sent back to the client + */ + if (combiner->dest == DestRemote + || combiner->dest == DestRemoteExecute) + { + StringInfo data_buffer; + + data_buffer = makeStringInfo(); + + pq_sendtext(data_buffer, msg_body, len); + pq_putmessage(msg_type, + data_buffer->data, + data_buffer->len); + + pfree(data_buffer->data); + pfree(data_buffer); } break; case 'D': /* DataRow */ @@ -423,6 +473,7 @@ ValidateAndResetCombiner(ResponseCombiner combiner) combiner->copy_out_count = 0; combiner->inErrorState = false; combiner->simple_aggregates = NULL; + combiner->copy_file = NULL; return valid; } diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c index 2b997a528b..04d096625b 100644 --- a/src/backend/pgxc/pool/datanode.c +++ b/src/backend/pgxc/pool/datanode.c @@ -39,6 +39,13 @@ #define NO_SOCKET -1 +/* + * Buffer size does not affect performance significantly, just do not allow + * connection buffer grows infinitely + */ +#define COPY_BUFFER_SIZE 8192 +#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 + static int node_count = 0; static DataNodeHandle *handles = NULL; static bool autocommit = true; @@ -653,6 +660,11 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner) { /* try to read the message, return if not enough data */ conn->inCursor = conn->inStart; + + /* + * Make sure we receive a complete message, otherwise, return EOF, and + * we will be called again after more data has been read. 
+ */ if (conn->inEnd - conn->inCursor < 5) return EOF; @@ -673,6 +685,14 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner) /* TODO handle other possible responses */ switch (msg_type) { + case 'c': /* CopyToCommandComplete */ + /* no need to parse, just move cursor */ + conn->inCursor += msg_len; + conn->state = DN_CONNECTION_STATE_COMPLETED; + CombineResponse(combiner, msg_type, + conn->inBuffer + conn->inStart + 5, + conn->inCursor - conn->inStart - 5); + break; case 'C': /* CommandComplete */ /* no need to parse, just move cursor */ conn->inCursor += msg_len; @@ -680,7 +700,6 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner) CombineResponse(combiner, msg_type, conn->inBuffer + conn->inStart + 5, conn->inCursor - conn->inStart - 5); - break; case 'T': /* RowDescription */ case 'D': /* DataRow */ @@ -708,6 +727,16 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner) CombineResponse(combiner, msg_type, conn->inBuffer + conn->inStart + 5, conn->inCursor - conn->inStart - 5); + conn->inStart = conn->inCursor; + conn->state = DN_CONNECTION_STATE_COPY_OUT; + return 0; + case 'd': /* CopyOutDataRow */ + /* No need to parse, just send a row back to client */ + conn->inCursor += msg_len; + conn->state = DN_CONNECTION_STATE_COPY_OUT; + CombineResponse(combiner, msg_type, + conn->inBuffer + conn->inStart + 5, + conn->inCursor - conn->inStart - 5); break; case 'E': /* ErrorResponse */ /* no need to parse, just move cursor */ @@ -1821,7 +1850,7 @@ DataNodeCleanAndRelease(int code, Datum arg) * The copy_connections array must have room for NumDataNodes items */ DataNodeHandle** -DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot) +DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from) { int i, j; int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist); @@ -1883,8 +1912,12 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot) } if (!found) { - /* Add to transaction wide-list */ - write_node_list[write_node_count++] = connections[i]; + /* + * Add to transaction wide-list if COPY FROM + * CopyOut (COPY TO) is not a write operation, no need to update + */ + if (is_from) + write_node_list[write_node_count++] = connections[i]; /* Add to current statement list */ newConnections[new_count++] = connections[i]; } @@ -1969,13 +2002,6 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot) } /* - * Buffer size does not affect performance significantly, just do not allow - * connection buffer grows infinitely - */ -#define COPY_BUFFER_SIZE 8192 -#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 - -/* * Send a data row to the specified nodes */ int @@ -2131,6 +2157,58 @@ DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** return 0; } +int +DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, CommandDest dest, FILE* copy_file) +{ + ResponseCombiner combiner; + int conn_count = list_length(exec_nodes->nodelist) == 0 ? 
NumDataNodes : list_length(exec_nodes->nodelist); + int count = 0; + bool need_tran; + List *nodelist; + ListCell *nodeitem; + + nodelist = exec_nodes->nodelist; + need_tran = !autocommit || conn_count > 1; + + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_SUM, dest); + /* If there is an existing file where to copy data, pass it to combiner */ + if (copy_file) + combiner->copy_file = copy_file; + + foreach(nodeitem, exec_nodes->nodelist) + { + DataNodeHandle *handle = copy_connections[count]; + count++; + + if (handle && handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + int read_status = 0; + /* H message has been consumed, continue to manage data row messages */ + while (read_status >= 0 && handle->state == DN_CONNECTION_STATE_COPY_OUT) /* continue to read as long as there is data */ + { + if (handle_response(handle,combiner) == EOF) + { + /* read some extra-data */ + read_status = data_node_read_data(handle); + if (read_status < 0) + return EOF; + } + /* There is no more data that can be read from connection */ + } + } + } + + if (!ValidateAndCloseCombiner(combiner)) + { + if (autocommit && !PersistentConnections) + release_handles(); + pfree(copy_connections); + return EOF; + } + + return 0; +} + /* * Finish copy process on all connections */ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 7bdd5b8d38..99e88ea52c 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -982,14 +982,27 @@ exec_simple_query(const char *query_string) else if (IsA(parsetree, CopyStmt)) { CopyStmt *copy = (CopyStmt *) parsetree; + bool done; /* Snapshot is needed for the Copy */ if (!snapshot_set) { PushActiveSnapshot(GetTransactionSnapshot()); snapshot_set = true; } - DoCopy(copy, query_string); - exec_on_coord = false; + /* + * A check on locator is made in DoCopy to determine if the copy can be launched on + * Datanode or on Coordinator. + * If a table has no locator data, then done is set to false and copy is launched + * on Coordinator instead (e.g., using pg_catalog tables). + * If a table has some locator data (user tables), then copy was launched normally + * in Datanodes + */ + DoCopy(copy, query_string, false, &done); + + if (!done) + exec_on_coord = true; + else + exec_on_coord = false; } else { diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 28041c6305..bcdfd34125 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -57,6 +57,10 @@ #include "utils/guc.h" #include "utils/syscache.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#endif + /* * Verify user has ownership of specified relation, else ereport. 
@@ -576,8 +580,12 @@ ProcessUtility(Node *parsetree, case T_CopyStmt: { uint64 processed; - - processed = DoCopy((CopyStmt *) parsetree, queryString); +#ifdef PGXC + bool done; + processed = DoCopy((CopyStmt *) parsetree, queryString, true, &done); +#else + processed = DoCopy((CopyStmt *) parsetree, queryString); +#endif if (completionTag) snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT, processed); diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index a397b78633..e6eaf5e39e 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -17,8 +17,11 @@ #include "nodes/parsenodes.h" #include "tcop/dest.h" - +#ifdef PGXC +extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString, bool exec_on_coord_portal, bool *executed); +#else extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString); +#endif extern DestReceiver *CreateCopyDestReceiver(void); diff --git a/src/include/pgxc/combiner.h b/src/include/pgxc/combiner.h index 7926cbd9ff..8d8b161ac9 100644 --- a/src/include/pgxc/combiner.h +++ b/src/include/pgxc/combiner.h @@ -50,6 +50,7 @@ typedef struct uint64 copy_out_count; bool inErrorState; List *simple_aggregates; + FILE *copy_file; /* used if copy_dest == COPY_FILE */ } ResponseCombinerData; diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h index 4f75ba24af..28c5d8748e 100644 --- a/src/include/pgxc/datanode.h +++ b/src/include/pgxc/datanode.h @@ -82,8 +82,9 @@ extern int DataNodeRollback(CommandDest dest); extern int DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only); -extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot); +extern DataNodeHandle** DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_from); extern int DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections); +extern int DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, CommandDest dest, FILE* copy_file); extern uint64 DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, CombineType combine_type, CommandDest dest); extern int primary_data_node; diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h index 1d5280c765..335701c282 100644 --- a/src/include/pgxc/locator.h +++ b/src/include/pgxc/locator.h @@ -73,7 +73,7 @@ extern int GetRoundRobinNode(Oid relid); extern bool IsHashDistributable(Oid col_type); extern List *GetAllNodes(void); -extern int GetAnyDataNode(void); +extern List *GetAnyDataNode(void); extern void RelationBuildLocator(Relation rel); extern void FreeRelationLocInfo(RelationLocInfo * relationLocInfo);
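As a reading aid (not part of the commit), here is a sketch of the calling convention the postgres.c and utility.c hunks above establish for the extended DoCopy(): the first call targets the data nodes, and when the relation has no locator information (catalog tables such as pg_class), *executed comes back false and the COPY is re-run on the coordinator. The helper name copy_with_fallback is hypothetical, the snippet only builds inside the backend tree, and in the real flow the second call happens via the portal and ProcessUtility() rather than directly.

```c
/*
 * Hypothetical helper (sketch only; backend headers required) showing
 * how the extended DoCopy() is meant to be driven, per the hunks above.
 */
#include "postgres.h"
#include "commands/copy.h"
#include "nodes/parsenodes.h"

static uint64
copy_with_fallback(const CopyStmt *stmt, const char *query_string)
{
	bool		executed = false;
	uint64		processed;

	/* First attempt: push the COPY down to the data nodes */
	processed = DoCopy(stmt, query_string, false, &executed);

	if (!executed)
	{
		/*
		 * No locator information was found (e.g. a pg_catalog table):
		 * run the COPY locally on the coordinator.  In the real code
		 * path this happens through the portal, which reaches
		 * ProcessUtility() with exec_on_coord_portal = true.
		 */
		processed = DoCopy(stmt, query_string, true, &executed);
	}

	return processed;
}
```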
