Skip to content

Commit 4c24136

Browse files
bonnefoaCommitfest Bot
authored andcommitted
Add pipelining support in psql
With \bind, \parse, \bind_named and \close, it is possible to issue queries from psql using the extended protocol. However, it wasn't possible to send those queries using pipelining and the only way to test pipelined queries was through pgbench's tap tests. This patch adds additional psql meta-commands to support pipelining: \startpipeline, \endpipeline and \syncpipeline, mirroring the existing meta-commands in pgbench. Additional meta-commands allow to flush and read results of an ongoing pipeline: \flushrequest, \flush and \getresults \startpipeline starts a new pipeline. All extended queries will be queued until the end of the pipeline is reached. \endpipeline ends an ongoing pipeline. All queued commands will be sent to the server and all responses will be processed by the psql. \syncpipeline queues a synchronisation point without flushing the commands to the server \flush Call PQflush on psql's connection \flushrequest queues a flushrequest \getresults reads server's results. Unsent data are automatically pushed when \getresults is called Those meta-commands will allow to test pipeline behaviour using psql regression tests.
1 parent 1fd1bd8 commit 4c24136

File tree

11 files changed

+1640
-11
lines changed

11 files changed

+1640
-11
lines changed

doc/src/sgml/ref/psql-ref.sgml

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3674,6 +3674,76 @@ testdb=&gt; <userinput>\setenv LESS -imx4F</userinput>
36743674
</listitem>
36753675
</varlistentry>
36763676

3677+
<varlistentry id="app-psql-meta-command-pipeline">
3678+
<term><literal>\startpipeline</literal></term>
3679+
<term><literal>\syncpipeline</literal></term>
3680+
<term><literal>\endpipeline</literal></term>
3681+
<term><literal>\flushrequest</literal></term>
3682+
<term><literal>\flush</literal></term>
3683+
<term><literal>\getresults [ <replaceable class="parameter">number_results</replaceable> ]</literal></term>
3684+
3685+
<listitem>
3686+
<para>
3687+
This group of commands implements pipelining of SQL statements.
3688+
A pipeline must begin with a <command>\startpipeline</command>
3689+
and end with an <command>\endpipeline</command>. In between there
3690+
may be any number of <command>\syncpipeline</command> commands,
3691+
which sends a <link linkend="protocol-flow-ext-query">sync message</link>
3692+
without ending the ongoing pipeline and flushing the send buffer.
3693+
In pipeline mode, statements are sent to the server without waiting
3694+
for the results of previous statements. See
3695+
<xref linkend="libpq-pipeline-mode"/> for more details.
3696+
</para>
3697+
3698+
<para>
3699+
Pipeline mode requires the use of extended query protocol. All queries need
3700+
to be sent using the meta-commands <literal>\bind</literal>,
3701+
<literal>\bind_named</literal>, <literal>\close</literal> or
3702+
<literal>\parse</literal>. While a pipeline is ongoing,
3703+
<literal>\g</literal> will append the current query buffer to the pipeline and
3704+
other meta-commands like <literal>\gx</literal> or <literal>\gdesc</literal>
3705+
are not allowed in pipeline mode.
3706+
</para>
3707+
3708+
<para>
3709+
<command>\flushrequest</command> appends a flush command to the pipeline,
3710+
allowing to read results with <command>\getresults</command> without issuing
3711+
a sync or ending the pipeline. <command>\getresults</command> will automatically
3712+
push unsent data to the server. <command>\flush</command> can be used to manually
3713+
push unsent data.
3714+
</para>
3715+
3716+
<para>
3717+
<command>\getresults</command> accepts an optional
3718+
<replaceable class="parameter">number_results</replaceable> parameter. If provided,
3719+
only the first <replaceable class="parameter">number_results</replaceable> pending
3720+
results will be read. If not provided or 0, all pending results are read. The
3721+
commands <literal>\bind</literal>, <literal>\bind_named</literal>,
3722+
<literal>\close</literal>, <literal>\parse</literal> and <literal>\syncpipeline</literal>
3723+
generate one result to get.
3724+
</para>
3725+
3726+
<para>
3727+
Example:
3728+
<programlisting>
3729+
\startpipeline
3730+
SELECT 1 \bind \g
3731+
SELECT $1 \parse stmt1
3732+
\bind_named stmt1 1 \g
3733+
SELECT pg_current_xact_id() \bind \g
3734+
\flushrequest
3735+
\getresults
3736+
\syncpipeline
3737+
SELECT pg_current_xact_id() \bind \g
3738+
\flushrequest
3739+
\getresults 2
3740+
\close stmt1
3741+
\endpipeline
3742+
</programlisting></para>
3743+
3744+
</listitem>
3745+
</varlistentry>
3746+
36773747

36783748
<varlistentry id="app-psql-meta-command-t-lc">
36793749
<term><literal>\t</literal></term>

src/bin/psql/command.c

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,12 @@ static backslashResult exec_command_else(PsqlScanState scan_state, ConditionalSt
9090
PQExpBuffer query_buf);
9191
static backslashResult exec_command_endif(PsqlScanState scan_state, ConditionalStack cstack,
9292
PQExpBuffer query_buf);
93+
static backslashResult exec_command_endpipeline(PsqlScanState scan_state, bool active_branch);
9394
static backslashResult exec_command_encoding(PsqlScanState scan_state, bool active_branch);
9495
static backslashResult exec_command_errverbose(PsqlScanState scan_state, bool active_branch);
9596
static backslashResult exec_command_f(PsqlScanState scan_state, bool active_branch);
97+
static backslashResult exec_command_flush(PsqlScanState scan_state, bool active_branch);
98+
static backslashResult exec_command_flushrequest(PsqlScanState scan_state, bool active_branch);
9699
static backslashResult exec_command_g(PsqlScanState scan_state, bool active_branch,
97100
const char *cmd);
98101
static backslashResult process_command_g_options(char *first_option,
@@ -103,6 +106,7 @@ static backslashResult exec_command_gdesc(PsqlScanState scan_state, bool active_
103106
static backslashResult exec_command_getenv(PsqlScanState scan_state, bool active_branch,
104107
const char *cmd);
105108
static backslashResult exec_command_gexec(PsqlScanState scan_state, bool active_branch);
109+
static backslashResult exec_command_getresults(PsqlScanState scan_state, bool active_branch);
106110
static backslashResult exec_command_gset(PsqlScanState scan_state, bool active_branch);
107111
static backslashResult exec_command_help(PsqlScanState scan_state, bool active_branch);
108112
static backslashResult exec_command_html(PsqlScanState scan_state, bool active_branch);
@@ -132,6 +136,8 @@ static backslashResult exec_command_setenv(PsqlScanState scan_state, bool active
132136
const char *cmd);
133137
static backslashResult exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
134138
const char *cmd, bool is_func);
139+
static backslashResult exec_command_startpipeline(PsqlScanState scan_state, bool active_branch);
140+
static backslashResult exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch);
135141
static backslashResult exec_command_t(PsqlScanState scan_state, bool active_branch);
136142
static backslashResult exec_command_T(PsqlScanState scan_state, bool active_branch);
137143
static backslashResult exec_command_timing(PsqlScanState scan_state, bool active_branch);
@@ -351,18 +357,26 @@ exec_command(const char *cmd,
351357
status = exec_command_else(scan_state, cstack, query_buf);
352358
else if (strcmp(cmd, "endif") == 0)
353359
status = exec_command_endif(scan_state, cstack, query_buf);
360+
else if (strcmp(cmd, "endpipeline") == 0)
361+
status = exec_command_endpipeline(scan_state, active_branch);
354362
else if (strcmp(cmd, "encoding") == 0)
355363
status = exec_command_encoding(scan_state, active_branch);
356364
else if (strcmp(cmd, "errverbose") == 0)
357365
status = exec_command_errverbose(scan_state, active_branch);
358366
else if (strcmp(cmd, "f") == 0)
359367
status = exec_command_f(scan_state, active_branch);
368+
else if (strcmp(cmd, "flush") == 0)
369+
status = exec_command_flush(scan_state, active_branch);
370+
else if (strcmp(cmd, "flushrequest") == 0)
371+
status = exec_command_flushrequest(scan_state, active_branch);
360372
else if (strcmp(cmd, "g") == 0 || strcmp(cmd, "gx") == 0)
361373
status = exec_command_g(scan_state, active_branch, cmd);
362374
else if (strcmp(cmd, "gdesc") == 0)
363375
status = exec_command_gdesc(scan_state, active_branch);
364376
else if (strcmp(cmd, "getenv") == 0)
365377
status = exec_command_getenv(scan_state, active_branch, cmd);
378+
else if (strcmp(cmd, "getresults") == 0)
379+
status = exec_command_getresults(scan_state, active_branch);
366380
else if (strcmp(cmd, "gexec") == 0)
367381
status = exec_command_gexec(scan_state, active_branch);
368382
else if (strcmp(cmd, "gset") == 0)
@@ -411,6 +425,10 @@ exec_command(const char *cmd,
411425
status = exec_command_sf_sv(scan_state, active_branch, cmd, true);
412426
else if (strcmp(cmd, "sv") == 0 || strcmp(cmd, "sv+") == 0)
413427
status = exec_command_sf_sv(scan_state, active_branch, cmd, false);
428+
else if (strcmp(cmd, "startpipeline") == 0)
429+
status = exec_command_startpipeline(scan_state, active_branch);
430+
else if (strcmp(cmd, "syncpipeline") == 0)
431+
status = exec_command_syncpipeline(scan_state, active_branch);
414432
else if (strcmp(cmd, "t") == 0)
415433
status = exec_command_t(scan_state, active_branch);
416434
else if (strcmp(cmd, "T") == 0)
@@ -1515,6 +1533,44 @@ exec_command_f(PsqlScanState scan_state, bool active_branch)
15151533
return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
15161534
}
15171535

1536+
/*
1537+
* \flush -- call PQflush on the connection
1538+
*/
1539+
static backslashResult
1540+
exec_command_flush(PsqlScanState scan_state, bool active_branch)
1541+
{
1542+
backslashResult status = PSQL_CMD_SKIP_LINE;
1543+
1544+
if (active_branch)
1545+
{
1546+
pset.send_mode = PSQL_SEND_FLUSH;
1547+
status = PSQL_CMD_SEND;
1548+
}
1549+
else
1550+
ignore_slash_options(scan_state);
1551+
1552+
return status;
1553+
}
1554+
1555+
/*
1556+
* \flushrequest -- send a flush request to the server
1557+
*/
1558+
static backslashResult
1559+
exec_command_flushrequest(PsqlScanState scan_state, bool active_branch)
1560+
{
1561+
backslashResult status = PSQL_CMD_SKIP_LINE;
1562+
1563+
if (active_branch)
1564+
{
1565+
pset.send_mode = PSQL_SEND_FLUSH_REQUEST;
1566+
status = PSQL_CMD_SEND;
1567+
}
1568+
else
1569+
ignore_slash_options(scan_state);
1570+
1571+
return status;
1572+
}
1573+
15181574
/*
15191575
* \g [(pset-option[=pset-value] ...)] [filename/shell-command]
15201576
* \gx [(pset-option[=pset-value] ...)] [filename/shell-command]
@@ -1550,6 +1606,13 @@ exec_command_g(PsqlScanState scan_state, bool active_branch, const char *cmd)
15501606

15511607
if (status == PSQL_CMD_SKIP_LINE && active_branch)
15521608
{
1609+
if (strcmp(cmd, "gx") == 0 && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
1610+
{
1611+
pg_log_error("\\gx not allowed in pipeline mode");
1612+
clean_extended_state();
1613+
return PSQL_CMD_ERROR;
1614+
}
1615+
15531616
if (!fname)
15541617
pset.gfname = NULL;
15551618
else
@@ -1703,6 +1766,42 @@ exec_command_getenv(PsqlScanState scan_state, bool active_branch,
17031766
return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
17041767
}
17051768

1769+
/*
1770+
* \getresults -- read results
1771+
*/
1772+
static backslashResult
1773+
exec_command_getresults(PsqlScanState scan_state, bool active_branch)
1774+
{
1775+
backslashResult status = PSQL_CMD_SKIP_LINE;
1776+
1777+
if (active_branch)
1778+
{
1779+
char *opt;
1780+
int num_results;
1781+
1782+
pset.send_mode = PSQL_SEND_GET_RESULTS;
1783+
status = PSQL_CMD_SEND;
1784+
opt = psql_scan_slash_option(scan_state, OT_NORMAL, NULL, false);
1785+
1786+
pset.requested_results = 0;
1787+
if (opt != NULL)
1788+
{
1789+
num_results = atoi(opt);
1790+
if (num_results < 0)
1791+
{
1792+
pg_log_error("\\getresults: invalid number of requested results");
1793+
return PSQL_CMD_SKIP_LINE;
1794+
}
1795+
pset.requested_results = num_results;
1796+
}
1797+
}
1798+
else
1799+
ignore_slash_options(scan_state);
1800+
1801+
return status;
1802+
}
1803+
1804+
17061805
/*
17071806
* \gexec -- send query and execute each field of result
17081807
*/
@@ -1713,6 +1812,12 @@ exec_command_gexec(PsqlScanState scan_state, bool active_branch)
17131812

17141813
if (active_branch)
17151814
{
1815+
if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
1816+
{
1817+
pg_log_error("\\gexec not allowed in pipeline mode");
1818+
clean_extended_state();
1819+
return PSQL_CMD_ERROR;
1820+
}
17161821
pset.gexec_flag = true;
17171822
status = PSQL_CMD_SEND;
17181823
}
@@ -1733,6 +1838,13 @@ exec_command_gset(PsqlScanState scan_state, bool active_branch)
17331838
char *prefix = psql_scan_slash_option(scan_state,
17341839
OT_NORMAL, NULL, false);
17351840

1841+
if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
1842+
{
1843+
pg_log_error("\\gset not allowed in pipeline mode");
1844+
clean_extended_state();
1845+
return PSQL_CMD_ERROR;
1846+
}
1847+
17361848
if (prefix)
17371849
pset.gset_prefix = prefix;
17381850
else
@@ -2718,6 +2830,63 @@ exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
27182830
return status;
27192831
}
27202832

2833+
/*
2834+
* \startpipeline -- enter pipeline mode
2835+
*/
2836+
static backslashResult
2837+
exec_command_startpipeline(PsqlScanState scan_state, bool active_branch)
2838+
{
2839+
backslashResult status = PSQL_CMD_SKIP_LINE;
2840+
2841+
if (active_branch)
2842+
{
2843+
pset.send_mode = PSQL_SEND_START_PIPELINE_MODE;
2844+
status = PSQL_CMD_SEND;
2845+
}
2846+
else
2847+
ignore_slash_options(scan_state);
2848+
2849+
return status;
2850+
}
2851+
2852+
/*
2853+
* \syncpipeline -- send a sync message to an active pipeline
2854+
*/
2855+
static backslashResult
2856+
exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch)
2857+
{
2858+
backslashResult status = PSQL_CMD_SKIP_LINE;
2859+
2860+
if (active_branch)
2861+
{
2862+
pset.send_mode = PSQL_SEND_PIPELINE_SYNC;
2863+
status = PSQL_CMD_SEND;
2864+
}
2865+
else
2866+
ignore_slash_options(scan_state);
2867+
2868+
return status;
2869+
}
2870+
2871+
/*
2872+
* \endpipeline -- end pipeline mode
2873+
*/
2874+
static backslashResult
2875+
exec_command_endpipeline(PsqlScanState scan_state, bool active_branch)
2876+
{
2877+
backslashResult status = PSQL_CMD_SKIP_LINE;
2878+
2879+
if (active_branch)
2880+
{
2881+
pset.send_mode = PSQL_SEND_END_PIPELINE_MODE;
2882+
status = PSQL_CMD_SEND;
2883+
}
2884+
else
2885+
ignore_slash_options(scan_state);
2886+
2887+
return status;
2888+
}
2889+
27212890
/*
27222891
* \t -- turn off table headers and row count
27232892
*/

0 commit comments

Comments
 (0)