pg_basebackup: Allow client-side LZ4 (de)compression.
authorRobert Haas <rhaas@postgresql.org>
Fri, 11 Feb 2022 14:41:42 +0000 (09:41 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 11 Feb 2022 14:41:42 +0000 (09:41 -0500)
LZ4 compression can now be performed on the client using
pg_basebackup -Ft --compress client-lz4, and LZ4 decompression of
a backup compressed on the server can be performed on the client
using pg_basebackup -Fp --compress server-lz4.

Dipesh Pandit, reviewed and tested by Jeevan Ladhe and Tushar Ahuja,
with a few corrections - and some documentation - by me.

Discussion: http://postgr.es/m/CAN1g5_FeDmiA9D8wdG2W6Lkq5CpubxOAqTmd2et9hsinTJtsMQ@mail.gmail.com

doc/src/sgml/ref/pg_basebackup.sgml
src/bin/pg_basebackup/Makefile
src/bin/pg_basebackup/bbstreamer.h
src/bin/pg_basebackup/bbstreamer_lz4.c [new file with mode: 0644]
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_verifybackup/t/009_extract.pl
src/bin/pg_verifybackup/t/010_client_untar.pl [new file with mode: 0644]
src/tools/msvc/Mkvcbuild.pm

index 7a1b432eba33bc63ccaf6ceda6c50bf2f261b3dc..53aa40dcd192b72a9e959464c67373cf09cd990b 100644 (file)
@@ -417,18 +417,14 @@ PostgreSQL documentation
         specify <literal>-Xfetch</literal>.
        </para>
        <para>
-        The compression method can be set to <literal>gzip</literal> for
-        compression with <application>gzip</application>, or
-        <literal>lz4</literal> for compression with
-        <application>lz4</application>, or <literal>none</literal> for no
-        compression. However, <literal>lz4</literal> can be currently only
-        used with <literal>server</literal>. A compression level can be
-        optionally specified, by appending the level number after a
-        colon (<literal>:</literal>). If no level is specified, the default
-        compression level will be used. If only a level is specified without
-        mentioning an algorithm, <literal>gzip</literal> compression will
-        be used if the level is greater than 0, and no compression will be
-        used if the level is 0.
+        The compression method can be set to <literal>gzip</literal> or
+        <literal>lz4</literal>, or <literal>none</literal> for no
+        compression. A compression level can be optionally specified, by
+        appending the level number after a colon (<literal>:</literal>). If no
+        level is specified, the default compression level will be used. If
+        only a level is specified without mentioning an algorithm,
+        <literal>gzip</literal> compression will be used if the level is
+        greater than 0, and no compression will be used if the level is 0.
        </para>
        <para>
         When the tar format is used with <literal>gzip</literal> or
@@ -439,6 +435,13 @@ PostgreSQL documentation
         compression. If this is done, the server will compress the backup for
         transmission, and the client will decompress and extract it.
        </para>
+       <para>
+        When this option is used in combination with
+        <literal>-Xstream</literal>, <literal>pg_wal.tar</literal> will
+        be compressed using <literal>gzip</literal> if client-side gzip
+        compression is selected, but will not be compressed if server-side
+        compresion or LZ4 compresion is selected.
+       </para>
       </listitem>
      </varlistentry>
     </variablelist>
index ada3a5a5783347b7509c2dc3704daeeb6259b0ac..1d0db4f9d025ea87d42e64abb894f0ffd442dcb9 100644 (file)
@@ -43,6 +43,7 @@ BBOBJS = \
    bbstreamer_file.o \
    bbstreamer_gzip.o \
    bbstreamer_inject.o \
+   bbstreamer_lz4.o \
    bbstreamer_tar.o
 
 all: pg_basebackup pg_receivewal pg_recvlogical
index fe49ae35e50586b76e2015acd12fa40a994b28af..c2de77bacc0409088a3a6740570be81f90cb6fee 100644 (file)
@@ -206,6 +206,9 @@ extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
                                            void (*report_output_file) (const char *));
 
 extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next);
+extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
+                                                int compresslevel);
+extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/bbstreamer_lz4.c
new file mode 100644 (file)
index 0000000..f0bc226
--- /dev/null
@@ -0,0 +1,431 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_lz4.c
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/bin/pg_basebackup/bbstreamer_lz4.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+#include "common/file_perm.h"
+#include "common/string.h"
+
+#ifdef HAVE_LIBLZ4
+typedef struct bbstreamer_lz4_frame
+{
+   bbstreamer  base;
+
+   LZ4F_compressionContext_t   cctx;
+   LZ4F_decompressionContext_t dctx;
+   LZ4F_preferences_t          prefs;
+
+   size_t      bytes_written;
+   bool        header_written;
+} bbstreamer_lz4_frame;
+
+static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
+                                             bbstreamer_member *member,
+                                             const char *data, int len,
+                                             bbstreamer_archive_context context);
+static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
+static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
+   .content = bbstreamer_lz4_compressor_content,
+   .finalize = bbstreamer_lz4_compressor_finalize,
+   .free = bbstreamer_lz4_compressor_free
+};
+
+static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
+                                               bbstreamer_member *member,
+                                               const char *data, int len,
+                                               bbstreamer_archive_context context);
+static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
+static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
+   .content = bbstreamer_lz4_decompressor_content,
+   .finalize = bbstreamer_lz4_decompressor_finalize,
+   .free = bbstreamer_lz4_decompressor_free
+};
+#endif
+
+/*
+ * Create a new base backup streamer that performs lz4 compression of tar
+ * blocks.
+ */
+bbstreamer *
+bbstreamer_lz4_compressor_new(bbstreamer *next, int compresslevel)
+{
+#ifdef HAVE_LIBLZ4
+   bbstreamer_lz4_frame   *streamer;
+   LZ4F_errorCode_t        ctxError;
+   LZ4F_preferences_t     *prefs;
+   size_t                  compressed_bound;
+
+   Assert(next != NULL);
+
+   streamer = palloc0(sizeof(bbstreamer_lz4_frame));
+   *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+       &bbstreamer_lz4_compressor_ops;
+
+   streamer->base.bbs_next = next;
+   initStringInfo(&streamer->base.bbs_buffer);
+   streamer->header_written = false;
+
+   /* Initialize stream compression preferences */
+   prefs = &streamer->prefs;
+   memset(prefs, 0, sizeof(LZ4F_preferences_t));
+   prefs->frameInfo.blockSizeID = LZ4F_max256KB;
+   prefs->compressionLevel = compresslevel;
+
+   /*
+    * Find out the compression bound, it specifies the minimum destination
+    * capacity required in worst case for the success of compression operation
+    * (LZ4F_compressUpdate) based on a given source size and preferences.
+    */
+   compressed_bound = LZ4F_compressBound(streamer->base.bbs_buffer.maxlen, prefs);
+
+   /* Enlarge buffer if it falls short of compression bound. */
+   if (streamer->base.bbs_buffer.maxlen <= compressed_bound)
+       enlargeStringInfo(&streamer->base.bbs_buffer, compressed_bound);
+
+   ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
+   if (LZ4F_isError(ctxError))
+           pg_log_error("could not create lz4 compression context: %s",
+                        LZ4F_getErrorName(ctxError));
+
+   return &streamer->base;
+#else
+   pg_log_error("this build does not support compression");
+   exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBLZ4
+/*
+ * Compress the input data to output buffer.
+ *
+ * Find out the compression bound based on input data length for each
+ * invocation to make sure that output buffer has enough capacity to
+ * accommodate the compressed data. In case if the output buffer
+ * capacity falls short of compression bound then forward the content
+ * of output buffer to next streamer and empty the buffer.
+ */
+static void
+bbstreamer_lz4_compressor_content(bbstreamer *streamer,
+                                 bbstreamer_member *member,
+                                 const char *data, int len,
+                                 bbstreamer_archive_context context)
+{
+   bbstreamer_lz4_frame   *mystreamer;
+   uint8                  *next_in,
+                          *next_out;
+   size_t                  out_bound,
+                           compressed_size,
+                           avail_out;
+
+   mystreamer = (bbstreamer_lz4_frame *) streamer;
+   next_in = (uint8 *) data;
+
+   /* Write header before processing the first input chunk. */
+   if (!mystreamer->header_written)
+   {
+       compressed_size = LZ4F_compressBegin(mystreamer->cctx,
+                                            (uint8 *) mystreamer->base.bbs_buffer.data,
+                                            mystreamer->base.bbs_buffer.maxlen,
+                                            &mystreamer->prefs);
+
+       if (LZ4F_isError(compressed_size))
+           pg_log_error("could not write lz4 header: %s",
+                        LZ4F_getErrorName(compressed_size));
+
+       mystreamer->bytes_written += compressed_size;
+       mystreamer->header_written = true;
+   }
+
+   /*
+    * Update the offset and capacity of output buffer based on number of bytes
+    * written to output buffer.
+    */
+   next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+   avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+
+   /*
+    * Find out the compression bound and make sure that output buffer has the
+    * required capacity for the success of LZ4F_compressUpdate. If needed
+    * forward the content to next streamer and empty the buffer.
+    */
+   out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
+   Assert(mystreamer->base.bbs_buffer.maxlen >= out_bound);
+   if (avail_out <= out_bound)
+   {
+           bbstreamer_content(mystreamer->base.bbs_next, member,
+                              mystreamer->base.bbs_buffer.data,
+                              mystreamer->bytes_written,
+                              context);
+
+           avail_out = mystreamer->base.bbs_buffer.maxlen;
+           mystreamer->bytes_written = 0;
+           next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+   }
+
+   /*
+    * This call compresses the data starting at next_in and generates the
+    * output starting at next_out. It expects the caller to provide the size
+    * of input buffer and capacity of output buffer by providing parameters
+    * len and avail_out.
+    *
+    * It returns the number of bytes compressed to output buffer.
+    */
+   compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
+                                         next_out, avail_out,
+                                         next_in, len, NULL);
+
+   if (LZ4F_isError(compressed_size))
+       pg_log_error("could not compress data: %s",
+                    LZ4F_getErrorName(compressed_size));
+
+   mystreamer->bytes_written += compressed_size;
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
+{
+   bbstreamer_lz4_frame   *mystreamer;
+   uint8                  *next_out;
+   size_t                  footer_bound,
+                           compressed_size,
+                           avail_out;
+
+   mystreamer = (bbstreamer_lz4_frame *) streamer;
+
+   /* Find out the footer bound and update the output buffer. */
+   footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
+   Assert(mystreamer->base.bbs_buffer.maxlen >= footer_bound);
+   if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <=
+       footer_bound)
+   {
+           bbstreamer_content(mystreamer->base.bbs_next, NULL,
+                              mystreamer->base.bbs_buffer.data,
+                              mystreamer->bytes_written,
+                              BBSTREAMER_UNKNOWN);
+
+           avail_out = mystreamer->base.bbs_buffer.maxlen;
+           mystreamer->bytes_written = 0;
+           next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+   }
+   else
+   {
+       next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+       avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+   }
+
+   /*
+    * Finalize the frame and flush whatever data remaining in compression
+    * context.
+    */
+   compressed_size = LZ4F_compressEnd(mystreamer->cctx,
+                                      next_out, avail_out, NULL);
+
+   if (LZ4F_isError(compressed_size))
+       pg_log_error("could not end lz4 compression: %s",
+                    LZ4F_getErrorName(compressed_size));
+
+   mystreamer->bytes_written += compressed_size;
+
+   bbstreamer_content(mystreamer->base.bbs_next, NULL,
+                      mystreamer->base.bbs_buffer.data,
+                      mystreamer->bytes_written,
+                      BBSTREAMER_UNKNOWN);
+
+   bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_lz4_compressor_free(bbstreamer *streamer)
+{
+   bbstreamer_lz4_frame    *mystreamer;
+
+   mystreamer = (bbstreamer_lz4_frame *) streamer;
+   bbstreamer_free(streamer->bbs_next);
+   LZ4F_freeCompressionContext(mystreamer->cctx);
+   pfree(streamer->bbs_buffer.data);
+   pfree(streamer);
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of lz4
+ * compressed blocks.
+ */
+bbstreamer *
+bbstreamer_lz4_decompressor_new(bbstreamer *next)
+{
+#ifdef HAVE_LIBLZ4
+   bbstreamer_lz4_frame    *streamer;
+   LZ4F_errorCode_t        ctxError;
+
+   Assert(next != NULL);
+
+   streamer = palloc0(sizeof(bbstreamer_lz4_frame));
+   *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+       &bbstreamer_lz4_decompressor_ops;
+
+   streamer->base.bbs_next = next;
+   initStringInfo(&streamer->base.bbs_buffer);
+
+   /* Initialize internal stream state for decompression */
+   ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
+   if (LZ4F_isError(ctxError))
+   {
+       pg_log_error("could not initialize compression library: %s",
+               LZ4F_getErrorName(ctxError));
+       exit(1);
+   }
+
+   return &streamer->base;
+#else
+   pg_log_error("this build does not support compression");
+   exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBLZ4
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
+                                   bbstreamer_member *member,
+                                   const char *data, int len,
+                                   bbstreamer_archive_context context)
+{
+   bbstreamer_lz4_frame   *mystreamer;
+   uint8                  *next_in,
+                          *next_out;
+   size_t                  avail_in,
+                           avail_out;
+
+   mystreamer = (bbstreamer_lz4_frame *) streamer;
+   next_in = (uint8 *) data;
+   next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+   avail_in = len;
+   avail_out = mystreamer->base.bbs_buffer.maxlen;
+
+   while (avail_in > 0)
+   {
+       size_t  ret,
+               read_size,
+               out_size;
+
+       read_size = avail_in;
+       out_size = avail_out;
+
+       /*
+        * This call decompresses the data starting at next_in and generates
+        * the output data starting at next_out. It expects the caller to
+        * provide size of the input buffer and total capacity of the output
+        * buffer by providing the read_size and out_size parameters
+        * respectively.
+        *
+        * Per the documentation of LZ4, parameters read_size and out_size
+        * behaves as dual parameters. On return, the number of bytes consumed
+        * from the input buffer will be written back to read_size and the
+        * number of bytes decompressed to output buffer will be written back
+        * to out_size respectively.
+        */
+       ret = LZ4F_decompress(mystreamer->dctx,
+                             next_out, &out_size,
+                             next_in, &read_size, NULL);
+
+       if (LZ4F_isError(ret))
+           pg_log_error("could not decompress data: %s",
+                        LZ4F_getErrorName(ret));
+
+       /* Update input buffer based on number of bytes consumed */
+       avail_in -= read_size;
+       next_in += read_size;
+
+       mystreamer->bytes_written += out_size;
+
+       /*
+        * If output buffer is full then forward the content to next streamer and
+        * update the output buffer.
+        */
+       if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
+       {
+           bbstreamer_content(mystreamer->base.bbs_next, member,
+                              mystreamer->base.bbs_buffer.data,
+                              mystreamer->base.bbs_buffer.maxlen,
+                              context);
+
+           avail_out = mystreamer->base.bbs_buffer.maxlen;
+           mystreamer->bytes_written = 0;
+           next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+       }
+       else
+       {
+           avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+           next_out += mystreamer->bytes_written;
+       }
+   }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
+{
+   bbstreamer_lz4_frame    *mystreamer;
+
+   mystreamer = (bbstreamer_lz4_frame *) streamer;
+
+   /*
+    * End of the stream, if there is some pending data in output buffers then
+    * we must forward it to next streamer.
+    */
+   bbstreamer_content(mystreamer->base.bbs_next, NULL,
+                      mystreamer->base.bbs_buffer.data,
+                      mystreamer->base.bbs_buffer.maxlen,
+                      BBSTREAMER_UNKNOWN);
+
+   bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
+{
+   bbstreamer_lz4_frame    *mystreamer;
+
+   mystreamer = (bbstreamer_lz4_frame *) streamer;
+   bbstreamer_free(streamer->bbs_next);
+   LZ4F_freeDecompressionContext(mystreamer->dctx);
+   pfree(streamer->bbs_buffer.data);
+   pfree(streamer);
+}
+#endif
index 923659ddee5f5dc6fdc69136df85e85397ab2fc7..0003b59615753a88309330d48bb2f27f669b2bd6 100644 (file)
@@ -560,11 +560,16 @@ LogStreamerMain(logstreamer_param *param)
                                              COMPRESSION_NONE,
                                              compresslevel,
                                              stream.do_sync);
-   else
+   else if (compressmethod == COMPRESSION_GZIP)
        stream.walmethod = CreateWalTarMethod(param->xlog,
                                              compressmethod,
                                              compresslevel,
                                              stream.do_sync);
+   else
+       stream.walmethod = CreateWalTarMethod(param->xlog,
+                                             COMPRESSION_NONE,
+                                             compresslevel,
+                                             stream.do_sync);
 
    if (!ReceiveXlogStream(param->bgconn, &stream))
 
@@ -1003,6 +1008,16 @@ parse_compress_options(char *src, WalCompressionMethod *methodres,
        *methodres = COMPRESSION_GZIP;
        *locationres = COMPRESS_LOCATION_SERVER;
    }
+   else if (pg_strcasecmp(firstpart, "lz4") == 0)
+   {
+       *methodres = COMPRESSION_LZ4;
+       *locationres = COMPRESS_LOCATION_UNSPECIFIED;
+   }
+   else if (pg_strcasecmp(firstpart, "client-lz4") == 0)
+   {
+       *methodres = COMPRESSION_LZ4;
+       *locationres = COMPRESS_LOCATION_CLIENT;
+   }
    else if (pg_strcasecmp(firstpart, "server-lz4") == 0)
    {
        *methodres = COMPRESSION_LZ4;
@@ -1125,7 +1140,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
    bbstreamer *manifest_inject_streamer = NULL;
    bool        inject_manifest;
    bool        is_tar,
-               is_tar_gz;
+               is_tar_gz,
+               is_tar_lz4;
    bool        must_parse_archive;
    int         archive_name_len = strlen(archive_name);
 
@@ -1144,6 +1160,10 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
    is_tar_gz = (archive_name_len > 8 &&
                 strcmp(archive_name + archive_name_len - 3, ".gz") == 0);
 
+   /* Is this a LZ4 archive? */
+   is_tar_lz4 = (archive_name_len > 8 &&
+                 strcmp(archive_name + archive_name_len - 4, ".lz4") == 0);
+
    /*
     * We have to parse the archive if (1) we're suppose to extract it, or if
     * (2) we need to inject backup_manifest or recovery configuration into it.
@@ -1153,7 +1173,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
        (spclocation == NULL && writerecoveryconf));
 
    /* At present, we only know how to parse tar archives. */
-   if (must_parse_archive && !is_tar && !is_tar_gz)
+   if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4)
    {
        pg_log_error("unable to parse archive: %s", archive_name);
        pg_log_info("only tar archives can be parsed");
@@ -1217,6 +1237,14 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
                                                  archive_file,
                                                  compresslevel);
        }
+       else if (compressmethod == COMPRESSION_LZ4)
+       {
+           strlcat(archive_filename, ".lz4", sizeof(archive_filename));
+           streamer = bbstreamer_plain_writer_new(archive_filename,
+                                                  archive_file);
+           streamer = bbstreamer_lz4_compressor_new(streamer,
+                                                    compresslevel);
+       }
        else
        {
            Assert(false);      /* not reachable */
@@ -1269,9 +1297,13 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
     * If the user has requested a server compressed archive along with archive
     * extraction at client then we need to decompress it.
     */
-   if (format == 'p' && compressmethod == COMPRESSION_GZIP &&
-           compressloc == COMPRESS_LOCATION_SERVER)
-       streamer = bbstreamer_gzip_decompressor_new(streamer);
+   if (format == 'p' && compressloc == COMPRESS_LOCATION_SERVER)
+   {
+       if (compressmethod == COMPRESSION_GZIP)
+           streamer = bbstreamer_gzip_decompressor_new(streamer);
+       else if (compressmethod == COMPRESSION_LZ4)
+           streamer = bbstreamer_lz4_decompressor_new(streamer);
+   }
 
    /* Return the results. */
    *manifest_inject_streamer_p = manifest_inject_streamer;
index 51b77e4bfe64f82e00a806ff9417ff5b9c9d185a..9f9a7cc6a5fac5d38fc59754f2c0321382d1cb2c 100644 (file)
@@ -11,7 +11,7 @@ use Config;
 use File::Path qw(rmtree);
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
-use Test::More tests => 4;
+use Test::More tests => 6;
 
 my $primary = PostgreSQL::Test::Cluster->new('primary');
 $primary->init(allows_streaming => 1);
@@ -27,6 +27,11 @@ my @test_configuration = (
        'compression_method' => 'gzip',
        'backup_flags' => ['--compress', 'server-gzip:5'],
        'enabled' => check_pg_config("#define HAVE_LIBZ 1")
+   },
+   {
+       'compression_method' => 'lz4',
+       'backup_flags' => ['--compress', 'server-lz4:5'],
+       'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
    }
 );
 
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
new file mode 100644 (file)
index 0000000..34c9b90
--- /dev/null
@@ -0,0 +1,111 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# This test case aims to verify that client-side backup compression work
+# properly, and it also aims to verify that pg_verifybackup can verify a base
+# backup that didn't start out in plain format.
+
+use strict;
+use warnings;
+use Config;
+use File::Path qw(rmtree);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 9;
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+my $backup_path = $primary->backup_dir . '/client-backup';
+my $extract_path = $primary->backup_dir . '/extracted-backup';
+
+my @test_configuration = (
+   {
+       'compression_method' => 'none',
+       'backup_flags' => [],
+       'backup_archive' => 'base.tar',
+       'enabled' => 1
+   },
+   {
+       'compression_method' => 'gzip',
+       'backup_flags' => ['--compress', 'client-gzip:5'],
+       'backup_archive' => 'base.tar.gz',
+       'decompress_program' => $ENV{'GZIP_PROGRAM'},
+       'decompress_flags' => [ '-d' ],
+       'enabled' => check_pg_config("#define HAVE_LIBZ 1")
+   },
+   {
+       'compression_method' => 'lz4',
+       'backup_flags' => ['--compress', 'client-lz4:5'],
+       'backup_archive' => 'base.tar.lz4',
+       'decompress_program' => $ENV{'LZ4'},
+       'decompress_flags' => [ '-d' ],
+       'output_file' => 'base.tar',
+       'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
+   }
+);
+
+for my $tc (@test_configuration)
+{
+   my $method = $tc->{'compression_method'};
+
+   SKIP: {
+       skip "$method compression not supported by this build", 3
+           if ! $tc->{'enabled'};
+       skip "no decompressor available for $method", 3
+           if exists $tc->{'decompress_program'} &&
+           !defined $tc->{'decompress_program'};
+
+       # Take a client-side backup.
+       my @backup      = (
+           'pg_basebackup', '-D', $backup_path,
+           '-Xfetch', '--no-sync', '-cfast', '-Ft');
+       push @backup, @{$tc->{'backup_flags'}};
+       $primary->command_ok(\@backup,
+                            "client side backup, compression $method");
+
+
+       # Verify that the we got the files we expected.
+       my $backup_files = join(',',
+           sort grep { $_ ne '.' && $_ ne '..' } slurp_dir($backup_path));
+       my $expected_backup_files = join(',',
+           sort ('backup_manifest', $tc->{'backup_archive'}));
+       is($backup_files,$expected_backup_files,
+           "found expected backup files, compression $method");
+
+       # Decompress.
+       if (exists $tc->{'decompress_program'})
+       {
+           my @decompress = ($tc->{'decompress_program'});
+           push @decompress, @{$tc->{'decompress_flags'}}
+               if $tc->{'decompress_flags'};
+           push @decompress, $backup_path . '/' . $tc->{'backup_archive'};
+           push @decompress, $backup_path . '/' . $tc->{'output_file'}
+               if $tc->{'output_file'};
+           system_or_bail(@decompress);
+       }
+
+       SKIP: {
+           my $tar = $ENV{TAR};
+           # don't check for a working tar here, to accomodate various odd
+           # cases such as AIX. If tar doesn't work the init_from_backup below
+           # will fail.
+           skip "no tar program available", 1
+               if (!defined $tar || $tar eq '');
+
+           # Untar.
+           mkdir($extract_path);
+           system_or_bail($tar, 'xf', $backup_path . '/base.tar',
+               '-C', $extract_path);
+
+           # Verify.
+           $primary->command_ok([ 'pg_verifybackup', '-n',
+               '-m', "$backup_path/backup_manifest", '-e', $extract_path ],
+               "verify backup, compression $method");
+       }
+
+       # Cleanup.
+       rmtree($extract_path);
+       rmtree($backup_path);
+   }
+}
index a310bcb28c90202317d715932a4828a6cb619ab6..bab81bd459a981ee62b09962adb7f29b39588073 100644 (file)
@@ -379,6 +379,7 @@ sub mkvcbuild
    $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_file.c');
    $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_gzip.c');
    $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_inject.c');
+   $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_lz4.c');
    $pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_tar.c');
    $pgbasebackup->AddLibrary('ws2_32.lib');