pgbench: Add \syncpipeline
authorMichael Paquier <michael@paquier.xyz>
Wed, 24 Jan 2024 07:55:19 +0000 (16:55 +0900)
committerMichael Paquier <michael@paquier.xyz>
Wed, 24 Jan 2024 07:55:19 +0000 (16:55 +0900)
This change adds a new meta-command called \syncpipeline to pgbench,
able to send a sync message without flushing using the new libpq
function PQsendPipelineSync().

This meta-command is available within a block made of \startpipeline and
\endpipeline.

Author: Anthonin Bonnefoy
Discussion: https://postgr.es/m/CAO6_XqpcNhW6LZHLF-2NpPzdTbyMm4-RVkr3+AP5cOKSm9hrWA@mail.gmail.com

doc/src/sgml/ref/pgbench.sgml
src/bin/pgbench/pgbench.c
src/bin/pgbench/t/001_pgbench_with_server.pl

index 05d3f81619f16ba1a3de64124fcc723267a3b51a..279bb0ad7df4113f4e81ed315b5f67d79984c1c6 100644 (file)
@@ -1386,13 +1386,19 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
 
    <varlistentry id="pgbench-metacommand-pipeline">
     <term><literal>\startpipeline</literal></term>
+    <term><literal>\syncpipeline</literal></term>
     <term><literal>\endpipeline</literal></term>
 
     <listitem>
       <para>
-        These commands delimit the start and end of a pipeline of SQL
-        statements.  In pipeline mode, statements are sent to the server
-        without waiting for the results of previous statements.  See
+        This group of commands implements pipelining of SQL statements.
+        A pipeline must begin with a <command>\startpipeline</command>
+        and end with an <command>\endpipeline</command>. In between there
+        may be any number of <command>\syncpipeline</command> commands,
+        which sends a <link linkend="protocol-flow-ext-query">sync message</link>
+        without ending the ongoing pipeline and flushing the send buffer.
+        In pipeline mode, statements are sent to the server without waiting
+        for the results of previous statements.  See
         <xref linkend="libpq-pipeline-mode"/> for more details.
         Pipeline mode requires the use of extended query protocol.
      </para>
index 7b53f9c24da30ba99657cb6eb3fb2796f74fbaa7..af1f75257ff0a72f56cdef4bb2b798188c6ed058 100644 (file)
@@ -608,6 +608,7 @@ typedef struct
 
    int         use_file;       /* index in sql_script for this client */
    int         command;        /* command number in script */
+   int         num_syncs;      /* number of ongoing sync commands */
 
    /* client variables */
    Variables   variables;
@@ -697,6 +698,7 @@ typedef enum MetaCommand
    META_ELSE,                  /* \else */
    META_ENDIF,                 /* \endif */
    META_STARTPIPELINE,         /* \startpipeline */
+   META_SYNCPIPELINE,          /* \syncpipeline */
    META_ENDPIPELINE,           /* \endpipeline */
 } MetaCommand;
 
@@ -2902,6 +2904,8 @@ getMetaCommand(const char *cmd)
        mc = META_ASET;
    else if (pg_strcasecmp(cmd, "startpipeline") == 0)
        mc = META_STARTPIPELINE;
+   else if (pg_strcasecmp(cmd, "syncpipeline") == 0)
+       mc = META_SYNCPIPELINE;
    else if (pg_strcasecmp(cmd, "endpipeline") == 0)
        mc = META_ENDPIPELINE;
    else
@@ -3317,8 +3321,10 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
                break;
 
            case PGRES_PIPELINE_SYNC:
-               pg_log_debug("client %d pipeline ending", st->id);
-               if (PQexitPipelineMode(st->con) != 1)
+               pg_log_debug("client %d pipeline ending, ongoing syncs: %d",
+                            st->id, st->num_syncs);
+               st->num_syncs--;
+               if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1)
                    pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
                                 PQerrorMessage(st->con));
                break;
@@ -4449,6 +4455,20 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
            return CSTATE_ABORTED;
        }
    }
+   else if (command->meta == META_SYNCPIPELINE)
+   {
+       if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+       {
+           commandFailed(st, "syncpipeline", "not in pipeline mode");
+           return CSTATE_ABORTED;
+       }
+       if (PQsendPipelineSync(st->con) == 0)
+       {
+           commandFailed(st, "syncpipeline", "failed to send a pipeline sync");
+           return CSTATE_ABORTED;
+       }
+       st->num_syncs++;
+   }
    else if (command->meta == META_ENDPIPELINE)
    {
        if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
@@ -4461,6 +4481,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
            commandFailed(st, "endpipeline", "failed to send a pipeline sync");
            return CSTATE_ABORTED;
        }
+       st->num_syncs++;
        /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
        /* collect pending results before getting out of pipeline mode */
        return CSTATE_WAIT_RESULT;
@@ -5794,7 +5815,8 @@ process_backslash_command(PsqlScanState sstate, const char *source)
    }
    else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
             my_command->meta == META_STARTPIPELINE ||
-            my_command->meta == META_ENDPIPELINE)
+            my_command->meta == META_ENDPIPELINE ||
+            my_command->meta == META_SYNCPIPELINE)
    {
        if (my_command->argc != 1)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
index f60f7e89c1fbae38101c40410334c82e3cf3a201..5d2341a203531a08fbe3e65cb94273adf09e7599 100644 (file)
@@ -814,6 +814,27 @@ $node->pgbench(
 }
    });
 
+# Working \startpipeline with \syncpipeline
+$node->pgbench(
+   '-t 1 -n -M extended',
+   0,
+   [ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ],
+   [],
+   'working \startpipeline with \syncpipeline',
+   {
+       '001_pgbench_pipeline_sync' => q{
+-- test startpipeline
+\startpipeline
+select 1;
+\syncpipeline
+\syncpipeline
+select 2;
+\syncpipeline
+select 3;
+\endpipeline
+}
+   });
+
 # Working \startpipeline in prepared query mode
 $node->pgbench(
    '-t 1 -n -M prepared',
@@ -904,6 +925,21 @@ $node->pgbench(
 }
    });
 
+# Try \startpipeline with \syncpipeline without \endpipeline
+$node->pgbench(
+   '-t 2 -n -M extended',
+   2,
+   [],
+   [qr{end of script reached with pipeline open}],
+   'error: call \startpipeline and \syncpipeline without \endpipeline',
+   {
+       '001_pgbench_pipeline_7' => q{
+-- startpipeline with \syncpipeline only
+\startpipeline
+\syncpipeline
+}
+   });
+
 # Working \startpipeline in prepared query mode with serializable
 $node->pgbench(
    '-c4 -t 10 -n -M prepared',