Prepare pg_dump internals for additional compression methods
authorTomas Vondra <tomas.vondra@postgresql.org>
Thu, 23 Feb 2023 14:38:14 +0000 (15:38 +0100)
committerTomas Vondra <tomas.vondra@postgresql.org>
Thu, 23 Feb 2023 14:38:40 +0000 (15:38 +0100)
Commit bf9aa490db introduced a compression API in compress_io.{c,h} to
make reuse easier, and allow adding more compression algorithms.
However, pg_backup_archiver.c was not switched to this API and continued
to call the compression directly.

This commit teaches pg_backup_archiver.c about the compression API, so
that it can benefit from bf9aa490db (simpler code, easier addition of
new compression methods).

Author: Georgios Kokolatos
Reviewed-by: Michael Paquier, Rachel Heaton, Justin Pryzby, Tomas Vondra
Discussion: https://postgr.es/m/faUNEOpts9vunEaLnmxmG-DldLSg_ql137OC3JYDmgrOMHm1RvvWY2IdBkv_CRxm5spCCb_OmKNk2T03TMm0fBEWveFF9wA1WizPuAgB7Ss%3D%40protonmail.com

src/bin/pg_dump/compress_io.c
src/bin/pg_dump/compress_io.h
src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_archiver.h

index 7a2c80bbc4c120ca5a57dd4bd74b039a6e501066..5ac21f091f0a2374d7a79e73fc3dbb6c29551ace 100644 (file)
 #include "compress_io.h"
 #include "pg_backup_utils.h"
 
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+/*----------------------
+ * Generic functions
+ *----------------------
+ */
+
+/*
+ * Checks whether a compression algorithm is supported.
+ *
+ * On success returns NULL, otherwise returns a malloc'ed string which can be
+ * used by the caller in an error message.
+ */
+char *
+supports_compression(const pg_compress_specification compression_spec)
+{
+       const pg_compress_algorithm     algorithm = compression_spec.algorithm;
+       bool                                            supported = false;
+
+       if (algorithm == PG_COMPRESSION_NONE)
+               supported = true;
+#ifdef HAVE_LIBZ
+       if (algorithm == PG_COMPRESSION_GZIP)
+               supported = true;
+#endif
+
+       if (!supported)
+               return psprintf("this build does not support compression with %s",
+                                               get_compress_algorithm_name(algorithm));
+
+       return NULL;
+}
+
 /*----------------------
  * Compressor API
  *----------------------
@@ -490,16 +525,19 @@ cfopen_write(const char *path, const char *mode,
 }
 
 /*
- * Opens file 'path' in 'mode'. If compression is GZIP, the file
- * is opened with libz gzopen(), otherwise with plain fopen().
+ * This is the workhorse for cfopen() or cfdopen(). It opens file 'path' or
+ * associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'. The
+ * descriptor is not dup'ed and it is the caller's responsibility to do so.
+ * The caller must verify that the 'compress_algorithm' is supported by the
+ * current build.
  *
  * On failure, return NULL with an error code in errno.
  */
-cfp *
-cfopen(const char *path, const char *mode,
-          const pg_compress_specification compression_spec)
+static cfp *
+cfopen_internal(const char *path, int fd, const char *mode,
+                               pg_compress_specification compression_spec)
 {
-       cfp                *fp = pg_malloc(sizeof(cfp));
+       cfp                *fp = pg_malloc0(sizeof(cfp));
 
        if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
        {
@@ -511,15 +549,20 @@ cfopen(const char *path, const char *mode,
 
                        snprintf(mode_compression, sizeof(mode_compression), "%s%d",
                                         mode, compression_spec.level);
-                       fp->compressedfp = gzopen(path, mode_compression);
+                       if (fd >= 0)
+                               fp->compressedfp = gzdopen(fd, mode_compression);
+                       else
+                               fp->compressedfp = gzopen(path, mode_compression);
                }
                else
                {
                        /* don't specify a level, just use the zlib default */
-                       fp->compressedfp = gzopen(path, mode);
+                       if (fd >= 0)
+                               fp->compressedfp = gzdopen(fd, mode);
+                       else
+                               fp->compressedfp = gzopen(path, mode);
                }
 
-               fp->uncompressedfp = NULL;
                if (fp->compressedfp == NULL)
                {
                        free_keep_errno(fp);
@@ -531,10 +574,11 @@ cfopen(const char *path, const char *mode,
        }
        else
        {
-#ifdef HAVE_LIBZ
-               fp->compressedfp = NULL;
-#endif
-               fp->uncompressedfp = fopen(path, mode);
+               if (fd >= 0)
+                       fp->uncompressedfp = fdopen(fd, mode);
+               else
+                       fp->uncompressedfp = fopen(path, mode);
+
                if (fp->uncompressedfp == NULL)
                {
                        free_keep_errno(fp);
@@ -545,6 +589,33 @@ cfopen(const char *path, const char *mode,
        return fp;
 }
 
+/*
+ * Opens file 'path' in 'mode' and compression as defined in
+ * compression_spec. The caller must verify that the compression
+ * is supported by the current build.
+ *
+ * On failure, return NULL with an error code in errno.
+ */
+cfp *
+cfopen(const char *path, const char *mode,
+          const pg_compress_specification compression_spec)
+{
+       return cfopen_internal(path, -1, mode, compression_spec);
+}
+
+/*
+ * Associates a stream 'fd', if 'fd' is a valid descriptor, in 'mode'
+ * and compression as defined in compression_spec. The caller must
+ * verify that the compression is supported by the current build.
+ *
+ * On failure, return NULL with an error code in errno.
+ */
+cfp *
+cfdopen(int fd, const char *mode,
+               const pg_compress_specification compression_spec)
+{
+       return cfopen_internal(NULL, fd, mode, compression_spec);
+}
 
 int
 cfread(void *ptr, int size, cfp *fp)
index a429dc4789d884545736f36362bc08ce7f33dfee..8beb1058ec22f8a5d7cf0f081cb08500728b4087 100644 (file)
@@ -21,6 +21,8 @@
 #define ZLIB_OUT_SIZE  4096
 #define ZLIB_IN_SIZE   4096
 
+extern char *supports_compression(const pg_compress_specification compression_spec);
+
 /* Prototype for callback function to WriteDataToArchive() */
 typedef void (*WriteFunc) (ArchiveHandle *AH, const char *buf, size_t len);
 
@@ -54,6 +56,8 @@ typedef struct cfp cfp;
 
 extern cfp *cfopen(const char *path, const char *mode,
                                   const pg_compress_specification compression_spec);
+extern cfp *cfdopen(int fd, const char *mode,
+                                       const pg_compress_specification compression_spec);
 extern cfp *cfopen_read(const char *path, const char *mode);
 extern cfp *cfopen_write(const char *path, const char *mode,
                                                 const pg_compress_specification compression_spec);
index 269bfce019b5b9354c0a21b3a3412dffe7297eab..1bf75178fd8925a56fbb712c874ed89bb0974526 100644 (file)
@@ -31,6 +31,7 @@
 #endif
 
 #include "common/string.h"
+#include "compress_io.h"
 #include "dumputils.h"
 #include "fe_utils/string_utils.h"
 #include "lib/stringinfo.h"
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
-/* state needed to save/restore an archive's output target */
-typedef struct _outputContext
-{
-       void       *OF;
-       int                     gzOut;
-} OutputContext;
-
 /*
  * State for tracking TocEntrys that are ready to process during a parallel
  * restore.  (This used to be a list, and we still call it that, though now
@@ -101,8 +95,8 @@ static void dump_lo_buf(ArchiveHandle *AH);
 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
 static void SetOutput(ArchiveHandle *AH, const char *filename,
                                          const pg_compress_specification compression_spec);
-static OutputContext SaveOutput(ArchiveHandle *AH);
-static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
+static cfp *SaveOutput(ArchiveHandle *AH);
+static void RestoreOutput(ArchiveHandle *AH, cfp *savedOutput);
 
 static int     restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
 static void restore_toc_entries_prefork(ArchiveHandle *AH,
@@ -277,11 +271,8 @@ CloseArchive(Archive *AHX)
        AH->ClosePtr(AH);
 
        /* Close the output */
-       errno = 0;                                      /* in case gzclose() doesn't set it */
-       if (AH->gzOut)
-               res = GZCLOSE(AH->OF);
-       else if (AH->OF != stdout)
-               res = fclose(AH->OF);
+       errno = 0;
+       res = cfclose(AH->OF);
 
        if (res != 0)
                pg_fatal("could not close output file: %m");
@@ -363,7 +354,7 @@ RestoreArchive(Archive *AHX)
        RestoreOptions *ropt = AH->public.ropt;
        bool            parallel_mode;
        TocEntry   *te;
-       OutputContext sav;
+       cfp                *sav;
 
        AH->stage = STAGE_INITIALIZING;
 
@@ -391,17 +382,21 @@ RestoreArchive(Archive *AHX)
        /*
         * Make sure we won't need (de)compression we haven't got
         */
-#ifndef HAVE_LIBZ
-       if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP &&
-               AH->PrintTocDataPtr != NULL)
+       if (AH->PrintTocDataPtr != NULL)
        {
                for (te = AH->toc->next; te != AH->toc; te = te->next)
                {
                        if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
-                               pg_fatal("cannot restore from compressed archive (compression not supported in this installation)");
+                       {
+                               char *errmsg = supports_compression(AH->compression_spec);
+                               if (errmsg)
+                                       pg_fatal("cannot restore from compressed archive (%s)",
+                                                         errmsg);
+                               else
+                                       break;
+                       }
                }
        }
-#endif
 
        /*
         * Prepare index arrays, so we can assume we have them throughout restore.
@@ -1133,7 +1128,7 @@ PrintTOCSummary(Archive *AHX)
        TocEntry   *te;
        pg_compress_specification out_compression_spec = {0};
        teSection       curSection;
-       OutputContext sav;
+       cfp                *sav;
        const char *fmtName;
        char            stamp_str[64];
 
@@ -1508,58 +1503,32 @@ static void
 SetOutput(ArchiveHandle *AH, const char *filename,
                  const pg_compress_specification compression_spec)
 {
-       int                     fn;
+       const char *mode;
+       int                     fn = -1;
 
        if (filename)
        {
                if (strcmp(filename, "-") == 0)
                        fn = fileno(stdout);
-               else
-                       fn = -1;
        }
        else if (AH->FH)
                fn = fileno(AH->FH);
        else if (AH->fSpec)
        {
-               fn = -1;
                filename = AH->fSpec;
        }
        else
                fn = fileno(stdout);
 
-       /* If compression explicitly requested, use gzopen */
-#ifdef HAVE_LIBZ
-       if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
-       {
-               char            fmode[14];
+       if (AH->mode == archModeAppend)
+               mode = PG_BINARY_A;
+       else
+               mode = PG_BINARY_W;
 
-               /* Don't use PG_BINARY_x since this is zlib */
-               sprintf(fmode, "wb%d", compression_spec.level);
-               if (fn >= 0)
-                       AH->OF = gzdopen(dup(fn), fmode);
-               else
-                       AH->OF = gzopen(filename, fmode);
-               AH->gzOut = 1;
-       }
+       if (fn >= 0)
+               AH->OF = cfdopen(dup(fn), mode, compression_spec);
        else
-#endif
-       {                                                       /* Use fopen */
-               if (AH->mode == archModeAppend)
-               {
-                       if (fn >= 0)
-                               AH->OF = fdopen(dup(fn), PG_BINARY_A);
-                       else
-                               AH->OF = fopen(filename, PG_BINARY_A);
-               }
-               else
-               {
-                       if (fn >= 0)
-                               AH->OF = fdopen(dup(fn), PG_BINARY_W);
-                       else
-                               AH->OF = fopen(filename, PG_BINARY_W);
-               }
-               AH->gzOut = 0;
-       }
+               AH->OF = cfopen(filename, mode, compression_spec);
 
        if (!AH->OF)
        {
@@ -1570,33 +1539,24 @@ SetOutput(ArchiveHandle *AH, const char *filename,
        }
 }
 
-static OutputContext
+static cfp *
 SaveOutput(ArchiveHandle *AH)
 {
-       OutputContext sav;
-
-       sav.OF = AH->OF;
-       sav.gzOut = AH->gzOut;
-
-       return sav;
+       return (cfp *) AH->OF;
 }
 
 static void
-RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
+RestoreOutput(ArchiveHandle *AH, cfp *savedOutput)
 {
        int                     res;
 
-       errno = 0;                                      /* in case gzclose() doesn't set it */
-       if (AH->gzOut)
-               res = GZCLOSE(AH->OF);
-       else
-               res = fclose(AH->OF);
+       errno = 0;
+       res = cfclose(AH->OF);
 
        if (res != 0)
                pg_fatal("could not close output file: %m");
 
-       AH->gzOut = savedContext.gzOut;
-       AH->OF = savedContext.OF;
+       AH->OF = savedOutput;
 }
 
 
@@ -1720,22 +1680,17 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
 
                bytes_written = size * nmemb;
        }
-       else if (AH->gzOut)
-               bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
        else if (AH->CustomOutPtr)
                bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
 
+       /*
+        * If we're doing a restore, and it's direct to DB, and we're connected
+        * then send it to the DB.
+        */
+       else if (RestoringToDB(AH))
+               bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
        else
-       {
-               /*
-                * If we're doing a restore, and it's direct to DB, and we're
-                * connected then send it to the DB.
-                */
-               if (RestoringToDB(AH))
-                       bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
-               else
-                       bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
-       }
+               bytes_written = cfwrite(ptr, size * nmemb, AH->OF);
 
        if (bytes_written != size * nmemb)
                WRITE_ERROR_EXIT;
@@ -2224,6 +2179,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
                 SetupWorkerPtrType setupWorkerPtr)
 {
        ArchiveHandle *AH;
+       pg_compress_specification out_compress_spec = {0};
 
        pg_log_debug("allocating AH for %s, format %d",
                                 FileSpec ? FileSpec : "(stdio)", fmt);
@@ -2277,8 +2233,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
        memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
 
        /* Open stdout with no compression for AH output handle */
-       AH->gzOut = 0;
-       AH->OF = stdout;
+       out_compress_spec.algorithm = PG_COMPRESSION_NONE;
+       AH->OF = cfdopen(dup(fileno(stdout)), PG_BINARY_A, out_compress_spec);
 
        /*
         * On Windows, we need to use binary mode to read/write non-text files,
@@ -3712,6 +3668,7 @@ WriteHead(ArchiveHandle *AH)
 void
 ReadHead(ArchiveHandle *AH)
 {
+       char       *errmsg;
        char            vmaj,
                                vmin,
                                vrev;
@@ -3781,10 +3738,13 @@ ReadHead(ArchiveHandle *AH)
        else
                AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
 
-#ifndef HAVE_LIBZ
-       if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
-               pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
-#endif
+       errmsg = supports_compression(AH->compression_spec);
+       if (errmsg)
+       {
+               pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
+                                               errmsg);
+               pg_free(errmsg);
+       }
 
        if (AH->version >= K_VERS_1_4)
        {
index f72446ed5b42911279b20ad40ba79243b4f9a7c2..4725e49747b66202d845712b2d7e526409540eed 100644 (file)
 
 #define LOBBUFSIZE 16384
 
-#ifdef HAVE_LIBZ
-#include <zlib.h>
-#define GZCLOSE(fh) gzclose(fh)
-#define GZWRITE(p, s, n, fh) gzwrite(fh, p, (n) * (s))
-#define GZREAD(p, s, n, fh) gzread(fh, p, (n) * (s))
-#define GZEOF(fh)      gzeof(fh)
-#else
-#define GZCLOSE(fh) fclose(fh)
-#define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s))
-#define GZREAD(p, s, n, fh) fread(p, s, n, fh)
-#define GZEOF(fh)      feof(fh)
-/* this is just the redefinition of a libz constant */
-#define Z_DEFAULT_COMPRESSION (-1)
-
-typedef struct _z_stream
-{
-       void       *next_in;
-       void       *next_out;
-       size_t          avail_in;
-       size_t          avail_out;
-} z_stream;
-typedef z_stream *z_streamp;
-#endif
-
 /* Data block types */
 #define BLK_DATA 1
 #define BLK_BLOBS 3
@@ -319,8 +295,7 @@ struct _archiveHandle
 
        char       *fSpec;                      /* Archive File Spec */
        FILE       *FH;                         /* General purpose file handle */
-       void       *OF;
-       int                     gzOut;                  /* Output file */
+       void       *OF;                         /* Output file */
 
        struct _tocEntry *toc;          /* Header of circular list of TOC entries */
        int                     tocCount;               /* Number of TOC entries */