diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/commands/copy.c | 13 | ||||
| -rw-r--r-- | src/backend/pgxc/locator/redistrib.c | 4 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 14 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 3 |
4 files changed, 17 insertions, 17 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 8ac52da4bb..bf431d77e0 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -687,13 +687,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) break; } } -#ifdef PGXC - /* A PGXC Datanode does not need to read the header data received from Coordinator */ - if (IS_PGXC_DATANODE && - cstate->binary && - cstate->fe_msgbuf->data[cstate->fe_msgbuf->len-1] == '\n') - cstate->fe_msgbuf->len--; -#endif avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; if (avail > maxread) avail = maxread; @@ -2595,7 +2588,8 @@ CopyFrom(CopyState cstate) if (DataNodeCopyIn(cstate->line_buf.data, cstate->line_buf.len, GET_NODES(rcstate->locator, value, isnull, NULL), - (PGXCNodeHandle**) getLocatorResults(rcstate->locator))) + (PGXCNodeHandle**) getLocatorResults(rcstate->locator), + cstate->binary)) ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("Copy failed on a data node"))); @@ -2717,7 +2711,8 @@ CopyFrom(CopyState cstate) if (DataNodeCopyIn(cstate->line_buf.data, cstate->line_buf.len, getLocatorNodeCount(rcstate->locator), - (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator))) + (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator), + cstate->binary)) ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("Copy failed on a data node"))); diff --git a/src/backend/pgxc/locator/redistrib.c b/src/backend/pgxc/locator/redistrib.c index eecca4781e..6999dc7e2a 100644 --- a/src/backend/pgxc/locator/redistrib.c +++ b/src/backend/pgxc/locator/redistrib.c @@ -573,7 +573,9 @@ distrib_copy_from(RedistribState *distribState, ExecNodes *exec_nodes) if (DataNodeCopyIn(data, len, GET_NODES(copyState->locator, value, is_null, NULL), - (PGXCNodeHandle**) getLocatorResults(copyState->locator))) + (PGXCNodeHandle**) + getLocatorResults(copyState->locator), + false)) ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("Copy failed on a data node"))); diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index d6e842301e..02bfb6c0d3 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -2862,10 +2862,12 @@ DataNodeCopyBegin(RemoteCopyData *rcstate) * Send a data row to the specified nodes */ int -DataNodeCopyIn(char *data_row, int len, int conn_count, PGXCNodeHandle** copy_connections) +DataNodeCopyIn(char *data_row, int len, + int conn_count, PGXCNodeHandle** copy_connections, + bool binary) { - /* size + data row + \n */ - int msgLen = 4 + len + 1; + /* size + data row + \n in CSV mode */ + int msgLen = 4 + len + (binary ? 0 : 1); int nLen = htonl(msgLen); int i; @@ -2928,7 +2930,8 @@ DataNodeCopyIn(char *data_row, int len, int conn_count, PGXCNodeHandle** copy_co handle->outEnd += 4; memcpy(handle->outBuffer + handle->outEnd, data_row, len); handle->outEnd += len; - handle->outBuffer[handle->outEnd++] = '\n'; + if (!binary) + handle->outBuffer[handle->outEnd++] = '\n'; handle->in_extended_query = false; } @@ -3699,7 +3702,7 @@ DataNodeCopyInBinaryForAll(char *msg_buf, int len, int conn_count, PGXCNodeHandle** connections) { int i; - int msgLen = 4 + len + 1; + int msgLen = 4 + len; int nLen = htonl(msgLen); for (i = 0; i < conn_count; i++) @@ -3720,7 +3723,6 @@ DataNodeCopyInBinaryForAll(char *msg_buf, int len, int conn_count, handle->outEnd += 4; memcpy(handle->outBuffer + handle->outEnd, msg_buf, len); handle->outEnd += len; - handle->outBuffer[handle->outEnd++] = '\n'; } else { diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index eda1ac4e99..45ad90738d 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -228,7 +228,8 @@ typedef void (*xact_callback) (bool isCommit, void *args); /* Copy command just involves Datanodes */ extern void DataNodeCopyBegin(RemoteCopyData *rcstate); extern int DataNodeCopyIn(char *data_row, int len, int conn_count, - PGXCNodeHandle** copy_connections); + PGXCNodeHandle** copy_connections, + bool binary); extern uint64 DataNodeCopyOut(PGXCNodeHandle** copy_connections, int conn_count, FILE* copy_file); extern uint64 DataNodeCopyStore(PGXCNodeHandle** copy_connections, |
