| author | Mason S | 2010-04-18 04:43:57 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2011-05-19 16:38:45 +0000 |
| commit | f54ce7729b567f5a3d6f8183f8c09185adf337ab (patch) | |
| tree | a1755765e411e1ddebe55646240d3b1ad713c426 | |
| parent | 8f4735f6b2b1676a49af3b1c184475831853a5a3 (diff) | |
Added support for COPY FROM for loading tables.
Additional work was done on the combiner and
error handling to make this code a little cleaner.
This was written by Andrei Martsinchyk.
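The heart of the patch is that the coordinator rewrites the client's COPY statement to use STDIN/STDOUT before forwarding it to the data nodes, so option values such as the delimiter, null string, quote, and escape characters must be re-quoted into the generated SQL. Below is a minimal standalone sketch of the escaping rule the new CopyQuoteStr comment describes (backslashes and single quotes are doubled, and the E'...' form is used when the value contains a backslash); quote_copy_literal is an illustrative name rather than the patched function, and it uses malloc in place of the backend's StringInfo.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Quote a COPY option value the way the patch describes:
 * double every backslash and single quote, and prefix the
 * literal with E when it contains a backslash. */
static char *
quote_copy_literal(const char *value)
{
    /* worst case: E + quote + every char doubled + quote + NUL */
    char   *result = malloc(strlen(value) * 2 + 4);
    char   *dst = result;

    if (strchr(value, '\\') != NULL)
        *dst++ = 'E';
    *dst++ = '\'';
    for (const char *src = value; *src; src++)
    {
        if (*src == '\\' || *src == '\'')
            *dst++ = *src;      /* double the character */
        *dst++ = *src;
    }
    *dst++ = '\'';
    *dst = '\0';
    return result;
}

int
main(void)
{
    char *q = quote_copy_literal("\\N");

    /* prints: NULL AS E'\\N' */
    printf("NULL AS %s\n", q);
    free(q);
    return 0;
}
```

Identifiers get the analogous treatment in CopyQuoteIdentifier, which doubles embedded double quotes instead.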
| -rw-r--r-- | src/backend/commands/copy.c | 330 |
| -rw-r--r-- | src/backend/pgxc/locator/locator.c | 1 |
| -rw-r--r-- | src/backend/pgxc/plan/planner.c | 61 |
| -rw-r--r-- | src/backend/pgxc/pool/combiner.c | 103 |
| -rw-r--r-- | src/backend/pgxc/pool/datanode.c | 749 |
| -rw-r--r-- | src/backend/tcop/postgres.c | 91 |
| -rw-r--r-- | src/include/pgxc/combiner.h | 4 |
| -rw-r--r-- | src/include/pgxc/datanode.h | 16 |
| -rw-r--r-- | src/include/pgxc/locator.h | 1 |
| -rw-r--r-- | src/include/pgxc/planner.h | 2 |
10 files changed, 1107 insertions, 251 deletions
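Much of the combiner work in the diff below concerns merging per-node command results: parse_row_count extracts the trailing row count from a CommandComplete tag such as COPY 42, and the new get_plan_combine_type picks COMBINE_TYPE_SUM for hash and round-robin tables (each node loads a distinct slice of the rows) but COMBINE_TYPE_SAME for replicated tables, where every node must report an identical count and a mismatch is treated as data corruption. The following simplified, standalone model illustrates that logic; row_count_model and combine_model are illustrative names, and the real parse_row_count operates on a length-counted message buffer rather than a C string.

```c
#include <stdio.h>
#include <string.h>

typedef enum { COMBINE_NONE, COMBINE_SUM, COMBINE_SAME } combine_t;

/* Extract the trailing row count from a tag like "COPY 42" or
 * "INSERT 0 1"; returns -1 if the tag has no trailing digits. */
static long
row_count_model(const char *tag)
{
    const char *p = tag + strlen(tag);
    long        count = 0;
    long        mult = 1;
    int         digits = 0;

    while (p > tag && p[-1] >= '0' && p[-1] <= '9')
    {
        count += (long) (*--p - '0') * mult;
        mult *= 10;
        digits++;
    }
    return digits ? count : -1;
}

/* Fold one node's count into the running total. Returns 0 on
 * success, -1 on a replicated-table mismatch. */
static int
combine_model(combine_t type, long *total, long node_count, int first)
{
    switch (type)
    {
        case COMBINE_SUM:
            *total += node_count;
            return 0;
        case COMBINE_SAME:
            if (first)
                *total = node_count;
            else if (*total != node_count)
                return -1;  /* replicated nodes disagree: corruption */
            return 0;
        default:
            return 0;
    }
}

int
main(void)
{
    long total = 0;

    /* two data nodes of a hash-distributed table report their slices */
    combine_model(COMBINE_SUM, &total, row_count_model("COPY 40"), 1);
    combine_model(COMBINE_SUM, &total, row_count_model("COPY 2"), 0);
    printf("COPY %ld\n", total);    /* prints: COPY 42 */
    return 0;
}
```

For a replicated table the same calls would use COMBINE_SAME, and a disagreement between the two nodes' counts would surface as an error instead of a sum.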
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index c464ed7f6c..35622074f5 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -33,6 +33,12 @@ #include "miscadmin.h" #include "optimizer/planner.h" #include "parser/parse_relation.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#include "pgxc/datanode.h" +#include "pgxc/locator.h" +#include "pgxc/poolmgr.h" +#endif #include "rewrite/rewriteHandler.h" #include "storage/fd.h" #include "tcop/tcopprot.h" @@ -160,6 +166,22 @@ typedef struct CopyStateData char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ +#ifdef PGXC + /* + * On the coordinator we need to rewrite the query. + * While the client may submit a copy command dealing with a file, data nodes + * always send/receive data to/from the coordinator. So we cannot use the + * original statement and must rewrite it, specifying STDIN/STDOUT + * as the copy source or destination. + */ + StringInfoData query_buf; + + /* Locator information */ + RelationLocInfo *rel_loc; /* the locator key */ + int hash_idx; /* index of the hash column */ + + DataNodeHandle **connections; /* Involved data node connections */ +#endif } CopyStateData; typedef CopyStateData *CopyState; @@ -646,7 +668,6 @@ CopyGetInt16(CopyState cstate, int16 *val) return true; } - /* * CopyLoadRawBuf loads some more data into raw_buf * @@ -682,6 +703,101 @@ CopyLoadRawBuf(CopyState cstate) return (inbytes > 0); } +#ifdef PGXC +/* + * CopyQuoteStr appends a quoted value to the query buffer. The value is escaped + * if needed + * + * When rewriting the query to be sent down to the nodes we should escape special + * characters that may be present in the value. The characters are backslash (\) + * and single quote ('). These characters are escaped by doubling. We do not + * have to escape characters like \t, \v, \b, etc. because the datanode interprets + * them properly. + * We use E'...' syntax for literals containing backslashes. 
+ */ +static void +CopyQuoteStr(StringInfo query_buf, char *value) +{ + char *start = value; + char *current = value; + char c; + bool has_backslash = (strchr(value, '\\') != NULL); + + if (has_backslash) + appendStringInfoChar(query_buf, 'E'); + + appendStringInfoChar(query_buf, '\''); + +#define APPENDSOFAR \ + if (current > start) \ + appendBinaryStringInfo(query_buf, start, current - start) + + while ((c = *current) != '\0') + { + switch (c) + { + case '\\': + case '\'': + APPENDSOFAR; + /* Double the current character */ + appendStringInfoChar(query_buf, c); + /* The second copy will be appended next time */ + start = current; + /* fall through */ + default: + current++; + } + } + APPENDSOFAR; + appendStringInfoChar(query_buf, '\''); +} + +/* + * CopyQuoteIdentifier determines whether the identifier needs to be quoted and surrounds + * it with double quotes + */ +static void +CopyQuoteIdentifier(StringInfo query_buf, char *value) +{ + char *start = value; + char *current = value; + char c; + int len = strlen(value); + bool need_quote = (strspn(value, "_abcdefghijklmnopqrstuvwxyz0123456789") < len) + || (strchr("_abcdefghijklmnopqrstuvwxyz", value[0]) == NULL); + + if (need_quote) + { + appendStringInfoChar(query_buf, '"'); + +#define APPENDSOFAR \ + if (current > start) \ + appendBinaryStringInfo(query_buf, start, current - start) + + while ((c = *current) != '\0') + { + switch (c) + { + case '"': + APPENDSOFAR; + /* Double the current character */ + appendStringInfoChar(query_buf, c); + /* The second copy will be appended next time */ + start = current; + /* fall through */ + default: + current++; + } + } + APPENDSOFAR; + appendStringInfoChar(query_buf, '"'); + } + else + { + appendBinaryStringInfo(query_buf, value, len); + } +} +#endif /* * DoCopy executes the SQL COPY statement @@ -1012,6 +1128,133 @@ DoCopy(const CopyStmt *stmt, const char *queryString) (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("table \"%s\" does not have OIDs", RelationGetRelationName(cstate->rel)))); +#ifdef PGXC + /* Get locator information */ + if (IS_PGXC_COORDINATOR) + { + char *hash_att; + + cstate->rel_loc = GetRelationLocInfo(RelationGetRelid(cstate->rel)); + hash_att = GetRelationHashColumn(cstate->rel_loc); + + cstate->hash_idx = -1; + if (hash_att) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + if (namestrcmp(&(tupDesc->attrs[attnum - 1]->attname), hash_att) == 0) + { + cstate->hash_idx = attnum - 1; + break; + } + } + } + + /* + * Build up the query string for the data nodes; it should match + * the original string, but with STDIN/STDOUT instead + * of the filename. 
+ */ + initStringInfo(&cstate->query_buf); + + appendStringInfoString(&cstate->query_buf, "COPY "); + appendStringInfo(&cstate->query_buf, "%s", RelationGetRelationName(cstate->rel)); + + if (attnamelist) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " ("); + foreach (cell, attnamelist) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + appendStringInfoChar(&cstate->query_buf, ')'); + } + + if (stmt->is_from) + appendStringInfoString(&cstate->query_buf, " FROM STDIN"); + else + appendStringInfoString(&cstate->query_buf, " TO STDOUT"); + + + if (cstate->binary) + appendStringInfoString(&cstate->query_buf, " BINARY"); + + if (cstate->oids) + appendStringInfoString(&cstate->query_buf, " OIDS"); + + if (cstate->delim) + if ((!cstate->csv_mode && cstate->delim[0] != '\t') + || (cstate->csv_mode && cstate->delim[0] != ',')) + { + appendStringInfoString(&cstate->query_buf, " DELIMITER AS "); + CopyQuoteStr(&cstate->query_buf, cstate->delim); + } + + if (cstate->null_print) + if ((!cstate->csv_mode && strcmp(cstate->null_print, "\\N")) + || (cstate->csv_mode && strcmp(cstate->null_print, ""))) + { + appendStringInfoString(&cstate->query_buf, " NULL AS "); + CopyQuoteStr(&cstate->query_buf, cstate->null_print); + } + + if (cstate->csv_mode) + appendStringInfoString(&cstate->query_buf, " CSV"); + + if (cstate->header_line) + appendStringInfoString(&cstate->query_buf, " HEADER"); + + if (cstate->quote && cstate->quote[0] == '"') + { + appendStringInfoString(&cstate->query_buf, " QUOTE AS "); + CopyQuoteStr(&cstate->query_buf, cstate->quote); + } + + if (cstate->escape && cstate->quote && cstate->escape[0] == cstate->quote[0]) + { + appendStringInfoString(&cstate->query_buf, " ESCAPE AS "); + CopyQuoteStr(&cstate->query_buf, cstate->escape); + } + + if (force_quote) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " FORCE QUOTE "); + foreach (cell, force_quote) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + } + + if (force_notnull) + { + ListCell *cell; + ListCell *prev = NULL; + appendStringInfoString(&cstate->query_buf, " FORCE NOT NULL "); + foreach (cell, force_notnull) + { + if (prev) + appendStringInfoString(&cstate->query_buf, ", "); + CopyQuoteIdentifier(&cstate->query_buf, strVal(lfirst(cell))); + prev = cell; + } + } + } +#endif } else { @@ -1023,6 +1266,13 @@ DoCopy(const CopyStmt *stmt, const char *queryString) Assert(!is_from); cstate->rel = NULL; +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("COPY (SELECT) is not supported in PGXC"))); +#endif + /* Don't allow COPY w/ OIDs from a select */ if (cstate->oids) ereport(ERROR, @@ -1157,11 +1407,64 @@ DoCopy(const CopyStmt *stmt, const char *queryString) cstate->copy_dest = COPY_FILE; /* default */ cstate->filename = stmt->filename; +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + cstate->connections = DataNodeCopyBegin(cstate->query_buf.data, + cstate->rel_loc->nodeList, + GetActiveSnapshot()); + if (!cstate->connections) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("Failed to initialize data nodes for COPY"))); + } + PG_TRY(); + { +#endif + if (is_from) CopyFrom(cstate); /* copy from file to database */ else DoCopyTo(cstate); /* copy from database to file */ +#ifdef 
PGXC + } + PG_CATCH(); + { + if (IS_PGXC_COORDINATOR) + { + DataNodeCopyFinish( + cstate->connections, + primary_data_node, + COMBINE_TYPE_NONE, + whereToSendOutput); + pfree(cstate->connections); + pfree(cstate->query_buf.data); + FreeRelationLocInfo(cstate->rel_loc); + } + PG_RE_THROW(); + } + PG_END_TRY(); + if (IS_PGXC_COORDINATOR) + { + if (cstate->rel_loc->locatorType == LOCATOR_TYPE_REPLICATED) + cstate->processed = DataNodeCopyFinish( + cstate->connections, + primary_data_node, + COMBINE_TYPE_SAME, + whereToSendOutput); + else + cstate->processed = DataNodeCopyFinish( + cstate->connections, + 0, + COMBINE_TYPE_SUM, + whereToSendOutput); + pfree(cstate->connections); + pfree(cstate->query_buf.data); + FreeRelationLocInfo(cstate->rel_loc); + } +#endif + /* * Close the relation or query. If reading, we can release the * AccessShareLock we got; if writing, we should hold the lock until end @@ -1178,9 +1481,8 @@ DoCopy(const CopyStmt *stmt, const char *queryString) PopActiveSnapshot(); } - /* Clean up storage (probably not really necessary) */ processed = cstate->processed; - + /* Clean up storage (probably not really necessary) */ pfree(cstate->attribute_buf.data); pfree(cstate->line_buf.data); pfree(cstate->raw_buf); @@ -2089,6 +2391,25 @@ CopyFrom(CopyState cstate) &nulls[defmap[i]], NULL); } +#ifdef PGXC + if (IS_PGXC_COORDINATOR) + { + Datum *hash_value = NULL; + + if (cstate->hash_idx >= 0 && !nulls[cstate->hash_idx]) + hash_value = &values[cstate->hash_idx]; + + if (DataNodeCopyIn(cstate->line_buf.data, + cstate->line_buf.len, + GetRelationNodes(cstate->rel_loc, (long *)hash_value, false), + cstate->connections)) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("Copy failed on a data node"))); + } + else + { +#endif /* And now we can form the input tuple. */ tuple = heap_form_tuple(tupDesc, values, nulls); @@ -2142,6 +2463,9 @@ CopyFrom(CopyState cstate) */ cstate->processed++; } +#ifdef PGXC + } +#endif } /* Done, clean up */ diff --git a/src/backend/pgxc/locator/locator.c b/src/backend/pgxc/locator/locator.c index 1ce8299fce..99ef5e6ee3 100644 --- a/src/backend/pgxc/locator/locator.c +++ b/src/backend/pgxc/locator/locator.c @@ -290,6 +290,7 @@ GetRelationNodes(RelationLocInfo * rel_loc_info, long *partValue, int isRead) return NULL; exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); + exec_nodes->baselocatortype = rel_loc_info->locatorType; switch (rel_loc_info->locatorType) { diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 7f04728655..0720b0fc3d 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -4,7 +4,7 @@ * * Functions for generating a PGXC style plan. * - * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation * * @@ -38,7 +38,7 @@ typedef struct } Literal_Comparison; /* - * This struct helps us detect special conditions to determine what nodes + * This struct helps us detect special conditions to determine what nodes * to execute on. 
*/ typedef struct @@ -84,8 +84,8 @@ bool StrictStatementChecking = true; /* Forbid multi-node SELECT statements with an ORDER BY clause */ bool StrictSelectChecking = false; -/* - * Create a new join struct for tracking how relations are joined +/* + * Create a new join struct for tracking how relations are joined */ static PGXC_Join * new_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) @@ -113,8 +113,8 @@ new_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) } -/* - * Look up the join struct for a particular join +/* + * Look up the join struct for a particular join */ static PGXC_Join * find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) @@ -220,7 +220,7 @@ free_join_list() /* * get_numeric_constant - extract casted constant * - * Searches an expression to see if it is a Constant that is being cast + * Searches an expression to see if it is a Constant that is being cast * to numeric. Return a pointer to the Constant, or NULL. * We need this because of casting. */ @@ -297,7 +297,7 @@ get_plan_nodes_insert(Query * query) long *part_value_ptr = NULL; - + /* Looks complex (correlated?) - best to skip */ if (query->jointree != NULL && query->jointree->fromlist != NULL) @@ -426,8 +426,8 @@ examine_conditions(Special_Conditions * conditions, List *rtables, Node *expr_no else if (boolexpr->boolop == OR_EXPR) { /* - * look at OR's as work-around for reported issue. - * NOTE: THIS IS NOT CORRECT, BUT JUST DONE FOR THE PROTOTYPE. + * look at OR's as work-around for reported issue. + * NOTE: THIS IS NOT CORRECT, BUT JUST DONE FOR THE PROTOTYPE. * More rigorous * checking needs to be done. PGXCTODO: Add careful checking for * OR'ed conditions... @@ -832,7 +832,7 @@ get_plan_nodes(Query_Plan * query_plan, Query * query, bool isRead) exec_nodes = test_exec_nodes; else { - if ((exec_nodes && list_length(exec_nodes->nodelist) > 1) + if ((exec_nodes && list_length(exec_nodes->nodelist) > 1) || (test_exec_nodes && list_length(test_exec_nodes->nodelist) > 1)) /* there should only be one */ exec_nodes = NULL; @@ -872,7 +872,7 @@ get_plan_nodes(Query_Plan * query_plan, Query * query, bool isRead) } -/* +/* * get_plan_nodes - determine the nodes to execute the plan on * * return NULL if it is not safe to be done in a single step. @@ -904,6 +904,35 @@ get_plan_nodes_command(Query_Plan * query_plan, Query * query) /* + * get_plan_combine_type - determine combine type + * + * COMBINE_TYPE_SAME - for replicated updates + * COMBINE_TYPE_SUM - for hash and round robin updates + * COMBINE_TYPE_NONE - for operations where row_count is not applicable + * + * return NULL if it is not safe to be done in a single step. + */ +static CombineType +get_plan_combine_type(Query *query, char baselocatortype) +{ + + switch (query->commandType) + { + case CMD_INSERT: + case CMD_UPDATE: + case CMD_DELETE: + return baselocatortype == LOCATOR_TYPE_REPLICATED ? + COMBINE_TYPE_SAME : COMBINE_TYPE_SUM; + + default: + return COMBINE_TYPE_NONE; + } + /* quiet compiler warning */ + return COMBINE_TYPE_NONE; +} + + +/* * Get list of simple aggregates used. * For now we only allow MAX in the first column, and return a list of one. 
*/ @@ -972,6 +1001,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1); strcpy(query_step->sql_statement, sql_statement); query_step->exec_nodes = NULL; + query_step->combine_type = COMBINE_TYPE_NONE; query_step->simple_aggregates = NULL; query_plan->query_step_list = lappend(NULL, query_step); @@ -991,6 +1021,9 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) query = (Query *) linitial(querytree_list); query_step->exec_nodes = get_plan_nodes_command(query_plan, query); + if (query_step->exec_nodes) + query_step->combine_type = get_plan_combine_type( + query, query_step->exec_nodes->baselocatortype); query_step->simple_aggregates = get_simple_aggregates(query, query_step->exec_nodes); @@ -1065,7 +1098,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("UNION, INTERSECT and EXCEPT are not yet supported")))); - if (StrictStatementChecking && query_step->exec_nodes + if (StrictStatementChecking && query_step->exec_nodes && list_length(query_step->exec_nodes->nodelist) > 1) { /* @@ -1264,7 +1297,7 @@ free_query_step(Query_Step * query_step) return; pfree(query_step->sql_statement); - if (query_step->exec_nodes) + if (query_step->exec_nodes) { if (query_step->exec_nodes->nodelist) list_free(query_step->exec_nodes->nodelist); diff --git a/src/backend/pgxc/pool/combiner.c b/src/backend/pgxc/pool/combiner.c index ed063bc03b..ddb9482f97 100644 --- a/src/backend/pgxc/pool/combiner.c +++ b/src/backend/pgxc/pool/combiner.c @@ -48,6 +48,9 @@ CreateResponseCombiner(int node_count, CombineType combine_type, combiner->row_count = 0; combiner->request_type = REQUEST_TYPE_NOT_DEFINED; combiner->description_count = 0; + combiner->copy_in_count = 0; + combiner->copy_out_count = 0; + combiner->inErrorState = false; combiner->simple_aggregates = NULL; return combiner; @@ -60,14 +63,22 @@ static int parse_row_count(const char *message, size_t len, int *rowcount) { int digits = 0; + int pos; *rowcount = 0; /* skip \0 string terminator */ - len--; - while (len-- > 0 && message[len] >= '0' && message[len] <= '9') + for (pos = 0; pos < len - 1; pos++) { - *rowcount = *rowcount * 10 + message[len] - '0'; - digits++; + if (message[pos] >= '0' && message[pos] <= '9') + { + *rowcount = *rowcount * 10 + message[pos] - '0'; + digits++; + } + else + { + *rowcount = 0; + digits = 0; + } } return digits; } @@ -149,8 +160,11 @@ process_aggregate_element(List *simple_aggregates, char *msg_body, size_t len) int CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t len) { - int rowcount; - int digits = 0; + int digits = 0; + + /* Ignore anything if we have encountered error */ + if (combiner->inErrorState) + return EOF; switch (msg_type) { @@ -164,6 +178,7 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t /* Extract rowcount */ if (combiner->combine_type != COMBINE_TYPE_NONE) { + int rowcount; digits = parse_row_count(msg_body, len, &rowcount); if (digits > 0) { @@ -176,13 +191,12 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t /* There is a consistency issue in the database with the replicated table */ ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Write to replicated table returned different results from the data nodes" - ))); + errmsg("Write to replicated table returned different results from the data 
nodes"))); } else /* first result */ combiner->row_count = rowcount; - } else + } else combiner->row_count += rowcount; } else @@ -202,12 +216,9 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t { char command_complete_buffer[256]; - rowcount = combiner->combine_type == COMBINE_TYPE_SUM ? - combiner->row_count : - combiner->row_count / combiner->node_count; /* Truncate msg_body to get base string */ msg_body[len - digits - 1] = '\0'; - len = sprintf(command_complete_buffer, "%s%d", msg_body, rowcount) + 1; + len = sprintf(command_complete_buffer, "%s%d", msg_body, combiner->row_count) + 1; pq_putmessage(msg_type, command_complete_buffer, len); } } @@ -219,7 +230,9 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t if (combiner->request_type != REQUEST_TYPE_QUERY) { /* Inconsistent responses */ - return EOF; + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); } /* Proxy first */ if (combiner->description_count++ == 0) @@ -235,10 +248,12 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t if (combiner->request_type != REQUEST_TYPE_COPY_IN) { /* Inconsistent responses */ - return EOF; + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); } /* Proxy first */ - if (combiner->description_count++ == 0) + if (combiner->copy_in_count++ == 0) { if (combiner->dest == DestRemote || combiner->dest == DestRemoteExecute) @@ -251,10 +266,12 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t if (combiner->request_type != REQUEST_TYPE_COPY_OUT) { /* Inconsistent responses */ - return EOF; + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); } /* Proxy first */ - if (combiner->description_count++ == 0) + if (combiner->copy_out_count++ == 0) { if (combiner->dest == DestRemote || combiner->dest == DestRemoteExecute) @@ -316,20 +333,23 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t } break; case 'E': /* ErrorResponse */ + combiner->inErrorState = true; + /* fallthru */ case 'A': /* NotificationResponse */ case 'N': /* NoticeResponse */ - /* Proxy error message back if specified, - * or if doing internal primary copy + /* Proxy error message back if specified, + * or if doing internal primary copy */ if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute - || combiner->combine_type == COMBINE_TYPE_SAME) + || combiner->dest == DestRemoteExecute) pq_putmessage(msg_type, msg_body, len); break; case 'I': /* EmptyQuery */ default: /* Unexpected message */ - return EOF; + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Unexpected response from the data nodes"))); } return 0; } @@ -341,13 +361,31 @@ CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t static bool validate_combiner(ResponseCombiner combiner) { + /* There was error message while combining */ + if (combiner->inErrorState) + return false; + /* Check if state is defined */ + if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) + return false; /* Check all nodes completed */ - if (combiner->command_complete_count != combiner->node_count) + if ((combiner->request_type == REQUEST_TYPE_COMMAND + || combiner->request_type == REQUEST_TYPE_QUERY) + && combiner->command_complete_count != combiner->node_count) return false; /* Check count of description responses */ - if 
(combiner->request_type != REQUEST_TYPE_COMMAND - && combiner->description_count != combiner->node_count) + if (combiner->request_type == REQUEST_TYPE_QUERY + && combiner->description_count != combiner->node_count) + return false; + + /* Check count of copy-in responses */ + if (combiner->request_type == REQUEST_TYPE_COPY_IN + && combiner->copy_in_count != combiner->node_count) + return false; + + /* Check count of copy-out responses */ + if (combiner->request_type == REQUEST_TYPE_COPY_OUT + && combiner->copy_out_count != combiner->node_count) return false; /* Add other checks here as needed */ @@ -381,12 +419,25 @@ ValidateAndResetCombiner(ResponseCombiner combiner) combiner->row_count = 0; combiner->request_type = REQUEST_TYPE_NOT_DEFINED; combiner->description_count = 0; + combiner->copy_in_count = 0; + combiner->copy_out_count = 0; + combiner->inErrorState = false; combiner->simple_aggregates = NULL; return valid; } /* + * Close combiner and free allocated memory, if it is not needed + */ +void +CloseCombiner(ResponseCombiner combiner) +{ + if (combiner) + pfree(combiner); +} + +/* * Assign combiner aggregates */ void diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c index 6cc8514641..2b997a528b 100644 --- a/src/backend/pgxc/pool/datanode.c +++ b/src/backend/pgxc/pool/datanode.c @@ -28,8 +28,10 @@ #include "access/xact.h" #include "postgres.h" #include "utils/snapmgr.h" +#include "pgxc/pgxc.h" #include "gtm/gtm_c.h" #include "pgxc/datanode.h" +#include "pgxc/locator.h" #include "../interfaces/libpq/libpq-fe.h" #include "utils/elog.h" #include "utils/memutils.h" @@ -50,9 +52,9 @@ static void release_handles(void); static void data_node_init(DataNodeHandle * handle, int sock); static void data_node_free(DataNodeHandle * handle); -static int data_node_begin(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner, GlobalTransactionId gxid); -static int data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner); -static int data_node_rollback(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner); +static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid); +static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest); +static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest); static int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle); static int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * handle); @@ -64,7 +66,7 @@ static int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot); static void add_error_message(DataNodeHandle * handle, const char *message); static int data_node_read_data(DataNodeHandle * conn); -static int handle_response(DataNodeHandle * conn, ResponseCombiner combiner, bool inErrorState); +static int handle_response(DataNodeHandle * conn, ResponseCombiner combiner); static int get_int(DataNodeHandle * conn, size_t len, int *out); static int get_char(DataNodeHandle * conn, char *out); @@ -220,8 +222,8 @@ InitMultinodeExecutor() node_count = 0; } -/* - * Builds up a connection string +/* + * Builds up a connection string */ char * DataNodeConnStr(char *host, char *port, char *dbname, @@ -250,8 +252,8 @@ DataNodeConnStr(char *host, char *port, char *dbname, } -/* - * Connect to a Data Node using a connection string +/* + * Connect to a Data Node using a connection string */ NODE_CONNECTION * 
DataNodeConnect(char *connstr) @@ -264,8 +266,8 @@ DataNodeConnect(char *connstr) } -/* - * Close specified connection +/* + * Close specified connection */ void DataNodeClose(NODE_CONNECTION * conn) @@ -275,8 +277,8 @@ DataNodeClose(NODE_CONNECTION * conn) } -/* - * Checks if connection active +/* + * Checks if connection active */ int DataNodeConnected(NODE_CONNECTION * conn) @@ -327,17 +329,12 @@ data_node_init(DataNodeHandle * handle, int sock) /* - * Handle responses from the Data node connections + * Handle responses from the Data node connections */ -static int -data_node_receive_responses(int conn_count, DataNodeHandle ** connections, +static void +data_node_receive_responses(const int conn_count, DataNodeHandle ** connections, struct timeval * timeout, ResponseCombiner combiner) { - int result = 0; - int retry_count; - bool timed_out = false; - bool inErrorState = false; - int count = conn_count; DataNodeHandle *to_receive[conn_count]; @@ -362,11 +359,8 @@ data_node_receive_responses(int conn_count, DataNodeHandle ** connections, { /* note if a connection has error */ if (!to_receive[i] - || to_receive[i]->state == DN_CONNECTION_STATE_ERROR_FATAL - || to_receive[i]->sock >= 1024) + || to_receive[i]->state == DN_CONNECTION_STATE_ERROR_FATAL) { - result = EOF; - /* Handling is done, do not track this connection */ count--; @@ -391,7 +385,6 @@ data_node_receive_responses(int conn_count, DataNodeHandle ** connections, if (count == 0) break; - retry_count = 0; retry: res_select = select(nfds + 1, &readfds, NULL, NULL, timeout); if (res_select < 0) @@ -409,19 +402,18 @@ retry: ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("select() bad file descriptor set"))); - return EOF; } ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("select() error: %d", errno))); - return EOF; } if (res_select == 0) { /* Handle timeout */ - result = EOF; - timed_out = true; + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("timeout while waiting for response"))); } /* read data */ @@ -435,24 +427,16 @@ retry: if (read_status == EOF || read_status < 0) { - count--; - /* Move last connection in place */ - if (i < count) - { - to_receive[i] = to_receive[count]; - /* stay on the current position */ - i--; - } - - inErrorState = true; - result = EOF; - continue; + /* PGXCTODO - we should notify the pooler to destroy the connections */ + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on datanode connection"))); } } if (conn->inStart < conn->inEnd) { - if (handle_response(conn, combiner, inErrorState) == 0 + if (handle_response(conn, combiner) == 0 || conn->state == DN_CONNECTION_STATE_ERROR_READY || conn->state == DN_CONNECTION_STATE_ERROR_FATAL) { @@ -466,24 +450,9 @@ retry: i--; } } - - /* - * See if we flagged an error on connection. Note, if - * handle_response was not 0 above, an error occurred, we - * still need to consume the ReadyForQuery message - */ - if (conn->state == DN_CONNECTION_STATE_ERROR_READY - || conn->state == DN_CONNECTION_STATE_ERROR_NOT_READY - || conn->state == DN_CONNECTION_STATE_ERROR_FATAL) - { - inErrorState = true; - result = EOF; - } } } } - - return result; } /* @@ -675,7 +644,7 @@ get_int(DataNodeHandle * conn, size_t len, int *out) * and closing the connections. 
*/ static int -handle_response(DataNodeHandle * conn, ResponseCombiner combiner, bool inErrorState) +handle_response(DataNodeHandle * conn, ResponseCombiner combiner) { char msg_type; int msg_len; @@ -708,32 +677,45 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner, bool inErrorSt /* no need to parse, just move cursor */ conn->inCursor += msg_len; conn->state = DN_CONNECTION_STATE_COMPLETED; - if (!inErrorState) - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); + CombineResponse(combiner, msg_type, + conn->inBuffer + conn->inStart + 5, + conn->inCursor - conn->inStart - 5); break; case 'T': /* RowDescription */ - case 'G': /* CopyInResponse */ - case 'H': /* CopyOutResponse */ case 'D': /* DataRow */ /* no need to parse, just move cursor */ conn->inCursor += msg_len; - if (!inErrorState) - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); + CombineResponse(combiner, msg_type, + conn->inBuffer + conn->inStart + 5, + conn->inCursor - conn->inStart - 5); + break; + case 'G': /* CopyInResponse */ + /* no need to parse, just move cursor */ + conn->inCursor += msg_len; + conn->state = DN_CONNECTION_STATE_COPY_IN; + CombineResponse(combiner, msg_type, + conn->inBuffer + conn->inStart + 5, + conn->inCursor - conn->inStart - 5); + /* Done, return to caller to let it know the data can be passed in */ + conn->inStart = conn->inCursor; + conn->state = DN_CONNECTION_STATE_COPY_IN; + return 0; + case 'H': /* CopyOutResponse */ + /* no need to parse, just move cursor */ + conn->inCursor += msg_len; + conn->state = DN_CONNECTION_STATE_COPY_OUT; + CombineResponse(combiner, msg_type, + conn->inBuffer + conn->inStart + 5, + conn->inCursor - conn->inStart - 5); break; case 'E': /* ErrorResponse */ /* no need to parse, just move cursor */ conn->inCursor += msg_len; - if (!inErrorState) - CombineResponse(combiner, msg_type, - conn->inBuffer + conn->inStart + 5, - conn->inCursor - conn->inStart - 5); + CombineResponse(combiner, msg_type, + conn->inBuffer + conn->inStart + 5, + conn->inCursor - conn->inStart - 5); conn->inStart = conn->inCursor; - inErrorState = true; conn->state = DN_CONNECTION_STATE_ERROR_NOT_READY; /* * Do not return with an error, we still need to consume Z, @@ -756,31 +738,30 @@ handle_response(DataNodeHandle * conn, ResponseCombiner combiner, bool inErrorSt { conn->state = DN_CONNECTION_STATE_ERROR_READY; return EOF; - } else + } else conn->state = DN_CONNECTION_STATE_IDLE; return 0; case 'I': /* EmptyQuery */ default: /* sync lost? */ conn->state = DN_CONNECTION_STATE_ERROR_FATAL; - inErrorState = true; return EOF; } conn->inStart = conn->inCursor; - } + /* Keep compiler quiet */ return EOF; } - /* * Send BEGIN command to the Data nodes and receive responses */ static int -data_node_begin(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner, GlobalTransactionId gxid) +data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid) { int i; struct timeval *timeout = NULL; + ResponseCombiner combiner; /* Send BEGIN */ for (i = 0; i < conn_count; i++) @@ -792,16 +773,15 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, ResponseCombiner return EOF; } - /* Receive responses */ - if (data_node_receive_responses(conn_count, connections, timeout, combiner)) - return EOF; + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, dest); - /* Verify status? 
*/ + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); - return 0; + /* Verify status */ + return ValidateAndCloseCombiner(combiner) ? 0 : EOF; } - /* Clears the write node list */ static void clear_write_node_list() @@ -835,8 +815,6 @@ DataNodeCommit(CommandDest dest) int res; int tran_count; DataNodeHandle *connections[node_count]; - ResponseCombiner combiner; - /* Quick check to make sure we have connections */ if (node_count == 0) goto finish; @@ -851,11 +829,7 @@ DataNodeCommit(CommandDest dest) if (tran_count == 0) goto finish; - combiner = CreateResponseCombiner(tran_count, - COMBINE_TYPE_NONE, dest); - res = data_node_commit(tran_count, connections, combiner); - if (!ValidateAndCloseCombiner(combiner) || res) - return EOF; + res = data_node_commit(tran_count, connections, dest); finish: /* In autocommit mode statistics is collected in DataNodeExec */ @@ -865,7 +839,7 @@ finish: release_handles(); autocommit = true; clear_write_node_list(); - return 0; + return res; } @@ -873,13 +847,14 @@ finish: * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses */ static int -data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner) +data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest) { int i; struct timeval *timeout = NULL; char buffer[256]; GlobalTransactionId gxid = InvalidGlobalTransactionId; int result = 0; + ResponseCombiner combiner = NULL; /* can set this to false to disable temporarily */ @@ -904,9 +879,7 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner * gxid for it. Hence GetCurrentGlobalTransactionId() just returns * already allocated gxid */ -/* #ifdef PGXC_COORD */ gxid = GetCurrentGlobalTransactionId(); -/* #endif */ sprintf(buffer, "PREPARE TRANSACTION 'T%d'", gxid); /* Send PREPARE */ @@ -916,20 +889,26 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner return EOF; } + combiner = CreateResponseCombiner(conn_count, + COMBINE_TYPE_NONE, dest); /* Receive responses */ - if (data_node_receive_responses(conn_count, connections, timeout, combiner)) - return EOF; + data_node_receive_responses(conn_count, connections, timeout, combiner); /* Reset combiner */ if (!ValidateAndResetCombiner(combiner)) - return EOF; + { + result = EOF; + } } if (!do2PC) strcpy(buffer, "COMMIT"); else { - sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid); + if (result) + sprintf(buffer, "ROLLBACK PREPARED 'T%d'", gxid); + else + sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid); /* We need to use a new xid, the data nodes have reset */ two_phase_xid = BeginTranGTM(); @@ -954,9 +933,12 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, ResponseCombiner } } + if (!combiner) + combiner = CreateResponseCombiner(conn_count, + COMBINE_TYPE_NONE, dest); /* Receive responses */ - if (data_node_receive_responses(conn_count, connections, timeout, combiner)) - result = EOF; + data_node_receive_responses(conn_count, connections, timeout, combiner); + result = ValidateAndCloseCombiner(combiner) ? 
result : EOF; finish: if (do2PC) @@ -975,8 +957,6 @@ DataNodeRollback(CommandDest dest) int res = 0; int tran_count; DataNodeHandle *connections[node_count]; - ResponseCombiner combiner; - int i; /* Quick check to make sure we have connections */ if (node_count == 0) @@ -992,19 +972,7 @@ DataNodeRollback(CommandDest dest) if (tran_count == 0) goto finish; - combiner = CreateResponseCombiner(tran_count, - COMBINE_TYPE_NONE, dest); - res = data_node_rollback(tran_count, connections, combiner); - - /* Assume connection got cleaned up. Reset so we can reuse without error. */ - for (i = 0; i < tran_count; i++) - { - connections[i]->transaction_status = 'I'; - connections[i]->state = DN_CONNECTION_STATE_IDLE; - } - - if (!ValidateAndCloseCombiner(combiner) || res) - res = EOF; + res = data_node_rollback(tran_count, connections, dest); finish: /* In autocommit mode statistics is collected in DataNodeExec */ @@ -1044,11 +1012,12 @@ release_handles(void) * Send ROLLBACK command down to the Data nodes and handle responses */ static int -data_node_rollback(int conn_count, DataNodeHandle ** connections, ResponseCombiner combiner) +data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest) { int i; struct timeval *timeout = NULL; int result = 0; + ResponseCombiner combiner; /* Send ROLLBACK - */ for (i = 0; i < conn_count; i++) @@ -1057,12 +1026,13 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections, ResponseCombin result = EOF; } + combiner = CreateResponseCombiner(conn_count, + COMBINE_TYPE_NONE, dest); /* Receive responses */ - if (data_node_receive_responses(conn_count, connections, timeout, combiner)) - return EOF; + data_node_receive_responses(conn_count, connections, timeout, combiner); - /* Verify status? */ - return 0; + /* Verify status */ + return ValidateAndCloseCombiner(combiner) ? 
0 : EOF; } @@ -1080,19 +1050,17 @@ data_node_rollback(int conn_count, DataNodeHandle ** connections, ResponseCombin * bool is_read_only - if this is a read-only query */ int -DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapshot snapshot, - bool force_autocommit, List *simple_aggregates, bool is_read_only) +DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, + CommandDest dest, Snapshot snapshot, bool force_autocommit, + List *simple_aggregates, bool is_read_only) { int i; int j; int regular_conn_count; int total_conn_count; struct timeval *timeout = NULL; /* wait forever */ - ResponseCombiner combiner; - ResponseCombiner primary_combiner = NULL; - int primary_row_count = 0; + ResponseCombiner combiner = NULL; int row_count = 0; - int res; int new_count = 0; bool need_tran; bool found; @@ -1103,8 +1071,6 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh List *nodelist = NIL; List *primarynode = NIL; /* add up affected row count by default, override for replicated writes */ - CombineType combine_type = COMBINE_TYPE_SUM; - if (exec_nodes) { @@ -1212,11 +1178,8 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh } if (new_count > 0 && need_tran) { - combiner = CreateResponseCombiner(new_count, COMBINE_TYPE_NONE, DestNone); - /* Start transaction on connections where it is not started */ - res = data_node_begin(new_count, new_connections, combiner, gxid); - if (!ValidateAndCloseCombiner(combiner) || res) + if (data_node_begin(new_count, new_connections, DestNone, gxid)) { pfree(connections); return EOF; @@ -1230,16 +1193,16 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh if (!need_tran && data_node_send_gxid(primaryconnection[0], gxid)) { add_error_message(primaryconnection[0], "Can not send request"); - if (connections) + if (connections) pfree(connections); - if (primaryconnection) + if (primaryconnection) pfree(primaryconnection); return EOF; } if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot)) { add_error_message(primaryconnection[0], "Can not send request"); - if (connections) + if (connections) pfree(connections); pfree(primaryconnection); return EOF; @@ -1247,22 +1210,31 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh if (data_node_send_query(primaryconnection[0], query) != 0) { add_error_message(primaryconnection[0], "Can not send request"); - if (connections) + if (connections) pfree(connections); - if (primaryconnection) + if (primaryconnection) pfree(primaryconnection); return EOF; } - /* Use DestNone so that a response will not be sent until run on secondary nodes */ - /* Use COMBINE_TYPE_SAME so that we get the affected row count */ - primary_combiner = CreateResponseCombiner(1, COMBINE_TYPE_SAME, DestNone); + Assert(combine_type == COMBINE_TYPE_SAME); + + /* + * Create combiner. + * Note that we use the same combiner later with the secondary nodes, + * so that we do not prematurely send a response to the client + * until all nodes have completed execution. 
+ */ + combiner = CreateResponseCombiner(total_conn_count, combine_type, dest); + AssignCombinerAggregates(combiner, simple_aggregates); /* Receive responses */ - res = data_node_receive_responses(1, primaryconnection, timeout, primary_combiner); - primary_row_count = primary_combiner->row_count; - if (!ValidateAndCloseCombiner(primary_combiner) || res) + data_node_receive_responses(1, primaryconnection, timeout, combiner); + /* If we got an error response return immediately */ + if (DN_CONNECTION_STATE_ERROR(primaryconnection[0])) { + /* We are going to exit, so release combiner */ + CloseCombiner(combiner); if (autocommit) { if (need_tran) @@ -1271,18 +1243,12 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh release_handles(); } - if (primaryconnection) + if (primaryconnection) pfree(primaryconnection); - if (connections) + if (connections) pfree(connections); return EOF; } - /* - * If we get here, the statement has executed successfully on the primary data node - * and it is ok to try and execute on the other data nodes. - * Set combine_type for secondary so that we do not sum up row counts. - */ - combine_type = COMBINE_TYPE_SAME; } /* Send query to nodes */ @@ -1309,16 +1275,19 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh } } - combiner = CreateResponseCombiner(regular_conn_count, combine_type, dest); - AssignCombinerAggregates(combiner, simple_aggregates); + /* We may already have combiner if it is replicated case with primary data node */ + if (!combiner) + { + combiner = CreateResponseCombiner(regular_conn_count, combine_type, dest); + AssignCombinerAggregates(combiner, simple_aggregates); + } /* Receive responses */ - res = data_node_receive_responses(regular_conn_count, connections, timeout, combiner); + data_node_receive_responses(regular_conn_count, connections, timeout, combiner); row_count = combiner->row_count; /* Check for errors and if primary nodeMake sure primary and secondary nodes were updated the same */ - if (!ValidateAndCloseCombiner(combiner) || res - || (primaryconnection && row_count != primary_row_count)) + if (!ValidateAndCloseCombiner(combiner)) { if (autocommit) { @@ -1330,12 +1299,6 @@ DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapsh pfree(connections); - if (primaryconnection && row_count != primary_row_count) - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Write to replicated table returned different results between primary and secondary data nodes. 
Primary: %d, secondary: %d", primary_row_count, row_count - ))); - return EOF; } @@ -1558,6 +1521,24 @@ send_some(DataNodeHandle * handle, int len) /* + * This method won't return until connection buffer is empty or error occurs + * To ensure all data are on the wire before waiting for response + */ +static int +data_node_flush(DataNodeHandle *handle) +{ + while (handle->outEnd) + { + if (send_some(handle, handle->outEnd) < 0) + { + add_error_message(handle, "failed to send data to datanode"); + return EOF; + } + } + return 0; +} + +/* * Send specified statement down to the Data node */ static int @@ -1582,13 +1563,9 @@ data_node_send_query(DataNodeHandle * handle, const char *query) memcpy(handle->outBuffer + handle->outEnd, query, strLen); handle->outEnd += strLen; - /* We need response right away, so send immediately */ - if (send_some(handle, handle->outEnd) < 0) - return EOF; - handle->state = DN_CONNECTION_STATE_BUSY; - return 0; + return data_node_flush(handle); } @@ -1838,3 +1815,439 @@ DataNodeCleanAndRelease(int code, Datum arg) /* Dump collected statistics to the log */ stat_log(); } + +/* + * Begin COPY command + * The copy_connections array must have room for NumDataNodes items + */ +DataNodeHandle** +DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot) +{ + int i, j; + int conn_count = list_length(nodelist) == 0 ? NumDataNodes : list_length(nodelist); + struct timeval *timeout = NULL; + DataNodeHandle **connections; + DataNodeHandle **copy_connections; + DataNodeHandle *newConnections[conn_count]; + int new_count = 0; + ListCell *nodeitem; + bool need_tran; + GlobalTransactionId gxid; + ResponseCombiner combiner; + + if (conn_count == 0) + return NULL; + + /* Get needed datanode connections */ + connections = get_handles(nodelist); + if (!connections) + return NULL; + + need_tran = !autocommit || conn_count > 1; + + elog(DEBUG1, "autocommit = %s, conn_count = %d, need_tran = %s", autocommit ? "true" : "false", conn_count, need_tran ? "true" : "false"); + + /* + * We need to be able quickly find a connection handle for specified node number, + * So store connections in an array where index is node-1. + * Unused items in the array should be NULL + */ + copy_connections = (DataNodeHandle **) palloc0(NumDataNodes * sizeof(DataNodeHandle *)); + i = 0; + foreach(nodeitem, nodelist) + copy_connections[lfirst_int(nodeitem) - 1] = connections[i++]; + + /* Gather statistics */ + stat_statement(); + if (autocommit) + stat_transaction(conn_count); + + /* We normally clear for transactions, but if autocommit, clear here, too */ + if (autocommit) + { + clear_write_node_list(); + } + + /* Check status of connections */ + /* We want to track new "write" nodes, and new nodes in the current transaction + * whether or not they are write nodes. 
*/ + if (write_node_count < NumDataNodes) + { + for (i = 0; i < conn_count; i++) + { + bool found = false; + for (j=0; j<write_node_count && !found; j++) + { + if (write_node_list[j] == connections[i]) + found = true; + } + if (!found) + { + /* Add to transaction wide-list */ + write_node_list[write_node_count++] = connections[i]; + /* Add to current statement list */ + newConnections[new_count++] = connections[i]; + } + } + // Check connection state is DN_CONNECTION_STATE_IDLE + } + + gxid = GetCurrentGlobalTransactionId(); + + /* elog(DEBUG1, "Current gxid = %d", gxid); */ + + if (!GlobalTransactionIdIsValid(gxid)) + { + pfree(connections); + pfree(copy_connections); + return NULL; + } + if (new_count > 0 && need_tran) + { + /* Start transaction on connections where it is not started */ + if (data_node_begin(new_count, newConnections, DestNone, gxid)) + { + pfree(connections); + pfree(copy_connections); + return NULL; + } + } + + /* Send query to nodes */ + for (i = 0; i < conn_count; i++) + { + /* If explicit transaction is needed gxid is already sent */ + if (!need_tran && data_node_send_gxid(connections[i], gxid)) + { + add_error_message(connections[i], "Can not send request"); + pfree(connections); + pfree(copy_connections); + return NULL; + } + if (snapshot && data_node_send_snapshot(connections[i], snapshot)) + { + add_error_message(connections[i], "Can not send request"); + pfree(connections); + pfree(copy_connections); + return NULL; + } + if (data_node_send_query(connections[i], query) != 0) + { + add_error_message(connections[i], "Can not send request"); + pfree(connections); + pfree(copy_connections); + return NULL; + } + } + + /* + * We are expecting CopyIn response, but do not want to send it to client, + * caller should take care about this, because here we do not know if + * client runs console or file copy + */ + combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE, DestNone); + + /* Receive responses */ + data_node_receive_responses(conn_count, connections, timeout, combiner); + if (!ValidateAndCloseCombiner(combiner)) + { + if (autocommit) + { + if (need_tran) + DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE, DestNone); + else + if (!PersistentConnections) release_handles(); + } + + pfree(connections); + pfree(copy_connections); + return NULL; + } + + pfree(connections); + return copy_connections; +} + +/* + * Buffer size does not affect performance significantly, just do not allow + * connection buffer grows infinitely + */ +#define COPY_BUFFER_SIZE 8192 +#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 + +/* + * Send a data row to the specified nodes + */ +int +DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections) +{ + DataNodeHandle *primary_handle = NULL; + ListCell *nodeitem; + /* size + data row + \n */ + int msgLen = 4 + len + 1; + int nLen = htonl(msgLen); + + if (exec_nodes->primarynodelist) + { + primary_handle = copy_connections[lfirst_int(list_head(exec_nodes->primarynodelist)) - 1]; + } + + if (primary_handle) + { + if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN) + { + /* precalculate to speed up access */ + int bytes_needed = primary_handle->outEnd + 1 + msgLen; + + /* flush buffer if it is almost full */ + if (bytes_needed > COPY_BUFFER_SIZE) + { + /* First look if data node has sent a error message */ + int read_status = data_node_read_data(primary_handle); + if (read_status == EOF || read_status < 0) + { + add_error_message(primary_handle, "failed to read data from data node"); + return 
EOF; + } + + if (primary_handle->inStart < primary_handle->inEnd) + { + ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone); + handle_response(primary_handle, combiner); + if (!ValidateAndCloseCombiner(combiner)) + return EOF; + } + + if (DN_CONNECTION_STATE_ERROR(primary_handle)) + return EOF; + + if (send_some(primary_handle, primary_handle->outEnd) < 0) + { + add_error_message(primary_handle, "failed to send data to data node"); + return EOF; + } + } + + if (ensure_out_buffer_capacity(bytes_needed, primary_handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + primary_handle->outBuffer[primary_handle->outEnd++] = 'd'; + memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); + primary_handle->outEnd += 4; + memcpy(primary_handle->outBuffer + primary_handle->outEnd, data_row, len); + primary_handle->outEnd += len; + primary_handle->outBuffer[primary_handle->outEnd++] = '\n'; + } + else + { + add_error_message(primary_handle, "Invalid data node connection"); + return EOF; + } + } + + foreach(nodeitem, exec_nodes->nodelist) + { + DataNodeHandle *handle = copy_connections[lfirst_int(nodeitem) - 1]; + if (handle && handle->state == DN_CONNECTION_STATE_COPY_IN) + { + /* precalculate to speed up access */ + int bytes_needed = handle->outEnd + 1 + msgLen; + + /* flush buffer if it is almost full */ + if ((primary_handle && bytes_needed > PRIMARY_NODE_WRITEAHEAD) + || (!primary_handle && bytes_needed > COPY_BUFFER_SIZE)) + { + int to_send = handle->outEnd; + + /* First look if data node has sent a error message */ + int read_status = data_node_read_data(handle); + if (read_status == EOF || read_status < 0) + { + add_error_message(handle, "failed to read data from data node"); + return EOF; + } + + if (handle->inStart < handle->inEnd) + { + ResponseCombiner combiner = CreateResponseCombiner(1, COMBINE_TYPE_NONE, DestNone); + handle_response(handle, combiner); + if (!ValidateAndCloseCombiner(combiner)) + return EOF; + } + + if (DN_CONNECTION_STATE_ERROR(handle)) + return EOF; + + /* + * Allow primary node to write out data before others. + * If primary node was blocked it would not accept copy data. + * So buffer at least PRIMARY_NODE_WRITEAHEAD at the other nodes. + * If primary node is blocked and is buffering, other buffers will + * grow accordingly. 
+ */ + if (primary_handle) + { + if (primary_handle->outEnd + PRIMARY_NODE_WRITEAHEAD < handle->outEnd) + to_send = handle->outEnd - primary_handle->outEnd - PRIMARY_NODE_WRITEAHEAD; + else + to_send = 0; + } + + /* + * Try to send down buffered data if we have + */ + if (to_send && send_some(handle, to_send) < 0) + { + add_error_message(handle, "failed to send data to data node"); + return EOF; + } + } + + if (ensure_out_buffer_capacity(bytes_needed, handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + handle->outBuffer[handle->outEnd++] = 'd'; + memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); + handle->outEnd += 4; + memcpy(handle->outBuffer + handle->outEnd, data_row, len); + handle->outEnd += len; + handle->outBuffer[handle->outEnd++] = '\n'; + } + else + { + add_error_message(handle, "Invalid data node connection"); + return EOF; + } + } + + return 0; +} + +/* + * Finish copy process on all connections + */ +uint64 +DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node, + CombineType combine_type, CommandDest dest) +{ + int i; + int nLen = htonl(4); + ResponseCombiner combiner = NULL; + bool need_tran; + bool res = 0; + struct timeval *timeout = NULL; /* wait forever */ + DataNodeHandle *connections[NumDataNodes]; + DataNodeHandle *primary_handle = NULL; + int conn_count = 0; + + for (i = 0; i < NumDataNodes; i++) + { + DataNodeHandle *handle = copy_connections[i]; + + if (!handle) + continue; + + if (i == primary_data_node - 1) + primary_handle = handle; + else + connections[conn_count++] = handle; + } + + if (primary_handle) + { + if (primary_handle->state == DN_CONNECTION_STATE_COPY_IN || primary_handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(primary_handle->outEnd + 1 + 4, primary_handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + primary_handle->outBuffer[primary_handle->outEnd++] = 'c'; + memcpy(primary_handle->outBuffer + primary_handle->outEnd, &nLen, 4); + primary_handle->outEnd += 4; + + /* We need response right away, so send immediately */ + if (data_node_flush(primary_handle) < 0) + { + res = EOF; + } + } + else + { + res = EOF; + } + + combiner = CreateResponseCombiner(conn_count + 1, combine_type, dest); + data_node_receive_responses(1, &primary_handle, timeout, combiner); + } + + for (i = 0; i < conn_count; i++) + { + DataNodeHandle *handle = connections[i]; + + if (handle->state == DN_CONNECTION_STATE_COPY_IN || handle->state == DN_CONNECTION_STATE_COPY_OUT) + { + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + 4, handle) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + } + + handle->outBuffer[handle->outEnd++] = 'c'; + memcpy(handle->outBuffer + handle->outEnd, &nLen, 4); + handle->outEnd += 4; + + /* We need response right away, so send immediately */ + if (data_node_flush(handle) < 0) + { + res = EOF; + } + } + else + { + res = EOF; + } + } + + need_tran = !autocommit || primary_handle || conn_count > 1; + + if (!combiner) + combiner = CreateResponseCombiner(conn_count, combine_type, dest); + data_node_receive_responses(conn_count, connections, timeout, combiner); + if (!ValidateAndCloseCombiner(combiner) || res) + { + if (autocommit) + { + if (need_tran) + DataNodeRollback(DestNone); + else + if (!PersistentConnections) release_handles(); + } + + return 0; + } + + if (autocommit) + { + if 
(need_tran) + DataNodeCommit(DestNone); + else + if (!PersistentConnections) release_handles(); + } + + // Verify status? + return combiner->row_count; +} diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 427bc6b8c3..7bdd5b8d38 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -79,9 +79,10 @@ /* PGXC_COORD */ #include "pgxc/planner.h" #include "pgxc/datanode.h" +#include "commands/copy.h" /* PGXC_DATANODE */ -#include "access/transam.h" -#endif +#include "access/transam.h" +#endif extern int optind; extern char *optarg; @@ -204,10 +205,10 @@ static List * pgxc_execute_direct (Node *parsetree, List *querytree_list, Comman * ---------------------------------------------------------------- */ -/* - * Called when the backend is ending. +/* + * Called when the backend is ending. */ -static void +static void DataNodeShutdown (int code, Datum arg) { /* Close connection with GTM, if active */ @@ -433,7 +434,7 @@ SocketBackend(StringInfo inBuf) case 'g': case 's': break; -#endif +#endif default: @@ -898,14 +899,14 @@ exec_simple_query(const char *query_string) Portal portal; DestReceiver *receiver; int16 format; -#ifdef PGXC +#ifdef PGXC Query_Plan *query_plan; Query_Step *query_step; bool exec_on_coord; int data_node_error = 0; - /* + /* * By default we do not want data nodes to contact GTM directly, * it should get this information passed down to it. */ @@ -978,10 +979,22 @@ exec_simple_query(const char *query_string) else if (IsA(parsetree, ExecDirectStmt)) querytree_list = pgxc_execute_direct(parsetree, querytree_list, dest, snapshot_set, &exec_on_coord); - else + else if (IsA(parsetree, CopyStmt)) + { + CopyStmt *copy = (CopyStmt *) parsetree; + /* Snapshot is needed for the Copy */ + if (!snapshot_set) + { + PushActiveSnapshot(GetTransactionSnapshot()); + snapshot_set = true; + } + DoCopy(copy, query_string); + exec_on_coord = false; + } + else { query_plan = GetQueryPlan(parsetree, query_string, querytree_list); - + exec_on_coord = query_plan->exec_loc_type & EXEC_ON_COORD; } @@ -1003,7 +1016,7 @@ exec_simple_query(const char *query_string) /* If we got a cancel signal in analysis or planning, quit */ CHECK_FOR_INTERRUPTS(); -#ifdef PGXC +#ifdef PGXC /* PGXC_DATANODE */ /* Force getting Xid from GTM if not autovacuum, but a vacuum */ if (IS_PGXC_DATANODE && IsA(parsetree, VacuumStmt) && IsPostmasterEnvironment) @@ -1089,7 +1102,7 @@ exec_simple_query(const char *query_string) PortalDrop(portal, false); -#ifdef PGXC +#ifdef PGXC } /* PGXC_COORD */ @@ -1100,9 +1113,10 @@ exec_simple_query(const char *query_string) { query_step = linitial(query_plan->query_step_list); - data_node_error = DataNodeExec(query_step->sql_statement, - query_step->exec_nodes, - dest, + data_node_error = DataNodeExec(query_step->sql_statement, + query_step->exec_nodes, + query_step->combine_type, + dest, snapshot_set ? GetActiveSnapshot() : GetTransactionSnapshot(), query_plan->force_autocommit, query_step->simple_aggregates, @@ -1110,15 +1124,17 @@ exec_simple_query(const char *query_string) if (data_node_error) { - /* Error. If it ran on the coordinator (DDL), too, make sure we abort */ - if (exec_on_coord) - { - AbortCurrentTransaction(); - xact_started = false; - } + /* An error occurred, change status on Coordinator, too, + * even if no statements ran on it. 
diff --git a/src/include/pgxc/combiner.h b/src/include/pgxc/combiner.h
index 2596e50677..7926cbd9ff 100644
--- a/src/include/pgxc/combiner.h
+++ b/src/include/pgxc/combiner.h
@@ -46,6 +46,9 @@ typedef struct
 	int			row_count;
 	RequestType request_type;
 	int			description_count;
+	uint64		copy_in_count;
+	uint64		copy_out_count;
+	bool		inErrorState;
 	List	   *simple_aggregates;
 }	ResponseCombinerData;
@@ -59,5 +62,6 @@ extern int CombineResponse(ResponseCombiner combiner, char msg_type,
 extern bool ValidateAndCloseCombiner(ResponseCombiner combiner);
 extern bool ValidateAndResetCombiner(ResponseCombiner combiner);
 extern void AssignCombinerAggregates(ResponseCombiner combiner, List *simple_aggregates);
+extern void CloseCombiner(ResponseCombiner combiner);
 
 #endif   /* COMBINER_H */
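The new copy_in_count and copy_out_count fields let the combiner verify that every involved connection actually acknowledged the COPY (with a CopyInResponse 'G' or CopyOutResponse 'H' message) before its row count is trusted, with inErrorState short-circuiting the check. A self-contained sketch of that kind of validation; the struct and validate_copy_counts() are hypothetical stand-ins for ResponseCombinerData and the checks inside ValidateAndCloseCombiner():

#include <stdbool.h>
#include <stdint.h>

typedef struct
{
	int			node_count;		/* connections the combiner waits on */
	uint64_t	copy_in_count;	/* CopyInResponse ('G') messages seen */
	uint64_t	copy_out_count;	/* CopyOutResponse ('H') messages seen */
	bool		inErrorState;	/* an ErrorResponse was received */
} copy_combiner_sketch;

/* The COPY only succeeded if every node acknowledged it and no error came back. */
static bool
validate_copy_counts(const copy_combiner_sketch *c)
{
	if (c->inErrorState)
		return false;
	return c->copy_in_count == (uint64_t) c->node_count ||
		   c->copy_out_count == (uint64_t) c->node_count;
}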
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
index 165be530d9..4f75ba24af 100644
--- a/src/include/pgxc/datanode.h
+++ b/src/include/pgxc/datanode.h
@@ -33,10 +33,16 @@ typedef enum
 	DN_CONNECTION_STATE_COMPLETED,
 	DN_CONNECTION_STATE_ERROR_NOT_READY,	/* error, but need ReadyForQuery message */
 	DN_CONNECTION_STATE_ERROR_READY,		/* error and received ReadyForQuery */
-	DN_CONNECTION_STATE_ERROR_FATAL		/* fatal error */
-
+	DN_CONNECTION_STATE_ERROR_FATAL,	/* fatal error */
+	DN_CONNECTION_STATE_COPY_IN,
+	DN_CONNECTION_STATE_COPY_OUT
 }	DNConnectionState;
 
+#define DN_CONNECTION_STATE_ERROR(dnconn) \
+		((dnconn)->state == DN_CONNECTION_STATE_ERROR_FATAL \
+			|| (dnconn)->state == DN_CONNECTION_STATE_ERROR_NOT_READY \
+			|| (dnconn)->state == DN_CONNECTION_STATE_ERROR_READY)
+
 struct data_node_handle
 {
 	/* fd of the connection */
@@ -74,7 +80,11 @@
 extern void DataNodeBegin(void);
 extern int	DataNodeCommit(CommandDest dest);
 extern int	DataNodeRollback(CommandDest dest);
-extern int	DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only);
+extern int	DataNodeExec(const char *query, Exec_Nodes *exec_nodes, CombineType combine_type, CommandDest dest, Snapshot snapshot, bool force_autocommit, List *simple_aggregates, bool is_read_only);
+
+extern DataNodeHandle **DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot);
+extern int	DataNodeCopyIn(char *data_row, int len, Exec_Nodes *exec_nodes, DataNodeHandle **copy_connections);
+extern uint64 DataNodeCopyFinish(DataNodeHandle **copy_connections, int primary_data_node, CombineType combine_type, CommandDest dest);
 
 extern int	primary_data_node;
 #endif
diff --git a/src/include/pgxc/locator.h b/src/include/pgxc/locator.h
index 6fa32fb73b..1d5280c765 100644
--- a/src/include/pgxc/locator.h
+++ b/src/include/pgxc/locator.h
@@ -52,6 +52,7 @@ typedef struct
 {
 	List	   *primarynodelist;
 	List	   *nodelist;
+	char		baselocatortype;
 }	Exec_Nodes;
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index c738312ce1..04ec6fcc0e 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -16,6 +16,7 @@
 #define PGXCPLANNER_H
 
 #include "pgxc/locator.h"
+#include "pgxc/combiner.h"
 
 
 /* Query_Plan.exec_loc_type can have these OR'ed */
@@ -30,6 +31,7 @@ typedef struct
 {
 	char	   *sql_statement;
 	Exec_Nodes *exec_nodes;
+	CombineType combine_type;
 	List	   *simple_aggregates;	/* simple aggregates to combine on this
 									 * step */
 }	Query_Step;
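Finally, the baselocatortype recorded in Exec_Nodes tells DataNodeCopyIn() whether an incoming row belongs on a single hash-chosen node or on every node of a replicated table. A toy illustration of that routing decision; the modulo hash, the node numbering, and the 'H'/'R' tag values are simplifications rather than the locator's real implementation:

#include <stdio.h>

#define LOCATOR_TYPE_HASH		'H'	/* assumed tag values */
#define LOCATOR_TYPE_REPLICATED	'R'

/* Return the 1-based target node for a row, or 0 meaning "all nodes". */
static int
route_row(char baselocatortype, long hash_value, int num_data_nodes)
{
	if (baselocatortype == LOCATOR_TYPE_HASH)
		return (int) (hash_value % num_data_nodes) + 1;
	return 0;					/* replicated: send the row everywhere */
}

int
main(void)
{
	printf("hash row -> node %d\n", route_row(LOCATOR_TYPE_HASH, 42, 4));
	printf("replicated row -> node %d\n", route_row(LOCATOR_TYPE_REPLICATED, 42, 4));
	return 0;
}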
