* itself consume commands from the queue; if we're in any other
* state, we don't have to do anything.
*/
- if (conn->asyncStatus == PGASYNC_IDLE)
+ if (conn->asyncStatus == PGASYNC_IDLE ||
+ conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
pqPipelineProcessQueue(conn);
break;
}
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
PGcmdQueueEntry *entry = NULL;
+ PGcmdQueueEntry *entry2 = NULL;
if (!PQsendQueryStart(conn, newQuery))
return 0;
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ entry2 = pqAllocCmdQueueEntry(conn);
+ if (entry2 == NULL)
+ goto sendFailed;
+ }
/* Send the query message(s) */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
+
+ /*
+ * When pipeline mode is in use, we need a second entry in the command
+ * queue to represent Close Portal message. This allows us later to wait
+ * for the CloseComplete message to be received before getting in IDLE
+ * state.
+ */
+ if (conn->pipelineStatus != PQ_PIPELINE_OFF)
+ {
+ entry2->queryclass = PGQUERY_CLOSE;
+ entry2->query = NULL;
+ pqAppendCmdQueueEntry(conn, entry2);
+ }
+
return 1;
sendFailed:
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* ok to queue */
break;
+
case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
- if (conn->pipelineStatus != PQ_PIPELINE_OFF)
- {
- /*
- * We're about to return the NULL that terminates the round of
- * results from the current query; prepare to send the results
- * of the next query when we're called next.
- */
- pqPipelineProcessQueue(conn);
- }
break;
+ case PGASYNC_PIPELINE_IDLE:
+ Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
+
+ /*
+ * We're about to return the NULL that terminates the round of
+ * results from the current query; prepare to send the results
+ * of the next query, if any, when we're called next. If there's
+ * no next element in the command queue, this gets us in IDLE
+ * state.
+ */
+ pqPipelineProcessQueue(conn);
+ res = NULL; /* query is complete */
+ break;
+
case PGASYNC_READY:
/*
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
- conn->asyncStatus = PGASYNC_IDLE;
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
/*
* ... in cases when we're sending a pipeline-sync result,
break;
}
+ /* If the next command we expect is CLOSE, read and consume it */
+ if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
+ conn->cmd_queue_head &&
+ conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
+ {
+ if (res && res->resultStatus != PGRES_FATAL_ERROR)
+ {
+ conn->asyncStatus = PGASYNC_BUSY;
+ parseInput(conn);
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+ }
+ else
+ /* we won't ever see the Close */
+ pqCommandQueueAdvance(conn);
+ }
+
/* Time to fire PGEVT_RESULTCREATE events, if there are any */
if (res && res->nEvents > 0)
(void) PQfireResultCreateEvents(conn, res);
if (!conn)
return 0;
- if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
+ (conn->asyncStatus == PGASYNC_IDLE ||
+ conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
+ conn->cmd_queue_head == NULL)
return 1;
switch (conn->asyncStatus)
libpq_gettext("cannot exit pipeline mode while busy\n"));
return 0;
- default:
+ case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK */
break;
+
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ appendPQExpBufferStr(&conn->errorMessage,
+ libpq_gettext("cannot exit pipeline mode while in COPY\n"));
}
/* still work to process */
prevquery = conn->cmd_queue_head;
conn->cmd_queue_head = conn->cmd_queue_head->next;
+ /* If the queue is now empty, reset the tail too */
+ if (conn->cmd_queue_head == NULL)
+ conn->cmd_queue_tail = NULL;
+
/* and make it recyclable */
prevquery->next = NULL;
pqRecycleCmdQueueEntry(conn, prevquery);
case PGASYNC_BUSY:
/* client still has to process current query or results */
return;
+
case PGASYNC_IDLE:
+ /*
+ * If we're in IDLE mode and there's some command in the queue,
+ * get us into PIPELINE_IDLE mode and process normally. Otherwise
+ * there's nothing for us to do.
+ */
+ if (conn->cmd_queue_head != NULL)
+ {
+ conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
+ break;
+ }
+ return;
+
+ case PGASYNC_PIPELINE_IDLE:
+ Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
/* next query please */
break;
}
- /* Nothing to do if not in pipeline mode, or queue is empty */
- if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
- conn->cmd_queue_head == NULL)
+ /*
+ * If there are no further commands to process in the queue, get us in
+ * "real idle" mode now.
+ */
+ if (conn->cmd_queue_head == NULL)
+ {
+ conn->asyncStatus = PGASYNC_IDLE;
return;
+ }
/*
* Reset the error state. This and the next couple of steps correspond to
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
/* OK to send sync */
break;
}
if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
pg_fatal("exiting pipeline mode didn't seem to work");
- fprintf(stderr, "ok\n");
-
/*-
* Since we fired the pipelines off without a surrounding xact, the results
* should be:
}
PQclear(res);
+
+ fprintf(stderr, "ok\n");
}
/* State machine enum for test_pipelined_insert */
fprintf(stderr, "ok\n");
}
+/* Notice processor: print notices, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+ int *n_notices = (int *) arg;
+
+ (*n_notices)++;
+ fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
+/* Verify behavior in "idle" state */
+static void
+test_pipeline_idle(PGconn *conn)
+{
+ PGresult *res;
+ int n_notices = 0;
+
+ fprintf(stderr, "\npipeline idle...\n");
+
+ PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
+ /*
+ * Cause a Close message to be sent to the server, and watch libpq's
+ * reaction to the resulting CloseComplete. libpq must not get in IDLE
+ * state until that has been received.
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ if (PQsendQuery(conn, "SELECT 1") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ PQclear(res);
+ res = NULL;
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ /*
+ * Must not have got any notices here; note bug as described in
+ * https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
+ */
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ fprintf(stderr, "ok - 1\n");
+
+ /*
+ * Verify that we can send a query using simple query protocol after one
+ * in pipeline mode.
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendQuery(conn, "SELECT 1") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("got unexpected non-null result");
+ /* We can exit pipeline mode now */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+ res = PQexec(conn, "SELECT 2");
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ if (res == NULL)
+ pg_fatal("PQexec returned NULL");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from non-pipeline query",
+ PQresStatus(PQresultStatus(res)));
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ fprintf(stderr, "ok - 2\n");
+
+ /*
+ * Case 2: exiting pipeline mode is not OK if a second command is sent.
+ */
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendQuery(conn, "SELECT 1") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ if (PQsendQuery(conn, "SELECT 2") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ /* read terminating null from first query */
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ /* Try to exit pipeline mode in pipeline-idle state */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendQuery(conn, "SELECT 1") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (PQsendQuery(conn, "SELECT 2") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ if (PQexitPipelineMode(conn) == 1)
+ pg_fatal("exiting pipeline succeeded when it shouldn't");
+ if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
+ strlen("cannot exit pipeline mode")) != 0)
+ pg_fatal("did not get expected error; got: %s",
+ PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from second pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ fprintf(stderr, "ok - 3\n");
+
+ /* Have a WARNING in the middle of a resultset */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
+ if (PQsendQuery(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)") != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("unexpected NULL result received");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
+ fprintf(stderr, "ok - 4\n");
+}
+
static void
test_simple_pipeline(PGconn *conn)
{
if (PQexitPipelineMode(conn) != 1)
pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
}
/*
printf("multi_pipelines\n");
printf("nosync\n");
printf("pipeline_abort\n");
+ printf("pipeline_idle\n");
printf("pipelined_insert\n");
printf("prepared\n");
printf("simple_pipeline\n");
/* Set the trace file, if requested */
if (tracefile != NULL)
{
- trace = fopen(tracefile, "w");
+ if (strcmp(tracefile, "-") == 0)
+ trace = stdout;
+ else
+ trace = fopen(tracefile, "w");
if (trace == NULL)
pg_fatal("could not open file \"%s\": %m", tracefile);
test_nosync(conn);
else if (strcmp(testname, "pipeline_abort") == 0)
test_pipeline_abort(conn);
+ else if (strcmp(testname, "pipeline_idle") == 0)
+ test_pipeline_idle(conn);
else if (strcmp(testname, "pipelined_insert") == 0)
test_pipelined_insert(conn, numrows);
else if (strcmp(testname, "prepared") == 0)