Allow parallel zstd compression when taking a base backup.
authorRobert Haas <rhaas@postgresql.org>
Wed, 30 Mar 2022 13:35:14 +0000 (09:35 -0400)
committerRobert Haas <rhaas@postgresql.org>
Wed, 30 Mar 2022 13:41:26 +0000 (09:41 -0400)
libzstd allows transparent parallel compression just by setting
an option when creating the compression context, so permit that
for both client and server-side backup compression. To use this,
use something like pg_basebackup --compress WHERE-zstd:workers=N
where WHERE is "client" or "server" and N is an integer.

When compression is performed on the server side, this will spawn
threads inside the PostgreSQL backend. While there is almost no
PostgreSQL server code which is thread-safe, the threads here are used
internally by libzstd and touch only data structures controlled by
libzstd.

Patch by me, based in part on earlier work by Dipesh Pandit
and Jeevan Ladhe. Reviewed by Justin Pryzby.

Discussion: http://postgr.es/m/CA+Tgmobj6u-nWF-j=FemygUhobhryLxf9h-wJN7W-2rSsseHNA@mail.gmail.com

doc/src/sgml/protocol.sgml
doc/src/sgml/ref/pg_basebackup.sgml
src/backend/replication/basebackup_zstd.c
src/bin/pg_basebackup/bbstreamer_zstd.c
src/bin/pg_basebackup/t/010_pg_basebackup.pl
src/bin/pg_verifybackup/t/009_extract.pl
src/bin/pg_verifybackup/t/010_client_untar.pl
src/common/backup_compression.c
src/include/common/backup_compression.h

index 2fa3cedfe9e50f4da6862ee4e4aaa070cede9979..98f0bc3cc34ff9e914b37cadf6b77b2f90bd90c9 100644 (file)
@@ -2739,17 +2739,23 @@ The commands accepted in replication mode are:
           option.  If the value is an integer, it specifies the compression
           level.  Otherwise, it should be a comma-separated list of items,
           each of the form <literal>keyword</literal> or
-          <literal>keyword=value</literal>. Currently, the only supported
-          keyword is <literal>level</literal>, which sets the compression
-          level.
+          <literal>keyword=value</literal>. Currently, the supported keywords
+          are <literal>level</literal> and <literal>workers</literal>.
         </para>
 
         <para>
+          The <literal>level</literal> keyword sets the compression level.
           For <literal>gzip</literal> the compression level should be an
           integer between 1 and 9, for <literal>lz4</literal> an integer
           between 1 and 12, and for <literal>zstd</literal> an integer
           between 1 and 22.
          </para>
+
+        <para>
+          The <literal>workers</literal> keyword sets the number of threads
+          that should be used for parallel compression. Parallel compression
+          is supported only for <literal>zstd</literal>.
+         </para>
         </listitem>
        </varlistentry>
 
index d9233beb8e1d3c30aabaa76ffff3725fcc407516..82f5f60625080973f7eaa73ec3f79d19757ffad7 100644 (file)
@@ -424,8 +424,8 @@ PostgreSQL documentation
         integer, it specifies the compression level.  Otherwise, it should be
         a comma-separated list of items, each of the form
         <literal>keyword</literal> or <literal>keyword=value</literal>.
-        Currently, the only supported keyword is <literal>level</literal>,
-        which sets the compression level.
+        Currently, the supported keywords are <literal>level</literal>
+        and <literal>workers</literal>.
        </para>
        <para>
         If no compression level is specified, the default compression level
index 5496eaa72b71a37abb9e996344b2517667c5706c..f6876f48118bd4853d9021da20c6b23dd7ffeda2 100644 (file)
@@ -25,8 +25,8 @@ typedef struct bbsink_zstd
        /* Common information for all types of sink. */
        bbsink          base;
 
-       /* Compression level */
-       int                     compresslevel;
+       /* Compression options */
+       bc_specification *compress;
 
        ZSTD_CCtx  *cctx;
        ZSTD_outBuffer zstd_outBuf;
@@ -67,22 +67,13 @@ bbsink_zstd_new(bbsink *next, bc_specification *compress)
        return NULL;                            /* keep compiler quiet */
 #else
        bbsink_zstd *sink;
-       int             compresslevel;
 
        Assert(next != NULL);
 
-       if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
-               compresslevel = 0;
-       else
-       {
-               compresslevel = compress->level;
-               Assert(compresslevel >= 1 && compresslevel <= 22);
-       }
-
        sink = palloc0(sizeof(bbsink_zstd));
        *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
        sink->base.bbs_next = next;
-       sink->compresslevel = compresslevel;
+       sink->compress = compress;
 
        return &sink->base;
 #endif
@@ -99,16 +90,36 @@ bbsink_zstd_begin_backup(bbsink *sink)
        bbsink_zstd *mysink = (bbsink_zstd *) sink;
        size_t          output_buffer_bound;
        size_t          ret;
+       bc_specification *compress = mysink->compress;
 
        mysink->cctx = ZSTD_createCCtx();
        if (!mysink->cctx)
                elog(ERROR, "could not create zstd compression context");
 
-       ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
-                                                                mysink->compresslevel);
-       if (ZSTD_isError(ret))
-               elog(ERROR, "could not set zstd compression level to %d: %s",
-                        mysink->compresslevel, ZSTD_getErrorName(ret));
+       if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
+       {
+               ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
+                                                                        compress->level);
+               if (ZSTD_isError(ret))
+                       elog(ERROR, "could not set zstd compression level to %d: %s",
+                                compress->level, ZSTD_getErrorName(ret));
+       }
+
+       if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
+       {
+               /*
+                * On older versions of libzstd, this option does not exist, and trying
+                * to set it will fail. Similarly for newer versions if they are
+                * compiled without threading support.
+                */
+               ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+                                                                        compress->workers);
+               if (ZSTD_isError(ret))
+                       ereport(ERROR,
+                                       errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                       errmsg("could not set compression worker count to %d: %s",
+                                                  compress->workers, ZSTD_getErrorName(ret)));
+       }
 
        /*
         * We need our own buffer, because we're going to pass different data to
index 7946b6350b6d7823cbdc93ddbc33feaa82703db4..f94c5c041d3d339d306b869dea9759b2d2794c5b 100644 (file)
@@ -67,7 +67,6 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
 {
 #ifdef USE_ZSTD
        bbstreamer_zstd_frame *streamer;
-       int                     compresslevel;
        size_t          ret;
 
        Assert(next != NULL);
@@ -88,18 +87,35 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, bc_specification *compress)
                exit(1);
        }
 
-       /* Initialize stream compression preferences */
-       if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
-               compresslevel = 0;
-       else
-               compresslevel = compress->level;
-       ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
-                                                                compresslevel);
-       if (ZSTD_isError(ret))
+       /* Set compression level, if specified */
+       if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
        {
-               pg_log_error("could not set zstd compression level to %d: %s",
-                                        compresslevel, ZSTD_getErrorName(ret));
-               exit(1);
+               ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+                                                                        compress->level);
+               if (ZSTD_isError(ret))
+               {
+                       pg_log_error("could not set zstd compression level to %d: %s",
+                                                compress->level, ZSTD_getErrorName(ret));
+                       exit(1);
+               }
+       }
+
+       /* Set # of workers, if specified */
+       if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
+       {
+               /*
+                * On older versions of libzstd, this option does not exist, and
+                * trying to set it will fail. Similarly for newer versions if they
+                * are compiled without threading support.
+                */
+               ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+                                                                        compress->workers);
+               if (ZSTD_isError(ret))
+               {
+                       pg_log_error("could not set compression worker count to %d: %s",
+                                                compress->workers, ZSTD_getErrorName(ret));
+                       exit(1);
+               }
        }
 
        /* Initialize the ZSTD output buffer. */
index 47f3d00ac454e84bbe0a479bd6bfe97af4f6f5e2..5ba84c22509e1ff465d9a046464c579eeda4ad87 100644 (file)
@@ -130,6 +130,11 @@ my @compression_failure_tests = (
                'invalid compression specification: found empty string where a compression option was expected',
                'failure on extra, empty compression option'
        ],
+       [
+               'gzip:workers=3',
+               'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
+               'failure on worker count for gzip'
+       ],
 );
 for my $cft (@compression_failure_tests)
 {
index 41a5b370cc5c0a035f0e942e6a7db344f2cfc0e9..d6f11b95535d88cbca8b8b868b05ddf0191990e5 100644 (file)
@@ -34,6 +34,12 @@ my @test_configuration = (
                'compression_method' => 'zstd',
                'backup_flags' => ['--compress', 'server-zstd:5'],
                'enabled' => check_pg_config("#define USE_ZSTD 1")
+       },
+       {
+               'compression_method' => 'parallel zstd',
+               'backup_flags' => ['--compress', 'server-zstd:workers=3'],
+               'enabled' => check_pg_config("#define USE_ZSTD 1"),
+               'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
        }
 );
 
@@ -55,8 +61,27 @@ for my $tc (@test_configuration)
                my @verify = ('pg_verifybackup', '-e', $backup_path);
 
                # A backup with a valid compression method should work.
-               $primary->command_ok(\@backup,
-                                                        "backup done, compression method \"$method\"");
+               my $backup_stdout = '';
+               my $backup_stderr = '';
+               my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+                                                                                         '2>', \$backup_stderr);
+               if ($backup_stdout ne '')
+               {
+                       print "# standard output was:\n$backup_stdout";
+               }
+               if ($backup_stderr ne '')
+               {
+                       print "# standard error was:\n$backup_stderr";
+               }
+               if (! $backup_result && $tc->{'possibly_unsupported'} &&
+                       $backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+               {
+                       skip "compression with $method not supported by this build", 2;
+               }
+               else
+               {
+                       ok($backup_result, "backup done, compression $method");
+               }
 
                # Make sure that it verifies OK.
                $primary->command_ok(\@verify,
index 488a6d1edeebd2baf5bef53f357172d79be1948f..c1cd12cb065f17facc6415c9af9c570d3e8b8655 100644 (file)
@@ -49,6 +49,15 @@ my @test_configuration = (
                'decompress_program' => $ENV{'ZSTD'},
                'decompress_flags' => [ '-d' ],
                'enabled' => check_pg_config("#define USE_ZSTD 1")
+       },
+       {
+               'compression_method' => 'parallel zstd',
+               'backup_flags' => ['--compress', 'client-zstd:workers=3'],
+               'backup_archive' => 'base.tar.zst',
+               'decompress_program' => $ENV{'ZSTD'},
+               'decompress_flags' => [ '-d' ],
+               'enabled' => check_pg_config("#define USE_ZSTD 1"),
+               'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
        }
 );
 
@@ -69,9 +78,27 @@ for my $tc (@test_configuration)
                        'pg_basebackup', '-D', $backup_path,
                        '-Xfetch', '--no-sync', '-cfast', '-Ft');
                push @backup, @{$tc->{'backup_flags'}};
-               $primary->command_ok(\@backup,
-                                                        "client side backup, compression $method");
-
+               my $backup_stdout = '';
+               my $backup_stderr = '';
+               my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+                                                                                         '2>', \$backup_stderr);
+               if ($backup_stdout ne '')
+               {
+                       print "# standard output was:\n$backup_stdout";
+               }
+               if ($backup_stderr ne '')
+               {
+                       print "# standard error was:\n$backup_stderr";
+               }
+               if (! $backup_result && $tc->{'possibly_unsupported'} &&
+                       $backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+               {
+                       skip "compression with $method not supported by this build", 3;
+               }
+               else
+               {
+                       ok($backup_result, "client side backup, compression $method");
+               }
 
                # Verify that the we got the files we expected.
                my $backup_files = join(',',
index 0650f975c448d585d7a907408b88bcfea50752ad..969e08cca2039968154b4ab20090f41abf744e87 100644 (file)
@@ -177,6 +177,11 @@ parse_bc_specification(bc_algorithm algorithm, char *specification,
                        result->level = expect_integer_value(keyword, value, result);
                        result->options |= BACKUP_COMPRESSION_OPTION_LEVEL;
                }
+               else if (strcmp(keyword, "workers") == 0)
+               {
+                       result->workers = expect_integer_value(keyword, value, result);
+                       result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
+               }
                else
                        result->parse_error =
                                psprintf(_("unknown compression option \"%s\""), keyword);
@@ -266,5 +271,16 @@ validate_bc_specification(bc_specification *spec)
                                                        min_level, max_level);
        }
 
+       /*
+        * Of the compression algorithms that we currently support, only zstd
+        * allows parallel workers.
+        */
+       if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 &&
+               (spec->algorithm != BACKUP_COMPRESSION_ZSTD))
+       {
+               return psprintf(_("compression algorithm \"%s\" does not accept a worker count"),
+                                               get_bc_algorithm_name(spec->algorithm));
+       }
+
        return NULL;
 }
index 0565cbc657d9442b33d7361eb0782fa578c74c48..6a0ecaa99c99bcb4c30795c42768aa644b7a4323 100644 (file)
@@ -23,12 +23,14 @@ typedef enum bc_algorithm
 } bc_algorithm;
 
 #define        BACKUP_COMPRESSION_OPTION_LEVEL                 (1 << 0)
+#define BACKUP_COMPRESSION_OPTION_WORKERS              (1 << 1)
 
 typedef struct bc_specification
 {
        bc_algorithm algorithm;
        unsigned        options;                /* OR of BACKUP_COMPRESSION_OPTION constants */
        int                     level;
+       int                     workers;
        char       *parse_error;        /* NULL if parsing was OK, else message */
 } bc_specification;