Add LZ4 compression to pg_dump
author: Tomas Vondra <tomas.vondra@postgresql.org>
Thu, 23 Feb 2023 20:19:19 +0000 (21:19 +0100)
committer: Tomas Vondra <tomas.vondra@postgresql.org>
Thu, 23 Feb 2023 20:19:26 +0000 (21:19 +0100)
Expand pg_dump's compression streaming and file APIs to support the lz4
algorithm. The newly added compress_lz4.{c,h} files cover all the
functionality of the aforementioned APIs. Minor changes were necessary
in various pg_backup_* files, where code for the 'lz4' file suffix has
been added, as well as pg_dump's compression option parsing.

Author: Georgios Kokolatos
Reviewed-by: Michael Paquier, Rachel Heaton, Justin Pryzby, Shi Yu, Tomas Vondra
Discussion: https://postgr.es/m/faUNEOpts9vunEaLnmxmG-DldLSg_ql137OC3JYDmgrOMHm1RvvWY2IdBkv_CRxm5spCCb_OmKNk2T03TMm0fBEWveFF9wA1WizPuAgB7Ss%3D%40protonmail.com

12 files changed:
doc/src/sgml/ref/pg_dump.sgml
src/bin/pg_dump/Makefile
src/bin/pg_dump/compress_io.c
src/bin/pg_dump/compress_lz4.c [new file with mode: 0644]
src/bin/pg_dump/compress_lz4.h [new file with mode: 0644]
src/bin/pg_dump/meson.build
src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_directory.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/t/002_pg_dump.pl
src/tools/pginclude/cpluspluscheck
src/tools/pgindent/typedefs.list

index 2c938cd7e149443afee237379434d1d21af77304..49d218905fbeb9eb55efc553e74309f6c2ba87aa 100644 (file)
@@ -330,9 +330,10 @@ PostgreSQL documentation
            machine-readable format that <application>pg_restore</application>
            can read. A directory format archive can be manipulated with
            standard Unix tools; for example, files in an uncompressed archive
-           can be compressed with the <application>gzip</application> tool.
-           This format is compressed by default and also supports parallel
-           dumps.
+           can be compressed with the <application>gzip</application> or
+           <application>lz4</application> tools.
+           This format is compressed by default using <literal>gzip</literal>
+           and also supports parallel dumps.
           </para>
          </listitem>
         </varlistentry>
@@ -654,7 +655,7 @@ PostgreSQL documentation
        <para>
         Specify the compression method and/or the compression level to use.
         The compression method can be set to <literal>gzip</literal> or
-        <literal>none</literal> for no compression.
+        <literal>lz4</literal> or <literal>none</literal> for no compression.
         A compression detail string can optionally be specified.  If the
         detail string is an integer, it specifies the compression level.
         Otherwise, it should be a comma-separated list of items, each of the
@@ -675,8 +676,8 @@ PostgreSQL documentation
         individual table-data segments, and the default is to compress using
         <literal>gzip</literal> at a moderate level. For plain text output,
         setting a nonzero compression level causes the entire output file to be compressed,
-        as though it had been fed through <application>gzip</application>; but the default
-        is not to compress.
+        as though it had been fed through <application>gzip</application> or
+        <application>lz4</application>; but the default is not to compress.
        </para>
        <para>
         The tar archive format currently does not support compression at all.
index 0013bc080cf0e89047d5cf7ef446d244535abd32..eb8f59459a13c9d6456b10ec157fbaa5a2b28c72 100644 (file)
@@ -17,6 +17,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 export GZIP_PROGRAM=$(GZIP)
+export LZ4
 export with_icu
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
@@ -26,6 +27,7 @@ OBJS = \
    $(WIN32RES) \
    compress_gzip.o \
    compress_io.o \
+   compress_lz4.o \
    compress_none.o \
    dumputils.o \
    parallel.o \
index c2eb3dbb4a4c6425e097fc4e43a0fc74e0633dd8..ce06f1eac9cd8f11fa078d1f4cd859d29e46240f 100644 (file)
@@ -53,7 +53,7 @@
  * InitDiscoverCompressFileHandle tries to infer the compression by the
  * filename suffix. If the suffix is not yet known then it tries to simply
  * open the file and if it fails, it tries to open the same file with the .gz
- * suffix.
+ * suffix, and then again with the .lz4 suffix.
  *
  * IDENTIFICATION
  *    src/bin/pg_dump/compress_io.c
@@ -67,6 +67,7 @@
 
 #include "compress_gzip.h"
 #include "compress_io.h"
+#include "compress_lz4.h"
 #include "compress_none.h"
 #include "pg_backup_utils.h"
 
@@ -93,6 +94,10 @@ supports_compression(const pg_compress_specification compression_spec)
    if (algorithm == PG_COMPRESSION_GZIP)
        supported = true;
 #endif
+#ifdef USE_LZ4
+   if (algorithm == PG_COMPRESSION_LZ4)
+       supported = true;
+#endif
 
    if (!supported)
        return psprintf("this build does not support compression with %s",
@@ -123,6 +128,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
        InitCompressorNone(cs, compression_spec);
    else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
        InitCompressorGzip(cs, compression_spec);
+   else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
+       InitCompressorLZ4(cs, compression_spec);
 
    return cs;
 }
@@ -187,6 +194,8 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
        InitCompressFileHandleNone(CFH, compression_spec);
    else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
        InitCompressFileHandleGzip(CFH, compression_spec);
+   else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
+       InitCompressFileHandleLZ4(CFH, compression_spec);
 
    return CFH;
 }
@@ -196,11 +205,11 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
  * be either "r" or "rb".
  *
  * If the file at 'path' contains the suffix of a supported compression method,
- * currently this includes only ".gz", then this compression will be used
+ * currently this includes ".gz" and ".lz4", then this compression will be used
  * throughout. Otherwise the compression will be inferred by iteratively trying
  * to open the file at 'path', first as is, then by appending known compression
  * suffixes. So if you pass "foo" as 'path', this will open either "foo" or
- * "foo.gz", trying in that order.
+ * "foo.gz" or "foo.lz4", trying in that order.
  *
  * On failure, return NULL with an error code in errno.
  */
@@ -238,6 +247,17 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
            if (exists)
                compression_spec.algorithm = PG_COMPRESSION_GZIP;
        }
+#endif
+#ifdef USE_LZ4
+       if (!exists)
+       {
+           free_keep_errno(fname);
+           fname = psprintf("%s.lz4", path);
+           exists = (stat(fname, &st) == 0);
+
+           if (exists)
+               compression_spec.algorithm = PG_COMPRESSION_LZ4;
+       }
 #endif
    }
 
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
new file mode 100644 (file)
index 0000000..fe1014e
--- /dev/null
@@ -0,0 +1,626 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_lz4.c
+ *  Routines for archivers to read and write an LZ4 compressed data stream.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/bin/pg_dump/compress_lz4.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+#include "pg_backup_utils.h"
+
+#include "compress_lz4.h"
+
+#ifdef USE_LZ4
+#include <lz4.h>
+#include <lz4frame.h>
+
+#define LZ4_OUT_SIZE   (4 * 1024)
+#define LZ4_IN_SIZE        (16 * 1024)
+
+/*
+ * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
+ * Define it ourselves for installations with an older version.
+ */
+#ifndef LZ4F_HEADER_SIZE_MAX
+#define LZ4F_HEADER_SIZE_MAX   32
+#endif
+
+/*----------------------
+ * Compressor API
+ *----------------------
+ */
+
+/*
+ * Private state of the LZ4 compressor: a scratch buffer that receives the
+ * compressed form of each chunk handed to WriteDataToArchiveLZ4().
+ */
+typedef struct LZ4CompressorState
+{
+   char       *outbuf;         /* compressed-output buffer, grown on demand */
+   size_t      outsize;        /* currently allocated size of outbuf */
+} LZ4CompressorState;
+
+/* Private routines that support LZ4 compressed data I/O */
+static void ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
+                                 const void *data, size_t dLen);
+static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
+
+/*
+ * Read LZ4 compressed data from the archive through cs->readF, decompress
+ * each chunk that is returned, and pass the decompressed bytes to ahwrite().
+ *
+ * A decompression failure is fatal.
+ *
+ * NOTE(review): the output buffer is allocated with the initial input buffer
+ * size only.  readF receives pointers to 'buf' and 'buflen' and so could
+ * presumably enlarge them; confirm that a chunk can never decompress to more
+ * than the output buffer holds.
+ */
+static void
+ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
+{
+   LZ4_streamDecode_t lz4StreamDecode;
+   char       *buf;
+   char       *decbuf;
+   size_t      buflen;
+   size_t      cnt;
+
+   buflen = LZ4_IN_SIZE;
+   buf = pg_malloc(buflen);
+   decbuf = pg_malloc(buflen);
+
+   LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
+
+   while ((cnt = cs->readF(AH, &buf, &buflen)))
+   {
+       int         decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
+                                                           buf, decbuf,
+                                                           cnt, buflen);
+
+       /*
+        * A negative result reports malformed input or an output buffer that
+        * is too small.  It must not reach ahwrite(), where it would be
+        * interpreted as a huge unsigned length.
+        */
+       if (decBytes < 0)
+           pg_fatal("could not decompress LZ4 data");
+
+       ahwrite(decbuf, 1, decBytes, AH);
+   }
+
+   pg_free(buf);
+   pg_free(decbuf);
+}
+
+/*
+ * Compress 'dLen' bytes at 'data' as a single LZ4 block and hand the result
+ * to cs->writeF.  The output buffer kept in the compressor's private state
+ * is enlarged first whenever LZ4_compressBound() asks for more room than is
+ * currently allocated.  A failed compression is fatal.
+ */
+static void
+WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
+                     const void *data, size_t dLen)
+{
+   LZ4CompressorState *state = (LZ4CompressorState *) cs->private_data;
+   size_t      bound = LZ4_compressBound(dLen);
+   size_t      nbytes;
+
+   /* Make sure the worst-case output fits into the scratch buffer. */
+   if (state->outsize < bound)
+   {
+       state->outbuf = pg_realloc(state->outbuf, bound);
+       state->outsize = bound;
+   }
+
+   nbytes = LZ4_compress_default(data, state->outbuf,
+                                 dLen, state->outsize);
+
+   /* nbytes is unsigned, so "no output" is the only failure it can show. */
+   if (nbytes == 0)
+       pg_fatal("failed to LZ4 compress data");
+
+   cs->writeF(AH, state->outbuf, nbytes);
+}
+
+/*
+ * Release the LZ4 compressor's private state, if there is any, and clear
+ * the pointer kept in 'cs'.  'AH' is not used.
+ */
+static void
+EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
+{
+   LZ4CompressorState *state = (LZ4CompressorState *) cs->private_data;
+
+   /* Nothing to release when no state is attached. */
+   if (state == NULL)
+       return;
+
+   pg_free(state->outbuf);
+   pg_free(state);
+   cs->private_data = NULL;
+}
+
+
+/*
+ * Public routines that support LZ4 compressed data I/O
+ */
+/*
+ * Set up 'cs' for LZ4: install the LZ4 read/write/end callbacks, record the
+ * compression specification and attach a zeroed private state.
+ */
+void
+InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
+{
+   /* Remember the requested compression parameters. */
+   cs->compression_spec = compression_spec;
+
+   /*
+    * The state starts out zeroed; its output buffer is only allocated once
+    * WriteDataToArchiveLZ4() needs it.
+    */
+   cs->private_data = pg_malloc0(sizeof(LZ4CompressorState));
+
+   /* Route the generic compressor callbacks to the LZ4 implementations. */
+   cs->readData = ReadDataFromArchiveLZ4;
+   cs->writeData = WriteDataToArchiveLZ4;
+   cs->end = EndCompressorLZ4;
+}
+
+/*----------------------
+ * Compress File API
+ *----------------------
+ */
+
+/*
+ * State needed for LZ4 (de)compression using the CompressFileHandle API.
+ *
+ * One LZ4File is allocated per handle by InitCompressFileHandleLZ4(); the
+ * contexts and buffers are set up lazily by LZ4File_init() on the first
+ * read or write.
+ */
+typedef struct LZ4File
+{
+   /* underlying stdio stream, set by LZ4File_open() */
+   FILE       *fp;
+
+   /* LZ4F preferences handed to the compression calls */
+   LZ4F_preferences_t prefs;
+
+   /* LZ4F contexts; LZ4File_init() creates the one matching 'compressing' */
+   LZ4F_compressionContext_t ctx;
+   LZ4F_decompressionContext_t dtx;
+
+   /* has LZ4File_init() been run, and for which direction? */
+   bool        inited;
+   bool        compressing;
+
+   /* working buffer for LZ4F output, and its allocated size */
+   size_t      buflen;
+   char       *buffer;
+
+   /*
+    * Decompressed bytes that did not fit into a caller's buffer: allocated
+    * size, number of bytes currently held, and the storage itself.
+    */
+   size_t      overflowalloclen;
+   size_t      overflowlen;
+   char       *overflowbuf;
+
+   /* LZ4F status saved on failure (LZ4File_open() stores errno here) */
+   size_t      errcode;
+} LZ4File;
+
+/*
+ * feof()/gzeof() counterpart for LZ4 files.  End of file is reported only
+ * once the underlying stream is exhausted and no decompressed bytes remain
+ * pending in the overflow buffer.
+ */
+static int
+LZ4File_eof(CompressFileHandle *CFH)
+{
+   LZ4File    *lz4 = (LZ4File *) CFH->private_data;
+
+   /* Decompressed data is still waiting to be consumed: not at EOF yet. */
+   if (lz4->overflowlen != 0)
+       return 0;
+
+   return feof(lz4->fp) != 0;
+}
+
+/*
+ * Return a message for the most recent failure: the LZ4F error name when
+ * the saved status is an LZ4F error code, otherwise strerror(errno).
+ */
+static const char *
+LZ4File_get_error(CompressFileHandle *CFH)
+{
+   LZ4File    *lz4 = (LZ4File *) CFH->private_data;
+
+   if (!LZ4F_isError(lz4->errcode))
+       return strerror(errno);
+
+   return LZ4F_getErrorName(lz4->errcode);
+}
+
+/*
+ * Prepare an already alloc'ed LZ4File struct for subsequent calls.
+ *
+ * Creates the LZ4F context for the requested direction and allocates the
+ * working buffers.  When compressing, it additionally writes the LZ4 frame
+ * header to the output stream.  'size' is only consulted when decompressing,
+ * where the buffers are made at least that large (and never smaller than
+ * LZ4_OUT_SIZE).
+ *
+ * Returns 0 on success, or if the struct was already initialized, and 1 on
+ * failure.  On an LZ4F failure the status is saved in fs->errcode; on a
+ * failed header write errno is set.
+ *
+ * The struct is marked as initialized only once everything succeeded, and
+ * whatever was allocated here is released again on failure.  Otherwise a
+ * failed initialization would look complete to later calls, and
+ * LZ4File_close() would try to finalize a stream that was never started.
+ */
+static int
+LZ4File_init(LZ4File *fs, int size, bool compressing)
+{
+   size_t      status;
+
+   if (fs->inited)
+       return 0;
+
+   fs->compressing = compressing;
+
+   if (fs->compressing)
+   {
+       fs->buflen = LZ4F_compressBound(LZ4_IN_SIZE, &fs->prefs);
+       if (fs->buflen < LZ4F_HEADER_SIZE_MAX)
+           fs->buflen = LZ4F_HEADER_SIZE_MAX;
+
+       status = LZ4F_createCompressionContext(&fs->ctx, LZ4F_VERSION);
+       if (LZ4F_isError(status))
+       {
+           fs->errcode = status;
+           return 1;
+       }
+
+       fs->buffer = pg_malloc(fs->buflen);
+       status = LZ4F_compressBegin(fs->ctx, fs->buffer, fs->buflen,
+                                   &fs->prefs);
+
+       if (LZ4F_isError(status))
+       {
+           fs->errcode = status;
+
+           /* undo the partial setup */
+           LZ4F_freeCompressionContext(fs->ctx);
+           fs->ctx = NULL;
+           pg_free(fs->buffer);
+           fs->buffer = NULL;
+           return 1;
+       }
+
+       if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+       {
+           /* keep the error code across the cleanup calls */
+           int         save_errno = (errno) ? errno : ENOSPC;
+
+           LZ4F_freeCompressionContext(fs->ctx);
+           fs->ctx = NULL;
+           pg_free(fs->buffer);
+           fs->buffer = NULL;
+           errno = save_errno;
+           return 1;
+       }
+   }
+   else
+   {
+       status = LZ4F_createDecompressionContext(&fs->dtx, LZ4F_VERSION);
+       if (LZ4F_isError(status))
+       {
+           fs->errcode = status;
+           return 1;
+       }
+
+       fs->buflen = size > LZ4_OUT_SIZE ? size : LZ4_OUT_SIZE;
+       fs->buffer = pg_malloc(fs->buflen);
+
+       fs->overflowalloclen = fs->buflen;
+       fs->overflowbuf = pg_malloc(fs->overflowalloclen);
+       fs->overflowlen = 0;
+   }
+
+   fs->inited = true;
+
+   return 0;
+}
+
+/*
+ * Hand out decompressed content that is already sitting in the overflow
+ * buffer.  At most 'size' bytes are copied into 'ptr'; when eol_flag is set
+ * the copy stops right after the first newline found within that range.
+ *
+ * Whatever is left unread is moved to the start of the overflow buffer.
+ * Returns the number of bytes copied, which is 0 when the buffer is empty.
+ */
+static int
+LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
+{
+   int         ncopy;
+
+   /* Nothing buffered, nothing to hand out. */
+   if (fs->overflowlen == 0)
+       return 0;
+
+   /* Take what the caller asked for, limited by what is buffered. */
+   if (fs->overflowlen < size)
+       ncopy = fs->overflowlen;
+   else
+       ncopy = size;
+
+   if (eol_flag)
+   {
+       char       *newline = memchr(fs->overflowbuf, '\n', ncopy);
+
+       /* The line terminating char itself is part of the result. */
+       if (newline)
+           ncopy = newline - fs->overflowbuf + 1;
+   }
+
+   memcpy(ptr, fs->overflowbuf, ncopy);
+   fs->overflowlen -= ncopy;
+
+   /* Shift the unread remainder to the front. */
+   if (fs->overflowlen > 0)
+       memmove(fs->overflowbuf, fs->overflowbuf + ncopy, fs->overflowlen);
+
+   return ncopy;
+}
+
+/*
+ * The workhorse for reading decompressed content out of an LZ4 compressed
+ * stream.
+ *
+ * It will read up to 'ptrsize' decompressed content, or up to the new line
+ * char if found first when the eol_flag is set. It is possible that the
+ * decompressed output generated by reading any compressed input via the
+ * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
+ * at an overflow buffer within LZ4File. Of course, when the function is
+ * called, it will first try to consume any decompressed content already
+ * present in the overflow buffer, before decompressing new content.
+ *
+ * Returns the number of bytes stored in 'ptr', or -1 on failure: when the
+ * lazy initialization fails, when fread() comes up short without reaching
+ * end of file, or when LZ4F_decompress() reports an error (the status is
+ * then saved in fs->errcode).  The temporary read buffer is released on
+ * every exit path.
+ */
+static int
+LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
+{
+   size_t      dsize = 0;
+   size_t      rsize;
+   size_t      size = ptrsize;
+   bool        eol_found = false;
+
+   void       *readbuf;
+
+   /* Lazy init */
+   if (LZ4File_init(fs, size, false /* decompressing */ ))
+       return -1;
+
+   /* Verify that there is enough space in the outbuf */
+   if (size > fs->buflen)
+   {
+       fs->buflen = size;
+       fs->buffer = pg_realloc(fs->buffer, size);
+   }
+
+   /* use already decompressed content if available */
+   dsize = LZ4File_read_overflow(fs, ptr, size, eol_flag);
+   if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
+       return dsize;
+
+   readbuf = pg_malloc(size);
+
+   do
+   {
+       char       *rp;
+       char       *rend;
+
+       rsize = fread(readbuf, 1, size, fs->fp);
+       if (rsize < size && !feof(fs->fp))
+       {
+           /* read error: do not leak readbuf, and keep errno for the caller */
+           int         save_errno = errno;
+
+           pg_free(readbuf);
+           errno = save_errno;
+           return -1;
+       }
+
+       rp = (char *) readbuf;
+       rend = (char *) readbuf + rsize;
+
+       while (rp < rend)
+       {
+           size_t      status;
+           size_t      outlen = fs->buflen;
+           size_t      read_remain = rend - rp;
+
+           memset(fs->buffer, 0, outlen);
+           status = LZ4F_decompress(fs->dtx, fs->buffer, &outlen,
+                                    rp, &read_remain, NULL);
+           if (LZ4F_isError(status))
+           {
+               fs->errcode = status;
+               /* do not leak readbuf on the error path */
+               pg_free(readbuf);
+               return -1;
+           }
+
+           /* LZ4F_decompress reports the input it consumed in read_remain */
+           rp += read_remain;
+
+           /*
+            * fill in what space is available in ptr if the eol flag is set,
+            * either skip if one already found or fill up to EOL if present
+            * in the outbuf
+            */
+           if (outlen > 0 && dsize < size && eol_found == false)
+           {
+               char       *p;
+               size_t      lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
+               size_t      len = outlen < lib ? outlen : lib;
+
+               if (eol_flag &&
+                   (p = memchr(fs->buffer, '\n', outlen)) &&
+                   (size_t) (p - fs->buffer + 1) <= len)
+               {
+                   len = p - fs->buffer + 1;
+                   eol_found = true;
+               }
+
+               memcpy((char *) ptr + dsize, fs->buffer, len);
+               dsize += len;
+
+               /* move what did not fit, if any, at the beginning of the buf */
+               if (len < outlen)
+                   memmove(fs->buffer, fs->buffer + len, outlen - len);
+               outlen -= len;
+           }
+
+           /* if there is available output, save it */
+           if (outlen > 0)
+           {
+               /* double the overflow buffer until the new data fits */
+               while (fs->overflowlen + outlen > fs->overflowalloclen)
+               {
+                   fs->overflowalloclen *= 2;
+                   fs->overflowbuf = pg_realloc(fs->overflowbuf,
+                                                fs->overflowalloclen);
+               }
+
+               memcpy(fs->overflowbuf + fs->overflowlen, fs->buffer, outlen);
+               fs->overflowlen += outlen;
+           }
+       }
+   } while (rsize == size && dsize < size && eol_found == 0);
+
+   pg_free(readbuf);
+
+   return (int) dsize;
+}
+
+/*
+ * Compress size bytes from ptr and write them to the stream.
+ *
+ * The input is fed to LZ4F_compressUpdate() in pieces of at most
+ * LZ4_IN_SIZE bytes, which is the input size the output buffer was
+ * dimensioned for in LZ4File_init().
+ *
+ * Returns 'size' on success and (size_t) -1 on failure.  On an LZ4F failure
+ * the status is saved in fs->errcode; on a failed write errno is set.
+ */
+static size_t
+LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH)
+{
+   LZ4File    *fs = (LZ4File *) CFH->private_data;
+   const char *src = (const char *) ptr;
+   size_t      remaining = size;
+   size_t      status;
+
+   /* Lazy init */
+   if (LZ4File_init(fs, size, true))
+       return -1;
+
+   while (remaining > 0)
+   {
+       size_t      chunk = remaining < LZ4_IN_SIZE ? remaining : LZ4_IN_SIZE;
+
+       status = LZ4F_compressUpdate(fs->ctx, fs->buffer, fs->buflen,
+                                    src, chunk, NULL);
+       if (LZ4F_isError(status))
+       {
+           fs->errcode = status;
+           return -1;
+       }
+
+       if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+       {
+           errno = (errno) ? errno : ENOSPC;
+           /* report failure the same way as the other error paths */
+           return -1;
+       }
+
+       /* advance past the input just consumed */
+       src += chunk;
+       remaining -= chunk;
+   }
+
+   return size;
+}
+
+/*
+ * fread() counterpart for LZ4 compressed files: fetch up to 'size'
+ * decompressed bytes into 'ptr'.  Getting fewer bytes than requested is
+ * fatal unless the end of the file has been reached.
+ */
+static size_t
+LZ4File_read(void *ptr, size_t size, CompressFileHandle *CFH)
+{
+   LZ4File    *lz4 = (LZ4File *) CFH->private_data;
+   int         nread = LZ4File_read_internal(lz4, ptr, size, false);
+
+   /* A short read is acceptable only at end of file. */
+   if (nread != size)
+   {
+       if (!LZ4File_eof(CFH))
+           pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+   }
+
+   return nread;
+}
+
+/*
+ * fgetc() counterpart for LZ4 compressed files.  Returns the next
+ * decompressed byte; failing to obtain one is fatal, with the message
+ * telling end of file apart from other errors.
+ */
+static int
+LZ4File_getc(CompressFileHandle *CFH)
+{
+   LZ4File    *lz4 = (LZ4File *) CFH->private_data;
+   unsigned char ch;
+   int         nread = LZ4File_read_internal(lz4, &ch, 1, false);
+
+   if (nread != 1)
+   {
+       if (LZ4File_eof(CFH))
+           pg_fatal("could not read from input file: end of file");
+       else
+           pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+   }
+
+   return ch;
+}
+
+/*
+ * fgets() equivalent implementation for LZ4 compressed files.
+ *
+ * Reads at most 'size' - 1 decompressed bytes into 'ptr', stopping after
+ * the first newline, and terminates the result with a NUL byte as fgets()
+ * does.  Returns 'ptr', or NULL when nothing could be read.  A read error
+ * is fatal.
+ */
+static char *
+LZ4File_gets(char *ptr, int size, CompressFileHandle *CFH)
+{
+   LZ4File    *fs = (LZ4File *) CFH->private_data;
+   int         ret;
+
+   /* Without room for the terminator nothing can be returned. */
+   if (size < 1)
+       return NULL;
+
+   /*
+    * Keep the result in a signed variable, so that the -1 error return of
+    * LZ4File_read_internal() can actually be detected, and reserve one
+    * byte of the caller's buffer for the terminator.
+    */
+   ret = LZ4File_read_internal(fs, ptr, size - 1, true);
+   if (ret < 0)
+       pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+
+   /* Done reading */
+   if (ret == 0)
+       return NULL;
+
+   /* ret is at most size - 1, so the terminator fits */
+   ptr[ret] = '\0';
+
+   return ptr;
+}
+
+/*
+ * Finalize (de)compression of a stream. When compressing it will write any
+ * remaining content and/or generated footer from the LZ4 API.
+ *
+ * The LZ4F context and the buffers are released, then the LZ4File itself,
+ * and finally the underlying stream is closed; the result of fclose() is
+ * returned.  Failures reported by the LZ4F API, or a short write of the
+ * final data, are fatal.
+ */
+static int
+LZ4File_close(CompressFileHandle *CFH)
+{
+   FILE       *fp;
+   LZ4File    *fs = (LZ4File *) CFH->private_data;
+   size_t      status;
+
+   /* fs is freed below, so remember the stream for the final fclose() */
+   fp = fs->fp;
+   if (fs->inited)
+   {
+       if (fs->compressing)
+       {
+           status = LZ4F_compressEnd(fs->ctx, fs->buffer, fs->buflen, NULL);
+           if (LZ4F_isError(status))
+               pg_fatal("failed to end compression: %s",
+                        LZ4F_getErrorName(status));
+           else if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+           {
+               errno = (errno) ? errno : ENOSPC;
+               WRITE_ERROR_EXIT;
+           }
+
+           status = LZ4F_freeCompressionContext(fs->ctx);
+           if (LZ4F_isError(status))
+               pg_fatal("failed to end compression: %s",
+                        LZ4F_getErrorName(status));
+       }
+       else
+       {
+           status = LZ4F_freeDecompressionContext(fs->dtx);
+           if (LZ4F_isError(status))
+               pg_fatal("failed to end decompression: %s",
+                        LZ4F_getErrorName(status));
+           pg_free(fs->overflowbuf);
+       }
+
+       pg_free(fs->buffer);
+   }
+
+   pg_free(fs);
+
+   return fclose(fp);
+}
+
+/*
+ * Open the underlying stdio stream: from descriptor 'fd' when it is
+ * non-negative, otherwise from 'path'.  Returns 0 on success.  On failure
+ * it returns 1 after storing errno in the handle's errcode field.
+ */
+static int
+LZ4File_open(const char *path, int fd, const char *mode,
+            CompressFileHandle *CFH)
+{
+   LZ4File    *lz4 = (LZ4File *) CFH->private_data;
+   FILE       *stream = (fd >= 0) ? fdopen(fd, mode) : fopen(path, mode);
+
+   if (stream == NULL)
+   {
+       lz4->errcode = errno;
+       return 1;
+   }
+
+   lz4->fp = stream;
+
+   return 0;
+}
+
+/*
+ * Open 'path' with the ".lz4" suffix appended, going through the handle's
+ * open_func, and return that function's result.
+ */
+static int
+LZ4File_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
+{
+   char       *lz4path = psprintf("%s.lz4", path);
+   int         rc = CFH->open_func(lz4path, -1, mode, CFH);
+
+   pg_free(lz4path);
+
+   return rc;
+}
+
+/*
+ * Public routines
+ */
+/*
+ * Set up 'CFH' for LZ4: attach a zeroed LZ4File as private data, carry a
+ * non-negative compression level over into the LZ4F preferences, and
+ * install the LZ4 implementations of the file-handle callbacks.
+ */
+void
+InitCompressFileHandleLZ4(CompressFileHandle *CFH,
+                         const pg_compress_specification compression_spec)
+{
+   LZ4File    *state = pg_malloc0(sizeof(*state));
+
+   /* Only a non-negative level is passed on to the LZ4F preferences. */
+   if (compression_spec.level >= 0)
+       state->prefs.compressionLevel = compression_spec.level;
+
+   CFH->compression_spec = compression_spec;
+   CFH->private_data = state;
+
+   /* opening and closing */
+   CFH->open_func = LZ4File_open;
+   CFH->open_write_func = LZ4File_open_write;
+   CFH->close_func = LZ4File_close;
+
+   /* data transfer */
+   CFH->read_func = LZ4File_read;
+   CFH->write_func = LZ4File_write;
+   CFH->gets_func = LZ4File_gets;
+   CFH->getc_func = LZ4File_getc;
+
+   /* status reporting */
+   CFH->eof_func = LZ4File_eof;
+   CFH->get_error_func = LZ4File_get_error;
+}
+#else                          /* USE_LZ4 */
+/*
+ * Stub compiled when USE_LZ4 is not defined: always exits with a fatal
+ * error saying that this build lacks LZ4 support.
+ */
+void
+InitCompressorLZ4(CompressorState *cs,
+                 const pg_compress_specification compression_spec)
+{
+   pg_fatal("this build does not support compression with %s", "LZ4");
+}
+
+/*
+ * Stub compiled when USE_LZ4 is not defined: always exits with a fatal
+ * error saying that this build lacks LZ4 support.
+ */
+void
+InitCompressFileHandleLZ4(CompressFileHandle *CFH,
+                         const pg_compress_specification compression_spec)
+{
+   pg_fatal("this build does not support compression with %s", "LZ4");
+}
+#endif                         /* USE_LZ4 */
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
new file mode 100644 (file)
index 0000000..40dbe00
--- /dev/null
@@ -0,0 +1,24 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_lz4.h
+ *  LZ4 interface to compress_io.c routines
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/bin/pg_dump/compress_lz4.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _COMPRESS_LZ4_H_
+#define _COMPRESS_LZ4_H_
+
+#include "compress_io.h"
+
+extern void InitCompressorLZ4(CompressorState *cs,
+                             const pg_compress_specification compression_spec);
+extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
+                                     const pg_compress_specification compression_spec);
+
+#endif                         /* _COMPRESS_LZ4_H_ */
index 84e9f0defa485be9c60a9f11605c9bcc27d8dbd8..0da476a4c34af198a91f069a2347b74584472538 100644 (file)
@@ -3,6 +3,7 @@
 pg_dump_common_sources = files(
   'compress_gzip.c',
   'compress_io.c',
+  'compress_lz4.c',
   'compress_none.c',
   'dumputils.c',
   'parallel.c',
@@ -18,7 +19,7 @@ pg_dump_common_sources = files(
 pg_dump_common = static_library('libpgdump_common',
   pg_dump_common_sources,
   c_pch: pch_postgres_fe_h,
-  dependencies: [frontend_code, libpq, zlib],
+  dependencies: [frontend_code, libpq, lz4, zlib],
   kwargs: internal_lib_args,
 )
 
@@ -86,7 +87,10 @@ tests += {
   'sd': meson.current_source_dir(),
   'bd': meson.current_build_dir(),
   'tap': {
-    'env': {'GZIP_PROGRAM': gzip.path()},
+    'env': {
+      'GZIP_PROGRAM': gzip.path(),
+      'LZ4': program_lz4.found() ? program_lz4.path() : '',
+    },
     'tests': [
       't/001_basic.pl',
       't/002_pg_dump.pl',
index f25f3b6fa8fa3413b6faa448a8dbdf0ebc1e139c..61ebb8fe85db3a3d3ef15e5d4c33bf45b159fbbc 100644 (file)
@@ -2075,7 +2075,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 
        /*
         * Check if the specified archive is a directory. If so, check if
-        * there's a "toc.dat" (or "toc.dat.gz") file in it.
+        * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
         */
        if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
        {
@@ -2085,6 +2085,10 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 #ifdef HAVE_LIBZ
            if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
                return AH->format;
+#endif
+#ifdef USE_LZ4
+           if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
+               return AH->format;
 #endif
            pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
                     AH->fSpec);
index e7efeed855bab2d3f61d014a114cbeb1eda3225e..79fa619ba5f323dfbc85a053dc48281405986717 100644 (file)
@@ -779,10 +779,13 @@ _PrepParallelRestore(ArchiveHandle *AH)
 
        if (stat(fname, &st) == 0)
            te->dataLength = st.st_size;
-       else
+       else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
        {
-           /* It might be compressed */
-           strlcat(fname, ".gz", sizeof(fname));
+           if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
+               strlcat(fname, ".gz", sizeof(fname));
+           else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
+               strlcat(fname, ".lz4", sizeof(fname));
+
            if (stat(fname, &st) == 0)
                te->dataLength = st.st_size;
        }
index 1a06eeaf6aa17ea6cc7458c7c3fd3166497aeb5c..cc424fd3b29030335336460e60ce62800496b136 100644 (file)
@@ -715,13 +715,12 @@ main(int argc, char **argv)
        case PG_COMPRESSION_NONE:
            /* fallthrough */
        case PG_COMPRESSION_GZIP:
+           /* fallthrough */
+       case PG_COMPRESSION_LZ4:
            break;
        case PG_COMPRESSION_ZSTD:
            pg_fatal("compression with %s is not yet supported", "ZSTD");
            break;
-       case PG_COMPRESSION_LZ4:
-           pg_fatal("compression with %s is not yet supported", "LZ4");
-           break;
    }
 
    /*
index 78454928ccad88b2b6340d483c559642ed112359..72b19ee6cde4f9ca9cded9d38fd8b816256123b0 100644 (file)
@@ -139,6 +139,80 @@ my %pgdump_runs = (
            args    => [ '-d', "$tempdir/compression_gzip_plain.sql.gz", ],
        },
    },
+
+   # Do not use --no-sync to give test coverage for data sync.
+   compression_lz4_custom => {
+       test_key       => 'compression',
+       compile_option => 'lz4',
+       dump_cmd       => [
+           'pg_dump',      '--format=custom',
+           '--compress=lz4', "--file=$tempdir/compression_lz4_custom.dump",
+           'postgres',
+       ],
+       restore_cmd => [
+           'pg_restore',
+           "--file=$tempdir/compression_lz4_custom.sql",
+           "$tempdir/compression_lz4_custom.dump",
+       ],
+       command_like => {
+           command => [
+               'pg_restore',
+               '-l', "$tempdir/compression_lz4_custom.dump",
+           ],
+           expected => qr/Compression: lz4/,
+           name => 'data content is lz4 compressed'
+       },
+   },
+
+   # Do not use --no-sync to give test coverage for data sync.
+   compression_lz4_dir => {
+       test_key       => 'compression',
+       compile_option => 'lz4',
+       dump_cmd       => [
+           'pg_dump',                              '--jobs=2',
+           '--format=directory',                   '--compress=lz4:1',
+           "--file=$tempdir/compression_lz4_dir", 'postgres',
+       ],
+       # Give coverage for manually compressed blobs.toc files during
+       # restore.
+       compress_cmd => {
+           program => $ENV{'LZ4'},
+           args    => [
+               '-z', '-f', '--rm',
+               "$tempdir/compression_lz4_dir/blobs.toc",
+               "$tempdir/compression_lz4_dir/blobs.toc.lz4",
+           ],
+       },
+       # Verify that data files were compressed
+       glob_patterns => [
+           "$tempdir/compression_lz4_dir/toc.dat",
+           "$tempdir/compression_lz4_dir/*.dat.lz4",
+       ],
+       restore_cmd => [
+           'pg_restore', '--jobs=2',
+           "--file=$tempdir/compression_lz4_dir.sql",
+           "$tempdir/compression_lz4_dir",
+       ],
+   },
+
+   compression_lz4_plain => {
+       test_key       => 'compression',
+       compile_option => 'lz4',
+       dump_cmd       => [
+           'pg_dump', '--format=plain', '--compress=lz4',
+           "--file=$tempdir/compression_lz4_plain.sql.lz4", 'postgres',
+       ],
+       # Decompress the generated file to run through the tests.
+       compress_cmd => {
+           program => $ENV{'LZ4'},
+           args    => [
+               '-d', '-f',
+               "$tempdir/compression_lz4_plain.sql.lz4",
+               "$tempdir/compression_lz4_plain.sql",
+           ],
+       },
+   },
+
    clean => {
        dump_cmd => [
            'pg_dump',
@@ -4175,11 +4249,11 @@ foreach my $run (sort keys %pgdump_runs)
    my $run_db   = 'postgres';
 
    # Skip command-level tests for gzip if there is no support for it.
-   if (   defined($pgdump_runs{$run}->{compile_option})
-       && $pgdump_runs{$run}->{compile_option} eq 'gzip'
-       && !$supports_gzip)
+   if ($pgdump_runs{$run}->{compile_option} &&
+       ($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
+       ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4))
    {
-       note "$run: skipped due to no gzip support";
+       note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
        next;
    }
 
index db429474a2524546fb4ef762617066f2d33cb1c8..2c5042eb417a8f6b24fe682adb45550637bce8c8 100755 (executable)
@@ -152,6 +152,7 @@ do
    # as field names, which is unfortunate but we won't change it now.
    test "$f" = src/bin/pg_dump/compress_gzip.h && continue
    test "$f" = src/bin/pg_dump/compress_io.h && continue
+   test "$f" = src/bin/pg_dump/compress_lz4.h && continue
    test "$f" = src/bin/pg_dump/compress_none.h && continue
    test "$f" = src/bin/pg_dump/parallel.h && continue
    test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue
index d4bb7442bec61218183a6c8bb32953b3405c1012..86a9303bf56bc7c0a49c3735202acb65206df0c0 100644 (file)
@@ -1387,11 +1387,13 @@ LWLock
 LWLockHandle
 LWLockMode
 LWLockPadded
+LZ4CompressorState
 LZ4F_compressionContext_t
 LZ4F_decompressOptions_t
 LZ4F_decompressionContext_t
 LZ4F_errorCode_t
 LZ4F_preferences_t
+LZ4File
 LabelProvider
 LagTracker
 LargeObjectDesc