<varlistentry id="pgbench-metacommand-pipeline">
<term><literal>\startpipeline</literal></term>
+ <term><literal>\syncpipeline</literal></term>
<term><literal>\endpipeline</literal></term>
<listitem>
<para>
- These commands delimit the start and end of a pipeline of SQL
- statements. In pipeline mode, statements are sent to the server
- without waiting for the results of previous statements. See
+ This group of commands implements pipelining of SQL statements.
+ A pipeline must begin with a <command>\startpipeline</command>
+ and end with an <command>\endpipeline</command>. In between there
+ may be any number of <command>\syncpipeline</command> commands,
+ which sends a <link linkend="protocol-flow-ext-query">sync message</link>
+ without ending the ongoing pipeline and flushing the send buffer.
+ In pipeline mode, statements are sent to the server without waiting
+ for the results of previous statements. See
<xref linkend="libpq-pipeline-mode"/> for more details.
Pipeline mode requires the use of extended query protocol.
</para>
int use_file; /* index in sql_script for this client */
int command; /* command number in script */
+ int num_syncs; /* number of ongoing sync commands */
/* client variables */
Variables variables;
META_ELSE, /* \else */
META_ENDIF, /* \endif */
META_STARTPIPELINE, /* \startpipeline */
+ META_SYNCPIPELINE, /* \syncpipeline */
META_ENDPIPELINE, /* \endpipeline */
} MetaCommand;
mc = META_ASET;
else if (pg_strcasecmp(cmd, "startpipeline") == 0)
mc = META_STARTPIPELINE;
+ else if (pg_strcasecmp(cmd, "syncpipeline") == 0)
+ mc = META_SYNCPIPELINE;
else if (pg_strcasecmp(cmd, "endpipeline") == 0)
mc = META_ENDPIPELINE;
else
break;
case PGRES_PIPELINE_SYNC:
- pg_log_debug("client %d pipeline ending", st->id);
- if (PQexitPipelineMode(st->con) != 1)
+ pg_log_debug("client %d pipeline ending, ongoing syncs: %d",
+ st->id, st->num_syncs);
+ st->num_syncs--;
+ if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1)
pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
PQerrorMessage(st->con));
break;
return CSTATE_ABORTED;
}
}
+ else if (command->meta == META_SYNCPIPELINE)
+ {
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+ {
+ commandFailed(st, "syncpipeline", "not in pipeline mode");
+ return CSTATE_ABORTED;
+ }
+ if (PQsendPipelineSync(st->con) == 0)
+ {
+ commandFailed(st, "syncpipeline", "failed to send a pipeline sync");
+ return CSTATE_ABORTED;
+ }
+ st->num_syncs++;
+ }
else if (command->meta == META_ENDPIPELINE)
{
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
commandFailed(st, "endpipeline", "failed to send a pipeline sync");
return CSTATE_ABORTED;
}
+ st->num_syncs++;
/* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
/* collect pending results before getting out of pipeline mode */
return CSTATE_WAIT_RESULT;
}
else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
my_command->meta == META_STARTPIPELINE ||
- my_command->meta == META_ENDPIPELINE)
+ my_command->meta == META_ENDPIPELINE ||
+ my_command->meta == META_SYNCPIPELINE)
{
if (my_command->argc != 1)
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
}
});
+# Working \startpipeline with \syncpipeline
+$node->pgbench(
+ '-t 1 -n -M extended',
+ 0,
+ [ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ],
+ [],
+ 'working \startpipeline with \syncpipeline',
+ {
+ '001_pgbench_pipeline_sync' => q{
+-- test startpipeline
+\startpipeline
+select 1;
+\syncpipeline
+\syncpipeline
+select 2;
+\syncpipeline
+select 3;
+\endpipeline
+}
+ });
+
# Working \startpipeline in prepared query mode
$node->pgbench(
'-t 1 -n -M prepared',
}
});
+# Try \startpipeline with \syncpipeline without \endpipeline
+$node->pgbench(
+ '-t 2 -n -M extended',
+ 2,
+ [],
+ [qr{end of script reached with pipeline open}],
+ 'error: call \startpipeline and \syncpipeline without \endpipeline',
+ {
+ '001_pgbench_pipeline_7' => q{
+-- startpipeline with \syncpipeline only
+\startpipeline
+\syncpipeline
+}
+ });
+
# Working \startpipeline in prepared query mode with serializable
$node->pgbench(
'-c4 -t 10 -n -M prepared',