pgbench: Add \syncpipeline
authorMichael Paquier <michael@paquier.xyz>
Wed, 24 Jan 2024 07:55:19 +0000 (16:55 +0900)
committerMichael Paquier <michael@paquier.xyz>
Wed, 24 Jan 2024 07:55:19 +0000 (16:55 +0900)
This change adds a new meta-command called \syncpipeline to pgbench,
able to send a sync message without flushing using the new libpq
function PQsendPipelineSync().

This meta-command is available within a block made of \startpipeline and
\endpipeline.

Author: Anthonin Bonnefoy
Discussion: https://postgr.es/m/CAO6_XqpcNhW6LZHLF-2NpPzdTbyMm4-RVkr3+AP5cOKSm9hrWA@mail.gmail.com

doc/src/sgml/ref/pgbench.sgml
src/bin/pgbench/pgbench.c
src/bin/pgbench/t/001_pgbench_with_server.pl

index 05d3f81619f16ba1a3de64124fcc723267a3b51a..279bb0ad7df4113f4e81ed315b5f67d79984c1c6 100644 (file)
@@ -1386,13 +1386,19 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
 
    <varlistentry id="pgbench-metacommand-pipeline">
     <term><literal>\startpipeline</literal></term>
+    <term><literal>\syncpipeline</literal></term>
     <term><literal>\endpipeline</literal></term>
 
     <listitem>
       <para>
-        These commands delimit the start and end of a pipeline of SQL
-        statements.  In pipeline mode, statements are sent to the server
-        without waiting for the results of previous statements.  See
+        This group of commands implements pipelining of SQL statements.
+        A pipeline must begin with a <command>\startpipeline</command>
+        and end with an <command>\endpipeline</command>. In between there
+        may be any number of <command>\syncpipeline</command> commands,
+        which sends a <link linkend="protocol-flow-ext-query">sync message</link>
+        without ending the ongoing pipeline and flushing the send buffer.
+        In pipeline mode, statements are sent to the server without waiting
+        for the results of previous statements.  See
         <xref linkend="libpq-pipeline-mode"/> for more details.
         Pipeline mode requires the use of extended query protocol.
      </para>
index 7b53f9c24da30ba99657cb6eb3fb2796f74fbaa7..af1f75257ff0a72f56cdef4bb2b798188c6ed058 100644 (file)
@@ -608,6 +608,7 @@ typedef struct
 
        int                     use_file;               /* index in sql_script for this client */
        int                     command;                /* command number in script */
+       int                     num_syncs;              /* number of ongoing sync commands */
 
        /* client variables */
        Variables       variables;
@@ -697,6 +698,7 @@ typedef enum MetaCommand
        META_ELSE,                                      /* \else */
        META_ENDIF,                                     /* \endif */
        META_STARTPIPELINE,                     /* \startpipeline */
+       META_SYNCPIPELINE,                      /* \syncpipeline */
        META_ENDPIPELINE,                       /* \endpipeline */
 } MetaCommand;
 
@@ -2902,6 +2904,8 @@ getMetaCommand(const char *cmd)
                mc = META_ASET;
        else if (pg_strcasecmp(cmd, "startpipeline") == 0)
                mc = META_STARTPIPELINE;
+       else if (pg_strcasecmp(cmd, "syncpipeline") == 0)
+               mc = META_SYNCPIPELINE;
        else if (pg_strcasecmp(cmd, "endpipeline") == 0)
                mc = META_ENDPIPELINE;
        else
@@ -3317,8 +3321,10 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
                                break;
 
                        case PGRES_PIPELINE_SYNC:
-                               pg_log_debug("client %d pipeline ending", st->id);
-                               if (PQexitPipelineMode(st->con) != 1)
+                               pg_log_debug("client %d pipeline ending, ongoing syncs: %d",
+                                                        st->id, st->num_syncs);
+                               st->num_syncs--;
+                               if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1)
                                        pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
                                                                 PQerrorMessage(st->con));
                                break;
@@ -4449,6 +4455,20 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
                        return CSTATE_ABORTED;
                }
        }
+       else if (command->meta == META_SYNCPIPELINE)
+       {
+               if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+               {
+                       commandFailed(st, "syncpipeline", "not in pipeline mode");
+                       return CSTATE_ABORTED;
+               }
+               if (PQsendPipelineSync(st->con) == 0)
+               {
+                       commandFailed(st, "syncpipeline", "failed to send a pipeline sync");
+                       return CSTATE_ABORTED;
+               }
+               st->num_syncs++;
+       }
        else if (command->meta == META_ENDPIPELINE)
        {
                if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
@@ -4461,6 +4481,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
                        commandFailed(st, "endpipeline", "failed to send a pipeline sync");
                        return CSTATE_ABORTED;
                }
+               st->num_syncs++;
                /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
                /* collect pending results before getting out of pipeline mode */
                return CSTATE_WAIT_RESULT;
@@ -5794,7 +5815,8 @@ process_backslash_command(PsqlScanState sstate, const char *source)
        }
        else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
                         my_command->meta == META_STARTPIPELINE ||
-                        my_command->meta == META_ENDPIPELINE)
+                        my_command->meta == META_ENDPIPELINE ||
+                        my_command->meta == META_SYNCPIPELINE)
        {
                if (my_command->argc != 1)
                        syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
index f60f7e89c1fbae38101c40410334c82e3cf3a201..5d2341a203531a08fbe3e65cb94273adf09e7599 100644 (file)
@@ -814,6 +814,27 @@ $node->pgbench(
 }
        });
 
+# Working \startpipeline with \syncpipeline
+$node->pgbench(
+       '-t 1 -n -M extended',
+       0,
+       [ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ],
+       [],
+       'working \startpipeline with \syncpipeline',
+       {
+               '001_pgbench_pipeline_sync' => q{
+-- test startpipeline
+\startpipeline
+select 1;
+\syncpipeline
+\syncpipeline
+select 2;
+\syncpipeline
+select 3;
+\endpipeline
+}
+       });
+
 # Working \startpipeline in prepared query mode
 $node->pgbench(
        '-t 1 -n -M prepared',
@@ -904,6 +925,21 @@ $node->pgbench(
 }
        });
 
+# Try \startpipeline with \syncpipeline without \endpipeline
+$node->pgbench(
+       '-t 2 -n -M extended',
+       2,
+       [],
+       [qr{end of script reached with pipeline open}],
+       'error: call \startpipeline and \syncpipeline without \endpipeline',
+       {
+               '001_pgbench_pipeline_7' => q{
+-- startpipeline with \syncpipeline only
+\startpipeline
+\syncpipeline
+}
+       });
+
 # Working \startpipeline in prepared query mode with serializable
 $node->pgbench(
        '-c4 -t 10 -n -M prepared',