summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/copy.c13
-rw-r--r--src/backend/pgxc/locator/redistrib.c4
-rw-r--r--src/backend/pgxc/pool/execRemote.c14
-rw-r--r--src/include/pgxc/execRemote.h3
4 files changed, 17 insertions, 17 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 8ac52da4bb..bf431d77e0 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -687,13 +687,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
break;
}
}
-#ifdef PGXC
- /* A PGXC Datanode does not need to read the header data received from Coordinator */
- if (IS_PGXC_DATANODE &&
- cstate->binary &&
- cstate->fe_msgbuf->data[cstate->fe_msgbuf->len-1] == '\n')
- cstate->fe_msgbuf->len--;
-#endif
avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
if (avail > maxread)
avail = maxread;
@@ -2595,7 +2588,8 @@ CopyFrom(CopyState cstate)
if (DataNodeCopyIn(cstate->line_buf.data,
cstate->line_buf.len,
GET_NODES(rcstate->locator, value, isnull, NULL),
- (PGXCNodeHandle**) getLocatorResults(rcstate->locator)))
+ (PGXCNodeHandle**) getLocatorResults(rcstate->locator),
+ cstate->binary))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a data node")));
@@ -2717,7 +2711,8 @@ CopyFrom(CopyState cstate)
if (DataNodeCopyIn(cstate->line_buf.data,
cstate->line_buf.len,
getLocatorNodeCount(rcstate->locator),
- (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator)))
+ (PGXCNodeHandle **) getLocatorNodeMap(rcstate->locator),
+ cstate->binary))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a data node")));
diff --git a/src/backend/pgxc/locator/redistrib.c b/src/backend/pgxc/locator/redistrib.c
index eecca4781e..6999dc7e2a 100644
--- a/src/backend/pgxc/locator/redistrib.c
+++ b/src/backend/pgxc/locator/redistrib.c
@@ -573,7 +573,9 @@ distrib_copy_from(RedistribState *distribState, ExecNodes *exec_nodes)
if (DataNodeCopyIn(data, len,
GET_NODES(copyState->locator, value, is_null, NULL),
- (PGXCNodeHandle**) getLocatorResults(copyState->locator)))
+ (PGXCNodeHandle**)
+ getLocatorResults(copyState->locator),
+ false))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("Copy failed on a data node")));
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index d6e842301e..02bfb6c0d3 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -2862,10 +2862,12 @@ DataNodeCopyBegin(RemoteCopyData *rcstate)
* Send a data row to the specified nodes
*/
int
-DataNodeCopyIn(char *data_row, int len, int conn_count, PGXCNodeHandle** copy_connections)
+DataNodeCopyIn(char *data_row, int len,
+ int conn_count, PGXCNodeHandle** copy_connections,
+ bool binary)
{
- /* size + data row + \n */
- int msgLen = 4 + len + 1;
+ /* size + data row + \n in CSV mode */
+ int msgLen = 4 + len + (binary ? 0 : 1);
int nLen = htonl(msgLen);
int i;
@@ -2928,7 +2930,8 @@ DataNodeCopyIn(char *data_row, int len, int conn_count, PGXCNodeHandle** copy_co
handle->outEnd += 4;
memcpy(handle->outBuffer + handle->outEnd, data_row, len);
handle->outEnd += len;
- handle->outBuffer[handle->outEnd++] = '\n';
+ if (!binary)
+ handle->outBuffer[handle->outEnd++] = '\n';
handle->in_extended_query = false;
}
@@ -3699,7 +3702,7 @@ DataNodeCopyInBinaryForAll(char *msg_buf, int len, int conn_count,
PGXCNodeHandle** connections)
{
int i;
- int msgLen = 4 + len + 1;
+ int msgLen = 4 + len;
int nLen = htonl(msgLen);
for (i = 0; i < conn_count; i++)
@@ -3720,7 +3723,6 @@ DataNodeCopyInBinaryForAll(char *msg_buf, int len, int conn_count,
handle->outEnd += 4;
memcpy(handle->outBuffer + handle->outEnd, msg_buf, len);
handle->outEnd += len;
- handle->outBuffer[handle->outEnd++] = '\n';
}
else
{
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index eda1ac4e99..45ad90738d 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -228,7 +228,8 @@ typedef void (*xact_callback) (bool isCommit, void *args);
/* Copy command just involves Datanodes */
extern void DataNodeCopyBegin(RemoteCopyData *rcstate);
extern int DataNodeCopyIn(char *data_row, int len, int conn_count,
- PGXCNodeHandle** copy_connections);
+ PGXCNodeHandle** copy_connections,
+ bool binary);
extern uint64 DataNodeCopyOut(PGXCNodeHandle** copy_connections,
int conn_count, FILE* copy_file);
extern uint64 DataNodeCopyStore(PGXCNodeHandle** copy_connections,