pg_basebackup: Allow client-side LZ4 (de)compression.
authorRobert Haas <rhaas@postgresql.org>
Fri, 11 Feb 2022 14:41:42 +0000 (09:41 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 11 Feb 2022 14:41:42 +0000 (09:41 -0500)
LZ4 compression can now be performed on the client using
pg_basebackup -Ft --compress client-lz4, and LZ4 decompression of
a backup compressed on the server can be performed on the client
using pg_basebackup -Fp --compress server-lz4.

Dipesh Pandit, reviewed and tested by Jeevan Ladhe and Tushar Ahuja,
with a few corrections - and some documentation - by me.

Discussion: http://postgr.es/m/CAN1g5_FeDmiA9D8wdG2W6Lkq5CpubxOAqTmd2et9hsinTJtsMQ@mail.gmail.com

doc/src/sgml/ref/pg_basebackup.sgml
src/bin/pg_basebackup/Makefile
src/bin/pg_basebackup/bbstreamer.h
src/bin/pg_basebackup/bbstreamer_lz4.c [new file with mode: 0644]
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_verifybackup/t/009_extract.pl
src/bin/pg_verifybackup/t/010_client_untar.pl [new file with mode: 0644]
src/tools/msvc/Mkvcbuild.pm

index 7a1b432eba33bc63ccaf6ceda6c50bf2f261b3dc..53aa40dcd192b72a9e959464c67373cf09cd990b 100644 (file)
@@ -417,18 +417,14 @@ PostgreSQL documentation
         specify <literal>-Xfetch</literal>.
        </para>
        <para>
-        The compression method can be set to <literal>gzip</literal> for
-        compression with <application>gzip</application>, or
-        <literal>lz4</literal> for compression with
-        <application>lz4</application>, or <literal>none</literal> for no
-        compression. However, <literal>lz4</literal> can be currently only
-        used with <literal>server</literal>. A compression level can be
-        optionally specified, by appending the level number after a
-        colon (<literal>:</literal>). If no level is specified, the default
-        compression level will be used. If only a level is specified without
-        mentioning an algorithm, <literal>gzip</literal> compression will
-        be used if the level is greater than 0, and no compression will be
-        used if the level is 0.
+        The compression method can be set to <literal>gzip</literal> or
+        <literal>lz4</literal>, or <literal>none</literal> for no
+        compression. A compression level can be optionally specified, by
+        appending the level number after a colon (<literal>:</literal>). If no
+        level is specified, the default compression level will be used. If
+        only a level is specified without mentioning an algorithm,
+        <literal>gzip</literal> compression will be used if the level is
+        greater than 0, and no compression will be used if the level is 0.
        </para>
        <para>
         When the tar format is used with <literal>gzip</literal> or
@@ -439,6 +435,13 @@ PostgreSQL documentation
         compression. If this is done, the server will compress the backup for
         transmission, and the client will decompress and extract it.
        </para>
+       <para>
+        When this option is used in combination with
+        <literal>-Xstream</literal>, <literal>pg_wal.tar</literal> will
+        be compressed using <literal>gzip</literal> if client-side gzip
+        compression is selected, but will not be compressed if server-side
+        compresion or LZ4 compresion is selected.
+       </para>
       </listitem>
      </varlistentry>
     </variablelist>
index ada3a5a5783347b7509c2dc3704daeeb6259b0ac..1d0db4f9d025ea87d42e64abb894f0ffd442dcb9 100644 (file)
@@ -43,6 +43,7 @@ BBOBJS = \
        bbstreamer_file.o \
        bbstreamer_gzip.o \
        bbstreamer_inject.o \
+       bbstreamer_lz4.o \
        bbstreamer_tar.o
 
 all: pg_basebackup pg_receivewal pg_recvlogical
index fe49ae35e50586b76e2015acd12fa40a994b28af..c2de77bacc0409088a3a6740570be81f90cb6fee 100644 (file)
@@ -206,6 +206,9 @@ extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
                                                                                        void (*report_output_file) (const char *));
 
 extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next);
+extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
+                                                                                                int compresslevel);
+extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/bbstreamer_lz4.c
new file mode 100644 (file)
index 0000000..f0bc226
--- /dev/null
@@ -0,0 +1,431 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_lz4.c
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *               src/bin/pg_basebackup/bbstreamer_lz4.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "common/file_perm.h"
+#include "common/string.h"
+
+#ifdef HAVE_LIBLZ4
+typedef struct bbstreamer_lz4_frame
+{
+       bbstreamer      base;
+
+       LZ4F_compressionContext_t       cctx;
+       LZ4F_decompressionContext_t     dctx;
+       LZ4F_preferences_t                      prefs;
+
+       size_t          bytes_written;
+       bool            header_written;
+} bbstreamer_lz4_frame;
+
+static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
+                                                                                         bbstreamer_member *member,
+                                                                                         const char *data, int len,
+                                                                                         bbstreamer_archive_context context);
+static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
+static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
+       .content = bbstreamer_lz4_compressor_content,
+       .finalize = bbstreamer_lz4_compressor_finalize,
+       .free = bbstreamer_lz4_compressor_free
+};
+
+static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
+                                                                                               bbstreamer_member *member,
+                                                                                               const char *data, int len,
+                                                                                               bbstreamer_archive_context context);
+static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
+static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
+       .content = bbstreamer_lz4_decompressor_content,
+       .finalize = bbstreamer_lz4_decompressor_finalize,
+       .free = bbstreamer_lz4_decompressor_free
+};
+#endif
+
+/*
+ * Create a new base backup streamer that performs lz4 compression of tar
+ * blocks.
+ */
+bbstreamer *
+bbstreamer_lz4_compressor_new(bbstreamer *next, int compresslevel)
+{
+#ifdef HAVE_LIBLZ4
+       bbstreamer_lz4_frame   *streamer;
+       LZ4F_errorCode_t                ctxError;
+       LZ4F_preferences_t         *prefs;
+       size_t                                  compressed_bound;
+
+       Assert(next != NULL);
+
+       streamer = palloc0(sizeof(bbstreamer_lz4_frame));
+       *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+               &bbstreamer_lz4_compressor_ops;
+
+       streamer->base.bbs_next = next;
+       initStringInfo(&streamer->base.bbs_buffer);
+       streamer->header_written = false;
+
+       /* Initialize stream compression preferences */
+       prefs = &streamer->prefs;
+       memset(prefs, 0, sizeof(LZ4F_preferences_t));
+       prefs->frameInfo.blockSizeID = LZ4F_max256KB;
+       prefs->compressionLevel = compresslevel;
+
+       /*
+        * Find out the compression bound, it specifies the minimum destination
+        * capacity required in worst case for the success of compression operation
+        * (LZ4F_compressUpdate) based on a given source size and preferences.
+        */
+       compressed_bound = LZ4F_compressBound(streamer->base.bbs_buffer.maxlen, prefs);
+
+       /* Enlarge buffer if it falls short of compression bound. */
+       if (streamer->base.bbs_buffer.maxlen <= compressed_bound)
+               enlargeStringInfo(&streamer->base.bbs_buffer, compressed_bound);
+
+       ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
+       if (LZ4F_isError(ctxError))
+                       pg_log_error("could not create lz4 compression context: %s",
+                                                LZ4F_getErrorName(ctxError));
+
+       return &streamer->base;
+#else
+       pg_log_error("this build does not support compression");
+       exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBLZ4
+/*
+ * Compress the input data to output buffer.
+ *
+ * Find out the compression bound based on input data length for each
+ * invocation to make sure that output buffer has enough capacity to
+ * accommodate the compressed data. In case if the output buffer
+ * capacity falls short of compression bound then forward the content
+ * of output buffer to next streamer and empty the buffer.
+ */
+static void
+bbstreamer_lz4_compressor_content(bbstreamer *streamer,
+                                                                 bbstreamer_member *member,
+                                                                 const char *data, int len,
+                                                                 bbstreamer_archive_context context)
+{
+       bbstreamer_lz4_frame   *mystreamer;
+       uint8                              *next_in,
+                                                  *next_out;
+       size_t                                  out_bound,
+                                                       compressed_size,
+                                                       avail_out;
+
+       mystreamer = (bbstreamer_lz4_frame *) streamer;
+       next_in = (uint8 *) data;
+
+       /* Write header before processing the first input chunk. */
+       if (!mystreamer->header_written)
+       {
+               compressed_size = LZ4F_compressBegin(mystreamer->cctx,
+                                                                                        (uint8 *) mystreamer->base.bbs_buffer.data,
+                                                                                        mystreamer->base.bbs_buffer.maxlen,
+                                                                                        &mystreamer->prefs);
+
+               if (LZ4F_isError(compressed_size))
+                       pg_log_error("could not write lz4 header: %s",
+                                                LZ4F_getErrorName(compressed_size));
+
+               mystreamer->bytes_written += compressed_size;
+               mystreamer->header_written = true;
+       }
+
+       /*
+        * Update the offset and capacity of output buffer based on number of bytes
+        * written to output buffer.
+        */
+       next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+       avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+
+       /*
+        * Find out the compression bound and make sure that output buffer has the
+        * required capacity for the success of LZ4F_compressUpdate. If needed
+        * forward the content to next streamer and empty the buffer.
+        */
+       out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
+       Assert(mystreamer->base.bbs_buffer.maxlen >= out_bound);
+       if (avail_out <= out_bound)
+       {
+                       bbstreamer_content(mystreamer->base.bbs_next, member,
+                                                          mystreamer->base.bbs_buffer.data,
+                                                          mystreamer->bytes_written,
+                                                          context);
+
+                       avail_out = mystreamer->base.bbs_buffer.maxlen;
+                       mystreamer->bytes_written = 0;
+                       next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+       }
+
+       /*
+        * This call compresses the data starting at next_in and generates the
+        * output starting at next_out. It expects the caller to provide the size
+        * of input buffer and capacity of output buffer by providing parameters
+        * len and avail_out.
+        *
+        * It returns the number of bytes compressed to output buffer.
+        */
+       compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
+                                                                                 next_out, avail_out,
+                                                                                 next_in, len, NULL);
+
+       if (LZ4F_isError(compressed_size))
+               pg_log_error("could not compress data: %s",
+                                        LZ4F_getErrorName(compressed_size));
+
+       mystreamer->bytes_written += compressed_size;
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
+{
+       bbstreamer_lz4_frame   *mystreamer;
+       uint8                              *next_out;
+       size_t                                  footer_bound,
+                                                       compressed_size,
+                                                       avail_out;
+
+       mystreamer = (bbstreamer_lz4_frame *) streamer;
+
+       /* Find out the footer bound and update the output buffer. */
+       footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
+       Assert(mystreamer->base.bbs_buffer.maxlen >= footer_bound);
+       if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <=
+               footer_bound)
+       {
+                       bbstreamer_content(mystreamer->base.bbs_next, NULL,
+                                                          mystreamer->base.bbs_buffer.data,
+                                                          mystreamer->bytes_written,
+                                                          BBSTREAMER_UNKNOWN);
+
+                       avail_out = mystreamer->base.bbs_buffer.maxlen;
+                       mystreamer->bytes_written = 0;
+                       next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+       }
+       else
+       {
+               next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+               avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+       }
+
+       /*
+        * Finalize the frame and flush whatever data remaining in compression
+        * context.
+        */
+       compressed_size = LZ4F_compressEnd(mystreamer->cctx,
+                                                                          next_out, avail_out, NULL);
+
+       if (LZ4F_isError(compressed_size))
+               pg_log_error("could not end lz4 compression: %s",
+                                        LZ4F_getErrorName(compressed_size));
+
+       mystreamer->bytes_written += compressed_size;
+
+       bbstreamer_content(mystreamer->base.bbs_next, NULL,
+                                          mystreamer->base.bbs_buffer.data,
+                                          mystreamer->bytes_written,
+                                          BBSTREAMER_UNKNOWN);
+
+       bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_lz4_compressor_free(bbstreamer *streamer)
+{
+       bbstreamer_lz4_frame    *mystreamer;
+
+       mystreamer = (bbstreamer_lz4_frame *) streamer;
+       bbstreamer_free(streamer->bbs_next);
+       LZ4F_freeCompressionContext(mystreamer->cctx);
+       pfree(streamer->bbs_buffer.data);
+       pfree(streamer);
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of lz4
+ * compressed blocks.
+ */
+bbstreamer *
+bbstreamer_lz4_decompressor_new(bbstreamer *next)
+{
+#ifdef HAVE_LIBLZ4
+       bbstreamer_lz4_frame    *streamer;
+       LZ4F_errorCode_t                ctxError;
+
+       Assert(next != NULL);
+
+       streamer = palloc0(sizeof(bbstreamer_lz4_frame));
+       *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+               &bbstreamer_lz4_decompressor_ops;
+
+       streamer->base.bbs_next = next;
+       initStringInfo(&streamer->base.bbs_buffer);
+
+       /* Initialize internal stream state for decompression */
+       ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
+       if (LZ4F_isError(ctxError))
+       {
+               pg_log_error("could not initialize compression library: %s",
+                               LZ4F_getErrorName(ctxError));
+               exit(1);
+       }
+
+       return &streamer->base;
+#else
+       pg_log_error("this build does not support compression");
+       exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBLZ4
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
+                                                                       bbstreamer_member *member,
+                                                                       const char *data, int len,
+                                                                       bbstreamer_archive_context context)
+{
+       bbstreamer_lz4_frame   *mystreamer;
+       uint8                              *next_in,
+                                                  *next_out;
+       size_t                                  avail_in,
+                                                       avail_out;
+
+       mystreamer = (bbstreamer_lz4_frame *) streamer;
+       next_in = (uint8 *) data;
+       next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+       avail_in = len;
+       avail_out = mystreamer->base.bbs_buffer.maxlen;
+
+       while (avail_in > 0)
+       {
+               size_t  ret,
+                               read_size,
+                               out_size;
+
+               read_size = avail_in;
+               out_size = avail_out;
+
+               /*
+                * This call decompresses the data starting at next_in and generates
+                * the output data starting at next_out. It expects the caller to
+                * provide size of the input buffer and total capacity of the output
+                * buffer by providing the read_size and out_size parameters
+                * respectively.
+                *
+                * Per the documentation of LZ4, parameters read_size and out_size
+                * behaves as dual parameters. On return, the number of bytes consumed
+                * from the input buffer will be written back to read_size and the
+                * number of bytes decompressed to output buffer will be written back
+                * to out_size respectively.
+                */
+               ret = LZ4F_decompress(mystreamer->dctx,
+                                                         next_out, &out_size,
+                                                         next_in, &read_size, NULL);
+
+               if (LZ4F_isError(ret))
+                       pg_log_error("could not decompress data: %s",
+                                                LZ4F_getErrorName(ret));
+
+               /* Update input buffer based on number of bytes consumed */
+               avail_in -= read_size;
+               next_in += read_size;
+
+               mystreamer->bytes_written += out_size;
+
+               /*
+                * If output buffer is full then forward the content to next streamer and
+                * update the output buffer.
+                */
+               if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
+               {
+                       bbstreamer_content(mystreamer->base.bbs_next, member,
+                                                          mystreamer->base.bbs_buffer.data,
+                                                          mystreamer->base.bbs_buffer.maxlen,
+                                                          context);
+
+                       avail_out = mystreamer->base.bbs_buffer.maxlen;
+                       mystreamer->bytes_written = 0;
+                       next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+               }
+               else
+               {
+                       avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+                       next_out += mystreamer->bytes_written;
+               }
+       }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
+{
+       bbstreamer_lz4_frame    *mystreamer;
+
+       mystreamer = (bbstreamer_lz4_frame *) streamer;
+
+       /*
+        * End of the stream, if there is some pending data in output buffers then
+        * we must forward it to next streamer.
+        */
+       bbstreamer_content(mystreamer->base.bbs_next, NULL,
+                                          mystreamer->base.bbs_buffer.data,
+                                          mystreamer->base.bbs_buffer.maxlen,
+                                          BBSTREAMER_UNKNOWN);
+
+       bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
+{
+       bbstreamer_lz4_frame    *mystreamer;
+
+       mystreamer = (bbstreamer_lz4_frame *) streamer;
+       bbstreamer_free(streamer->bbs_next);
+       LZ4F_freeDecompressionContext(mystreamer->dctx);
+       pfree(streamer->bbs_buffer.data);
+       pfree(streamer);
+}
+#endif
index 923659ddee5f5dc6fdc69136df85e85397ab2fc7..0003b59615753a88309330d48bb2f27f669b2bd6 100644 (file)
@@ -560,11 +560,16 @@ LogStreamerMain(logstreamer_param *param)
                                                                                          COMPRESSION_NONE,
                                                                                          compresslevel,
                                                                                          stream.do_sync);
-       else
+       else if (compressmethod == COMPRESSION_GZIP)
                stream.walmethod = CreateWalTarMethod(param->xlog,
                                                                                          compressmethod,
                                                                                          compresslevel,
                                                                                          stream.do_sync);
+       else
+               stream.walmethod = CreateWalTarMethod(param->xlog,
+                                                                                         COMPRESSION_NONE,
+                                                                                         compresslevel,
+                                                                                         stream.do_sync);
 
        if (!ReceiveXlogStream(param->bgconn, &stream))
 
@@ -1003,6 +1008,16 @@ parse_compress_options(char *src, WalCompressionMethod *methodres,
                *methodres = COMPRESSION_GZIP;
                *locationres = COMPRESS_LOCATION_SERVER;
        }
+       else if (pg_strcasecmp(firstpart, "lz4") == 0)
+       {
+               *methodres = COMPRESSION_LZ4;
+               *locationres = COMPRESS_LOCATION_UNSPECIFIED;
+       }
+       else if (pg_strcasecmp(firstpart, "client-lz4") == 0)
+       {
+               *methodres = COMPRESSION_LZ4;
+               *locationres = COMPRESS_LOCATION_CLIENT;
+       }
        else if (pg_strcasecmp(firstpart, "server-lz4") == 0)
        {
                *methodres = COMPRESSION_LZ4;
@@ -1125,7 +1140,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
        bbstreamer *manifest_inject_streamer = NULL;
        bool            inject_manifest;
        bool            is_tar,
-                               is_tar_gz;
+                               is_tar_gz,
+                               is_tar_lz4;
        bool            must_parse_archive;
        int                     archive_name_len = strlen(archive_name);
 
@@ -1144,6 +1160,10 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
        is_tar_gz = (archive_name_len > 8 &&
                                 strcmp(archive_name + archive_name_len - 3, ".gz") == 0);
 
+       /* Is this a LZ4 archive? */
+       is_tar_lz4 = (archive_name_len > 8 &&
+                                 strcmp(archive_name + archive_name_len - 4, ".lz4") == 0);
+
        /*
         * We have to parse the archive if (1) we're suppose to extract it, or if
         * (2) we need to inject backup_manifest or recovery configuration into it.
@@ -1153,7 +1173,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
                (spclocation == NULL && writerecoveryconf));
 
        /* At present, we only know how to parse tar archives. */
-       if (must_parse_archive && !is_tar && !is_tar_gz)
+       if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4)
        {
                pg_log_error("unable to parse archive: %s", archive_name);
                pg_log_info("only tar archives can be parsed");
@@ -1217,6 +1237,14 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
                                                                                                  archive_file,
                                                                                                  compresslevel);
                }
+               else if (compressmethod == COMPRESSION_LZ4)
+               {
+                       strlcat(archive_filename, ".lz4", sizeof(archive_filename));
+                       streamer = bbstreamer_plain_writer_new(archive_filename,
+                                                                                                  archive_file);
+                       streamer = bbstreamer_lz4_compressor_new(streamer,
+                                                                                                        compresslevel);
+               }
                else
                {
                        Assert(false);          /* not reachable */
@@ -1269,9 +1297,13 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
         * If the user has requested a server compressed archive along with archive
         * extraction at client then we need to decompress it.
         */
-       if (format == 'p' && compressmethod == COMPRESSION_GZIP &&
-                       compressloc == COMPRESS_LOCATION_SERVER)
-               streamer = bbstreamer_gzip_decompressor_new(streamer);
+       if (format == 'p' && compressloc == COMPRESS_LOCATION_SERVER)
+       {
+               if (compressmethod == COMPRESSION_GZIP)
+                       streamer = bbstreamer_gzip_decompressor_new(streamer);
+               else if (compressmethod == COMPRESSION_LZ4)
+                       streamer = bbstreamer_lz4_decompressor_new(streamer);
+       }
 
        /* Return the results. */
        *manifest_inject_streamer_p = manifest_inject_streamer;
index 51b77e4bfe64f82e00a806ff9417ff5b9c9d185a..9f9a7cc6a5fac5d38fc59754f2c0321382d1cb2c 100644 (file)
@@ -11,7 +11,7 @@ use Config;
 use File::Path qw(rmtree);
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
-use Test::More tests => 4;
+use Test::More tests => 6;
 
 my $primary = PostgreSQL::Test::Cluster->new('primary');
 $primary->init(allows_streaming => 1);
@@ -27,6 +27,11 @@ my @test_configuration = (
                'compression_method' => 'gzip',
                'backup_flags' => ['--compress', 'server-gzip:5'],
                'enabled' => check_pg_config("#define HAVE_LIBZ 1")
+       },
+       {
+               'compression_method' => 'lz4',
+               'backup_flags' => ['--compress', 'server-lz4:5'],
+               'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
        }
 );
 
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
new file mode 100644 (file)
index 0000000..34c9b90
--- /dev/null
@@ -0,0 +1,111 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# This test case aims to verify that client-side backup compression work
+# properly, and it also aims to verify that pg_verifybackup can verify a base
+# backup that didn't start out in plain format.
+
+use strict;
+use warnings;
+use Config;
+use File::Path qw(rmtree);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 9;
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+my $backup_path = $primary->backup_dir . '/client-backup';
+my $extract_path = $primary->backup_dir . '/extracted-backup';
+
+my @test_configuration = (
+       {
+               'compression_method' => 'none',
+               'backup_flags' => [],
+               'backup_archive' => 'base.tar',
+               'enabled' => 1
+       },
+       {
+               'compression_method' => 'gzip',
+               'backup_flags' => ['--compress', 'client-gzip:5'],
+               'backup_archive' => 'base.tar.gz',
+               'decompress_program' => $ENV{'GZIP_PROGRAM'},
+               'decompress_flags' => [ '-d' ],
+               'enabled' => check_pg_config("#define HAVE_LIBZ 1")
+       },
+       {
+               'compression_method' => 'lz4',
+               'backup_flags' => ['--compress', 'client-lz4:5'],
+               'backup_archive' => 'base.tar.lz4',
+               'decompress_program' => $ENV{'LZ4'},
+               'decompress_flags' => [ '-d' ],
+               'output_file' => 'base.tar',
+               'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
+       }
+);
+
+for my $tc (@test_configuration)
+{
+       my $method = $tc->{'compression_method'};
+
+       SKIP: {
+               skip "$method compression not supported by this build", 3
+                       if ! $tc->{'enabled'};
+               skip "no decompressor available for $method", 3
+                       if exists $tc->{'decompress_program'} &&
+                       !defined $tc->{'decompress_program'};
+
+               # Take a client-side backup.
+               my @backup      = (
+                       'pg_basebackup', '-D', $backup_path,
+                       '-Xfetch', '--no-sync', '-cfast', '-Ft');
+               push @backup, @{$tc->{'backup_flags'}};
+               $primary->command_ok(\@backup,
+                                                        "client side backup, compression $method");
+
+
+               # Verify that the we got the files we expected.
+               my $backup_files = join(',',
+                       sort grep { $_ ne '.' && $_ ne '..' } slurp_dir($backup_path));
+               my $expected_backup_files = join(',',
+                       sort ('backup_manifest', $tc->{'backup_archive'}));
+               is($backup_files,$expected_backup_files,
+                       "found expected backup files, compression $method");
+
+               # Decompress.
+               if (exists $tc->{'decompress_program'})
+               {
+                       my @decompress = ($tc->{'decompress_program'});
+                       push @decompress, @{$tc->{'decompress_flags'}}
+                               if $tc->{'decompress_flags'};
+                       push @decompress, $backup_path . '/' . $tc->{'backup_archive'};
+                       push @decompress, $backup_path . '/' . $tc->{'output_file'}
+                               if $tc->{'output_file'};
+                       system_or_bail(@decompress);
+               }
+
+               SKIP: {
+                       my $tar = $ENV{TAR};
+                       # don't check for a working tar here, to accomodate various odd
+                       # cases such as AIX. If tar doesn't work the init_from_backup below
+                       # will fail.
+                       skip "no tar program available", 1
+                               if (!defined $tar || $tar eq '');
+
+                       # Untar.
+                       mkdir($extract_path);
+                       system_or_bail($tar, 'xf', $backup_path . '/base.tar',
+                               '-C', $extract_path);
+
+                       # Verify.
+                       $primary->command_ok([ 'pg_verifybackup', '-n',
+                               '-m', "$backup_path/backup_manifest", '-e', $extract_path ],
+                               "verify backup, compression $method");
+               }
+
+               # Cleanup.
+               rmtree($extract_path);
+               rmtree($backup_path);
+       }
+}
index a310bcb28c90202317d715932a4828a6cb619ab6..bab81bd459a981ee62b09962adb7f29b39588073 100644 (file)
@@ -379,6 +379,7 @@ sub mkvcbuild
        $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_file.c');
        $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_gzip.c');
        $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_inject.c');
+       $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_lz4.c');
        $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_tar.c');
        $pgbasebackup->AddLibrary('ws2_32.lib');