option. If the value is an integer, it specifies the compression
level. Otherwise, it should be a comma-separated list of items,
each of the form <literal>keyword</literal> or
- <literal>keyword=value</literal>. Currently, the only supported
- keyword is <literal>level</literal>, which sets the compression
- level.
+ <literal>keyword=value</literal>. Currently, the supported keywords
+ are <literal>level</literal> and <literal>workers</literal>.
</para>
<para>
+ The <literal>level</literal> keyword sets the compression level.
For <literal>gzip</literal> the compression level should be an
integer between 1 and 9, for <literal>lz4</literal> an integer
between 1 and 12, and for <literal>zstd</literal> an integer
between 1 and 22.
</para>
+
+ <para>
+ The <literal>workers</literal> keyword sets the number of threads
+ that should be used for parallel compression. Parallel compression
+ is supported only for <literal>zstd</literal>.
+ </para>
</listitem>
</varlistentry>
integer, it specifies the compression level. Otherwise, it should be
a comma-separated list of items, each of the form
<literal>keyword</literal> or <literal>keyword=value</literal>.
- Currently, the only supported keyword is <literal>level</literal>,
- which sets the compression level.
+ Currently, the supported keywords are <literal>level</literal>
+ and <literal>workers</literal>.
</para>
<para>
If no compression level is specified, the default compression level
/* Common information for all types of sink. */
bbsink base;
- /* Compression level */
- int compresslevel;
+ /* Compression options */
+ bc_specification *compress;
ZSTD_CCtx *cctx;
ZSTD_outBuffer zstd_outBuf;
return NULL; /* keep compiler quiet */
#else
bbsink_zstd *sink;
- int compresslevel;
Assert(next != NULL);
- if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
- compresslevel = 0;
- else
- {
- compresslevel = compress->level;
- Assert(compresslevel >= 1 && compresslevel <= 22);
- }
-
sink = palloc0(sizeof(bbsink_zstd));
*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
sink->base.bbs_next = next;
- sink->compresslevel = compresslevel;
+ sink->compress = compress;
return &sink->base;
#endif
bbsink_zstd *mysink = (bbsink_zstd *) sink;
size_t output_buffer_bound;
size_t ret;
+ bc_specification *compress = mysink->compress;
mysink->cctx = ZSTD_createCCtx();
if (!mysink->cctx)
elog(ERROR, "could not create zstd compression context");
- ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
- mysink->compresslevel);
- if (ZSTD_isError(ret))
- elog(ERROR, "could not set zstd compression level to %d: %s",
- mysink->compresslevel, ZSTD_getErrorName(ret));
+ if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
+ {
+ ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
+ compress->level);
+ if (ZSTD_isError(ret))
+ elog(ERROR, "could not set zstd compression level to %d: %s",
+ compress->level, ZSTD_getErrorName(ret));
+ }
+
+ if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
+ {
+ /*
+ * On older versions of libzstd, this option does not exist, and trying
+ * to set it will fail. Similarly for newer versions if they are
+ * compiled without threading support.
+ */
+ ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+ compress->workers);
+ if (ZSTD_isError(ret))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not set compression worker count to %d: %s",
+ compress->workers, ZSTD_getErrorName(ret)));
+ }
/*
* We need our own buffer, because we're going to pass different data to
{
#ifdef USE_ZSTD
bbstreamer_zstd_frame *streamer;
- int compresslevel;
size_t ret;
Assert(next != NULL);
exit(1);
}
- /* Initialize stream compression preferences */
- if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) == 0)
- compresslevel = 0;
- else
- compresslevel = compress->level;
- ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
- compresslevel);
- if (ZSTD_isError(ret))
+ /* Set compression level, if specified */
+ if ((compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) != 0)
{
- pg_log_error("could not set zstd compression level to %d: %s",
- compresslevel, ZSTD_getErrorName(ret));
- exit(1);
+ ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+ compress->level);
+ if (ZSTD_isError(ret))
+ {
+ pg_log_error("could not set zstd compression level to %d: %s",
+ compress->level, ZSTD_getErrorName(ret));
+ exit(1);
+ }
+ }
+
+ /* Set # of workers, if specified */
+ if ((compress->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0)
+ {
+ /*
+ * On older versions of libzstd, this option does not exist, and
+ * trying to set it will fail. Similarly for newer versions if they
+ * are compiled without threading support.
+ */
+ ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+ compress->workers);
+ if (ZSTD_isError(ret))
+ {
+ pg_log_error("could not set compression worker count to %d: %s",
+ compress->workers, ZSTD_getErrorName(ret));
+ exit(1);
+ }
}
/* Initialize the ZSTD output buffer. */
'invalid compression specification: found empty string where a compression option was expected',
'failure on extra, empty compression option'
],
+ [
+ 'gzip:workers=3',
+ 'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
+ 'failure on worker count for gzip'
+ ],
);
for my $cft (@compression_failure_tests)
{
'compression_method' => 'zstd',
'backup_flags' => ['--compress', 'server-zstd:5'],
'enabled' => check_pg_config("#define USE_ZSTD 1")
+ },
+ {
+ 'compression_method' => 'parallel zstd',
+ 'backup_flags' => ['--compress', 'server-zstd:workers=3'],
+ 'enabled' => check_pg_config("#define USE_ZSTD 1"),
+ 'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
}
);
my @verify = ('pg_verifybackup', '-e', $backup_path);
# A backup with a valid compression method should work.
- $primary->command_ok(\@backup,
- "backup done, compression method \"$method\"");
+ my $backup_stdout = '';
+ my $backup_stderr = '';
+ my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+ '2>', \$backup_stderr);
+ if ($backup_stdout ne '')
+ {
+ print "# standard output was:\n$backup_stdout";
+ }
+ if ($backup_stderr ne '')
+ {
+ print "# standard error was:\n$backup_stderr";
+ }
+ if (! $backup_result && $tc->{'possibly_unsupported'} &&
+ $backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+ {
+ skip "compression with $method not supported by this build", 2;
+ }
+ else
+ {
+ ok($backup_result, "backup done, compression $method");
+ }
# Make sure that it verifies OK.
$primary->command_ok(\@verify,
'decompress_program' => $ENV{'ZSTD'},
'decompress_flags' => [ '-d' ],
'enabled' => check_pg_config("#define USE_ZSTD 1")
+ },
+ {
+ 'compression_method' => 'parallel zstd',
+ 'backup_flags' => ['--compress', 'client-zstd:workers=3'],
+ 'backup_archive' => 'base.tar.zst',
+ 'decompress_program' => $ENV{'ZSTD'},
+ 'decompress_flags' => [ '-d' ],
+ 'enabled' => check_pg_config("#define USE_ZSTD 1"),
+ 'possibly_unsupported' => qr/could not set compression worker count to 3: Unsupported parameter/
}
);
'pg_basebackup', '-D', $backup_path,
'-Xfetch', '--no-sync', '-cfast', '-Ft');
push @backup, @{$tc->{'backup_flags'}};
- $primary->command_ok(\@backup,
- "client side backup, compression $method");
-
+ my $backup_stdout = '';
+ my $backup_stderr = '';
+ my $backup_result = $primary->run_log(\@backup, '>', \$backup_stdout,
+ '2>', \$backup_stderr);
+ if ($backup_stdout ne '')
+ {
+ print "# standard output was:\n$backup_stdout";
+ }
+ if ($backup_stderr ne '')
+ {
+ print "# standard error was:\n$backup_stderr";
+ }
+ if (! $backup_result && $tc->{'possibly_unsupported'} &&
+ $backup_stderr =~ /$tc->{'possibly_unsupported'}/)
+ {
+ skip "compression with $method not supported by this build", 3;
+ }
+ else
+ {
+ ok($backup_result, "client side backup, compression $method");
+ }
# Verify that the we got the files we expected.
my $backup_files = join(',',
result->level = expect_integer_value(keyword, value, result);
result->options |= BACKUP_COMPRESSION_OPTION_LEVEL;
}
+ else if (strcmp(keyword, "workers") == 0)
+ {
+ result->workers = expect_integer_value(keyword, value, result);
+ result->options |= BACKUP_COMPRESSION_OPTION_WORKERS;
+ }
else
result->parse_error =
psprintf(_("unknown compression option \"%s\""), keyword);
min_level, max_level);
}
+ /*
+ * Of the compression algorithms that we currently support, only zstd
+ * allows parallel workers.
+ */
+ if ((spec->options & BACKUP_COMPRESSION_OPTION_WORKERS) != 0 &&
+ (spec->algorithm != BACKUP_COMPRESSION_ZSTD))
+ {
+ return psprintf(_("compression algorithm \"%s\" does not accept a worker count"),
+ get_bc_algorithm_name(spec->algorithm));
+ }
+
return NULL;
}
} bc_algorithm;
#define BACKUP_COMPRESSION_OPTION_LEVEL (1 << 0)
+#define BACKUP_COMPRESSION_OPTION_WORKERS (1 << 1)
typedef struct bc_specification
{
bc_algorithm algorithm;
unsigned options; /* OR of BACKUP_COMPRESSION_OPTION constants */
int level;
+ int workers;
char *parse_error; /* NULL if parsing was OK, else message */
} bc_specification;