walmethods.c/h: Make WalWriteMethod more object-oriented.
authorRobert Haas <rhaas@postgresql.org>
Mon, 19 Sep 2022 16:53:46 +0000 (12:53 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 19 Sep 2022 16:53:46 +0000 (12:53 -0400)
Normally when we use object-oriented programming techniques, we
provide a pointer to an object and then some way of looking up the
associated table of callbacks, but walmethods.c/h took the alternative
approach of providing only a pointer to the table of callbacks and
thus imposed the artificial restriction that there could only ever be
one object of each type, so that the callbacks could find it via a
global variable. That doesn't seem like the right idea, so revise the
approach.

Each callback which does not already have an argument of type
Walfile * now takes a pointer to the relevant WalWriteMethod *
so that these callbacks need not rely on there being only one
object of each type.

Freeing a WalWriteMethod is now performed via a callback provided
for that purpose rather than requiring the caller to know which
WAL method they want to free.

Discussion: http://postgr.es/m/CA+TgmoZS0Kw98fOoAcGz8B9iDhdqB4Be4e=vDZaJZ5A-xMYBqA@mail.gmail.com

src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivewal.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/walmethods.c
src/bin/pg_basebackup/walmethods.h

index a830b199f54d271da2902a9de702ec6dfca08ac3..876d20611b6f9c4b2cdb331f65e6a94c058f4335 100644 (file)
@@ -570,7 +570,7 @@ LogStreamerMain(logstreamer_param *param)
        return 1;
    }
 
-   if (!stream.walmethod->finish())
+   if (!stream.walmethod->ops->finish(stream.walmethod))
    {
        pg_log_error("could not finish writing WAL files: %m");
 #ifdef WIN32
@@ -581,11 +581,7 @@ LogStreamerMain(logstreamer_param *param)
 
    PQfinish(param->bgconn);
 
-   if (format == 'p')
-       FreeWalDirectoryMethod();
-   else
-       FreeWalTarMethod();
-   pg_free(stream.walmethod);
+   stream.walmethod->ops->free(stream.walmethod);
 
    return 0;
 }
index 5c22c914bc7fae4fcac4bf784974ece5f190a19b..a7180e2955bdd78868d4a29bcd761e5e09c0d291 100644 (file)
@@ -658,7 +658,7 @@ StreamLog(void)
 
    ReceiveXlogStream(conn, &stream);
 
-   if (!stream.walmethod->finish())
+   if (!stream.walmethod->ops->finish(stream.walmethod))
    {
        pg_log_info("could not finish writing WAL files: %m");
        return;
@@ -667,9 +667,7 @@ StreamLog(void)
    PQfinish(conn);
    conn = NULL;
 
-   FreeWalDirectoryMethod();
-   pg_free(stream.walmethod);
-   pg_free(stream.sysidentifier);
+   stream.walmethod->ops->free(stream.walmethod);
 }
 
 /*
index a619176511fb77b8c120138da5fdcb5aba5f4d2b..9c71323d7088d609d3cb4dae8124b65df3003536 100644 (file)
@@ -59,18 +59,19 @@ mark_file_as_archived(StreamCtl *stream, const char *fname)
    snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
             fname);
 
-   f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+   f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
+                                              NULL, 0);
    if (f == NULL)
    {
        pg_log_error("could not create archive status file \"%s\": %s",
-                    tmppath, stream->walmethod->getlasterror());
+                    tmppath, GetLastWalMethodError(stream->walmethod));
        return false;
    }
 
-   if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
+   if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
    {
        pg_log_error("could not close archive status file \"%s\": %s",
-                    tmppath, stream->walmethod->getlasterror());
+                    tmppath, GetLastWalMethodError(stream->walmethod));
        return false;
    }
 
@@ -98,8 +99,9 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
    XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
 
    /* Note that this considers the compression used if necessary */
-   fn = stream->walmethod->get_file_name(walfile_name,
-                                         stream->partial_suffix);
+   fn = stream->walmethod->ops->get_file_name(stream->walmethod,
+                                              walfile_name,
+                                              stream->partial_suffix);
 
    /*
     * When streaming to files, if an existing file exists we verify that it's
@@ -111,35 +113,35 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
     * When streaming to tar, no file with this name will exist before, so we
     * never have to verify a size.
     */
-   if (stream->walmethod->compression_algorithm() == PG_COMPRESSION_NONE &&
-       stream->walmethod->existsfile(fn))
+   if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
+       stream->walmethod->ops->existsfile(stream->walmethod, fn))
    {
-       size = stream->walmethod->get_file_size(fn);
+       size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
        if (size < 0)
        {
            pg_log_error("could not get size of write-ahead log file \"%s\": %s",
-                        fn, stream->walmethod->getlasterror());
+                        fn, GetLastWalMethodError(stream->walmethod));
            pg_free(fn);
            return false;
        }
        if (size == WalSegSz)
        {
            /* Already padded file. Open it for use */
-           f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, 0);
+           f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
            if (f == NULL)
            {
                pg_log_error("could not open existing write-ahead log file \"%s\": %s",
-                            fn, stream->walmethod->getlasterror());
+                            fn, GetLastWalMethodError(stream->walmethod));
                pg_free(fn);
                return false;
            }
 
            /* fsync file in case of a previous crash */
-           if (stream->walmethod->sync(f) != 0)
+           if (stream->walmethod->ops->sync(f) != 0)
            {
                pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
-                            fn, stream->walmethod->getlasterror());
-               stream->walmethod->close(f, CLOSE_UNLINK);
+                            fn, GetLastWalMethodError(stream->walmethod));
+               stream->walmethod->ops->close(f, CLOSE_UNLINK);
                exit(1);
            }
 
@@ -164,12 +166,14 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
 
    /* No file existed, so create one */
 
-   f = stream->walmethod->open_for_write(walfile_name,
-                                         stream->partial_suffix, WalSegSz);
+   f = stream->walmethod->ops->open_for_write(stream->walmethod,
+                                              walfile_name,
+                                              stream->partial_suffix,
+                                              WalSegSz);
    if (f == NULL)
    {
        pg_log_error("could not open write-ahead log file \"%s\": %s",
-                    fn, stream->walmethod->getlasterror());
+                    fn, GetLastWalMethodError(stream->walmethod));
        pg_free(fn);
        return false;
    }
@@ -199,28 +203,29 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
    currpos = walfile->currpos;
 
    /* Note that this considers the compression used if necessary */
-   fn = stream->walmethod->get_file_name(walfile_name,
-                                         stream->partial_suffix);
+   fn = stream->walmethod->ops->get_file_name(stream->walmethod,
+                                              walfile_name,
+                                              stream->partial_suffix);
 
    if (stream->partial_suffix)
    {
        if (currpos == WalSegSz)
-           r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+           r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
        else
        {
            pg_log_info("not renaming \"%s\", segment is not complete", fn);
-           r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+           r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
        }
    }
    else
-       r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+       r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
 
    walfile = NULL;
 
    if (r != 0)
    {
        pg_log_error("could not close file \"%s\": %s",
-                    fn, stream->walmethod->getlasterror());
+                    fn, GetLastWalMethodError(stream->walmethod));
 
        pg_free(fn);
        return false;
@@ -263,7 +268,7 @@ existsTimeLineHistoryFile(StreamCtl *stream)
 
    TLHistoryFileName(histfname, stream->timeline);
 
-   return stream->walmethod->existsfile(histfname);
+   return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
 }
 
 static bool
@@ -285,31 +290,32 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
        return false;
    }
 
-   f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+   f = stream->walmethod->ops->open_for_write(stream->walmethod,
+                                              histfname, ".tmp", 0);
    if (f == NULL)
    {
        pg_log_error("could not create timeline history file \"%s\": %s",
-                    histfname, stream->walmethod->getlasterror());
+                    histfname, GetLastWalMethodError(stream->walmethod));
        return false;
    }
 
-   if ((int) stream->walmethod->write(f, content, size) != size)
+   if ((int) stream->walmethod->ops->write(f, content, size) != size)
    {
        pg_log_error("could not write timeline history file \"%s\": %s",
-                    histfname, stream->walmethod->getlasterror());
+                    histfname, GetLastWalMethodError(stream->walmethod));
 
        /*
         * If we fail to make the file, delete it to release disk space
         */
-       stream->walmethod->close(f, CLOSE_UNLINK);
+       stream->walmethod->ops->close(f, CLOSE_UNLINK);
 
        return false;
    }
 
-   if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
+   if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
    {
        pg_log_error("could not close file \"%s\": %s",
-                    histfname, stream->walmethod->getlasterror());
+                    histfname, GetLastWalMethodError(stream->walmethod));
        return false;
    }
 
@@ -678,9 +684,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
    }
 
 error:
-   if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
+   if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
        pg_log_error("could not close file \"%s\": %s",
-                    walfile->pathname, stream->walmethod->getlasterror());
+                    walfile->pathname, GetLastWalMethodError(stream->walmethod));
    walfile = NULL;
    return false;
 }
@@ -765,9 +771,9 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
         */
        if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
        {
-           if (stream->walmethod->sync(walfile) != 0)
+           if (stream->walmethod->ops->sync(walfile) != 0)
                pg_fatal("could not fsync file \"%s\": %s",
-                        walfile->pathname, stream->walmethod->getlasterror());
+                        walfile->pathname, GetLastWalMethodError(stream->walmethod));
            lastFlushPosition = blockpos;
 
            /*
@@ -1012,9 +1018,9 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
             * data has been successfully replicated or not, at the normal
             * shutdown of the server.
             */
-           if (stream->walmethod->sync(walfile) != 0)
+           if (stream->walmethod->ops->sync(walfile) != 0)
                pg_fatal("could not fsync file \"%s\": %s",
-                        walfile->pathname, stream->walmethod->getlasterror());
+                        walfile->pathname, GetLastWalMethodError(stream->walmethod));
            lastFlushPosition = blockpos;
        }
 
@@ -1115,12 +1121,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
            }
        }
 
-       if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
-                                    bytes_to_write) != bytes_to_write)
+       if (stream->walmethod->ops->write(walfile,
+                                         copybuf + hdr_len + bytes_written,
+                                         bytes_to_write) != bytes_to_write)
        {
            pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
                         bytes_to_write, walfile->pathname,
-                        stream->walmethod->getlasterror());
+                        GetLastWalMethodError(stream->walmethod));
            return false;
        }
 
index d98a2681b9012b4860d3fd69fe0c7e93d23972bb..bc2e83d02befa6d8c689fab27b30fee638a6e44c 100644 (file)
@@ -2,9 +2,6 @@
  *
  * walmethods.c - implementations of different ways to write received wal
  *
- * NOTE! The caller must ensure that only one method is instantiated in
- *      any given program, and that it's only instantiated once!
- *
  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
  *-------------------------------------------------------------------------
  */
 
+static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
+                                  const char *pathname,
+                                  const char *temp_suffix,
+                                  size_t pad_to_size);
+static int dir_close(Walfile *f, WalCloseMethod method);
+static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
+                                const char *pathname);
+static char *dir_get_file_name(WalWriteMethod *wwmethod,
+                              const char *pathname, const char *temp_suffix);
+static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
+static int dir_sync(Walfile *f);
+static bool dir_finish(WalWriteMethod *wwmethod);
+static void dir_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalDirectoryMethodOps = {
+   .open_for_write = dir_open_for_write,
+   .close = dir_close,
+   .existsfile = dir_existsfile,
+   .get_file_size = dir_get_file_size,
+   .get_file_name = dir_get_file_name,
+   .write = dir_write,
+   .sync = dir_sync,
+   .finish = dir_finish,
+   .free = dir_free
+};
+
 /*
  * Global static data for this method
  */
 typedef struct DirectoryMethodData
 {
+   WalWriteMethod  base;
    char       *basedir;
-   pg_compress_algorithm compression_algorithm;
-   int         compression_level;
-   bool        sync;
-   const char *lasterrstring;  /* if set, takes precedence over lasterrno */
-   int         lasterrno;
 } DirectoryMethodData;
-static DirectoryMethodData *dir_data = NULL;
 
 /*
  * Local file handle
@@ -76,36 +95,29 @@ typedef struct DirectoryMethodFile
 #endif
 } DirectoryMethodFile;
 
-#define dir_clear_error() \
-   (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0)
-#define dir_set_error(msg) \
-   (dir_data->lasterrstring = _(msg))
-
-static const char *
-dir_getlasterror(void)
-{
-   if (dir_data->lasterrstring)
-       return dir_data->lasterrstring;
-   return strerror(dir_data->lasterrno);
-}
+#define clear_error(wwmethod) \
+   ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
 
 static char *
-dir_get_file_name(const char *pathname, const char *temp_suffix)
+dir_get_file_name(WalWriteMethod *wwmethod,
+                 const char *pathname, const char *temp_suffix)
 {
    char       *filename = pg_malloc0(MAXPGPATH * sizeof(char));
 
    snprintf(filename, MAXPGPATH, "%s%s%s",
             pathname,
-            dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
-            dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
+            wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
+            wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
             temp_suffix ? temp_suffix : "");
 
    return filename;
 }
 
 static Walfile *
-dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+                  const char *temp_suffix, size_t pad_to_size)
 {
+   DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
    char        tmppath[MAXPGPATH];
    char       *filename;
    int         fd;
@@ -119,9 +131,9 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
    void       *lz4buf = NULL;
 #endif
 
-   dir_clear_error();
+   clear_error(wwmethod);
 
-   filename = dir_get_file_name(pathname, temp_suffix);
+   filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
    snprintf(tmppath, sizeof(tmppath), "%s/%s",
             dir_data->basedir, filename);
    pg_free(filename);
@@ -135,32 +147,32 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
    fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
    if (fd < 0)
    {
-       dir_data->lasterrno = errno;
+       wwmethod->lasterrno = errno;
        return NULL;
    }
 
 #ifdef HAVE_LIBZ
-   if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
        gzfp = gzdopen(fd, "wb");
        if (gzfp == NULL)
        {
-           dir_data->lasterrno = errno;
+           wwmethod->lasterrno = errno;
            close(fd);
            return NULL;
        }
 
-       if (gzsetparams(gzfp, dir_data->compression_level,
+       if (gzsetparams(gzfp, wwmethod->compression_level,
                        Z_DEFAULT_STRATEGY) != Z_OK)
        {
-           dir_data->lasterrno = errno;
+           wwmethod->lasterrno = errno;
            gzclose(gzfp);
            return NULL;
        }
    }
 #endif
 #ifdef USE_LZ4
-   if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+   if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
    {
        size_t      ctx_out;
        size_t      header_size;
@@ -169,7 +181,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
        ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
        if (LZ4F_isError(ctx_out))
        {
-           dir_data->lasterrstring = LZ4F_getErrorName(ctx_out);
+           wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
            close(fd);
            return NULL;
        }
@@ -179,13 +191,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 
        /* assign the compression level, default is 0 */
        memset(&prefs, 0, sizeof(prefs));
-       prefs.compressionLevel = dir_data->compression_level;
+       prefs.compressionLevel = wwmethod->compression_level;
 
        /* add the header */
        header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
        if (LZ4F_isError(header_size))
        {
-           dir_data->lasterrstring = LZ4F_getErrorName(header_size);
+           wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
            (void) LZ4F_freeCompressionContext(ctx);
            pg_free(lz4buf);
            close(fd);
@@ -196,7 +208,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
        if (write(fd, lz4buf, header_size) != header_size)
        {
            /* If write didn't set errno, assume problem is no disk space */
-           dir_data->lasterrno = errno ? errno : ENOSPC;
+           wwmethod->lasterrno = errno ? errno : ENOSPC;
            (void) LZ4F_freeCompressionContext(ctx);
            pg_free(lz4buf);
            close(fd);
@@ -206,7 +218,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 #endif
 
    /* Do pre-padding on non-compressed files */
-   if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE)
+   if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
    {
        PGAlignedXLogBlock zerobuf;
        int         bytes;
@@ -218,7 +230,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
            if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
            {
                /* If write didn't set errno, assume problem is no disk space */
-               dir_data->lasterrno = errno ? errno : ENOSPC;
+               wwmethod->lasterrno = errno ? errno : ENOSPC;
                close(fd);
                return NULL;
            }
@@ -226,7 +238,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 
        if (lseek(fd, 0, SEEK_SET) != 0)
        {
-           dir_data->lasterrno = errno;
+           wwmethod->lasterrno = errno;
            close(fd);
            return NULL;
        }
@@ -238,19 +250,19 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
     * important when using synchronous mode, where the file is modified and
     * fsynced in-place, without a directory fsync.
     */
-   if (dir_data->sync)
+   if (wwmethod->sync)
    {
        if (fsync_fname(tmppath, false) != 0 ||
            fsync_parent_path(tmppath) != 0)
        {
-           dir_data->lasterrno = errno;
+           wwmethod->lasterrno = errno;
 #ifdef HAVE_LIBZ
-           if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+           if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
                gzclose(gzfp);
            else
 #endif
 #ifdef USE_LZ4
-           if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+           if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
            {
                (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
                (void) LZ4F_freeCompressionContext(ctx);
@@ -266,11 +278,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 
    f = pg_malloc0(sizeof(DirectoryMethodFile));
 #ifdef HAVE_LIBZ
-   if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
        f->gzfp = gzfp;
 #endif
 #ifdef USE_LZ4
-   if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+   if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
    {
        f->ctx = ctx;
        f->lz4buf = lz4buf;
@@ -278,6 +290,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
    }
 #endif
 
+   f->base.wwmethod = wwmethod;
    f->base.currpos = 0;
    f->base.pathname = pg_strdup(pathname);
    f->fd = fd;
@@ -295,23 +308,23 @@ dir_write(Walfile *f, const void *buf, size_t count)
    DirectoryMethodFile *df = (DirectoryMethodFile *) f;
 
    Assert(f != NULL);
-   dir_clear_error();
+   clear_error(f->wwmethod);
 
 #ifdef HAVE_LIBZ
-   if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
        errno = 0;
        r = (ssize_t) gzwrite(df->gzfp, buf, count);
        if (r != count)
        {
            /* If write didn't set errno, assume problem is no disk space */
-           dir_data->lasterrno = errno ? errno : ENOSPC;
+           f->wwmethod->lasterrno = errno ? errno : ENOSPC;
        }
    }
    else
 #endif
 #ifdef USE_LZ4
-   if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
    {
        size_t      chunk;
        size_t      remaining;
@@ -335,7 +348,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
 
            if (LZ4F_isError(compressed))
            {
-               dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+               f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
                return -1;
            }
 
@@ -343,7 +356,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
            if (write(df->fd, df->lz4buf, compressed) != compressed)
            {
                /* If write didn't set errno, assume problem is no disk space */
-               dir_data->lasterrno = errno ? errno : ENOSPC;
+               f->wwmethod->lasterrno = errno ? errno : ENOSPC;
                return -1;
            }
 
@@ -361,7 +374,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
        if (r != count)
        {
            /* If write didn't set errno, assume problem is no disk space */
-           dir_data->lasterrno = errno ? errno : ENOSPC;
+           f->wwmethod->lasterrno = errno ? errno : ENOSPC;
        }
    }
    if (r > 0)
@@ -374,14 +387,15 @@ dir_close(Walfile *f, WalCloseMethod method)
 {
    int         r;
    DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+   DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod;
    char        tmppath[MAXPGPATH];
    char        tmppath2[MAXPGPATH];
 
    Assert(f != NULL);
-   dir_clear_error();
+   clear_error(f->wwmethod);
 
 #ifdef HAVE_LIBZ
-   if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
        errno = 0;              /* in case gzclose() doesn't set it */
        r = gzclose(df->gzfp);
@@ -389,7 +403,7 @@ dir_close(Walfile *f, WalCloseMethod method)
    else
 #endif
 #ifdef USE_LZ4
-   if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
    {
        size_t      compressed;
 
@@ -399,7 +413,7 @@ dir_close(Walfile *f, WalCloseMethod method)
 
        if (LZ4F_isError(compressed))
        {
-           dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+           f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
            return -1;
        }
 
@@ -407,7 +421,7 @@ dir_close(Walfile *f, WalCloseMethod method)
        if (write(df->fd, df->lz4buf, compressed) != compressed)
        {
            /* If write didn't set errno, assume problem is no disk space */
-           dir_data->lasterrno = errno ? errno : ENOSPC;
+           f->wwmethod->lasterrno = errno ? errno : ENOSPC;
            return -1;
        }
 
@@ -429,17 +443,18 @@ dir_close(Walfile *f, WalCloseMethod method)
             * If we have a temp prefix, normal operation is to rename the
             * file.
             */
-           filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+           filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+                                        df->temp_suffix);
            snprintf(tmppath, sizeof(tmppath), "%s/%s",
                     dir_data->basedir, filename);
            pg_free(filename);
 
            /* permanent name, so no need for the prefix */
-           filename2 = dir_get_file_name(df->base.pathname, NULL);
+           filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
            snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
                     dir_data->basedir, filename2);
            pg_free(filename2);
-           if (dir_data->sync)
+           if (f->wwmethod->sync)
                r = durable_rename(tmppath, tmppath2);
            else
            {
@@ -456,7 +471,8 @@ dir_close(Walfile *f, WalCloseMethod method)
            char       *filename;
 
            /* Unlink the file once it's closed */
-           filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+           filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+                                        df->temp_suffix);
            snprintf(tmppath, sizeof(tmppath), "%s/%s",
                     dir_data->basedir, filename);
            pg_free(filename);
@@ -469,7 +485,7 @@ dir_close(Walfile *f, WalCloseMethod method)
             * CLOSE_NO_RENAME. In this case, fsync the file and containing
             * directory if sync mode is requested.
             */
-           if (dir_data->sync)
+           if (f->wwmethod->sync)
            {
                r = fsync_fname(df->fullpath, false);
                if (r == 0)
@@ -479,7 +495,7 @@ dir_close(Walfile *f, WalCloseMethod method)
    }
 
    if (r != 0)
-       dir_data->lasterrno = errno;
+       f->wwmethod->lasterrno = errno;
 
 #ifdef USE_LZ4
    pg_free(df->lz4buf);
@@ -501,23 +517,23 @@ dir_sync(Walfile *f)
    int         r;
 
    Assert(f != NULL);
-   dir_clear_error();
+   clear_error(f->wwmethod);
 
-   if (!dir_data->sync)
+   if (!f->wwmethod->sync)
        return 0;
 
 #ifdef HAVE_LIBZ
-   if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
        if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
        {
-           dir_data->lasterrno = errno;
+           f->wwmethod->lasterrno = errno;
            return -1;
        }
    }
 #endif
 #ifdef USE_LZ4
-   if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
    {
        DirectoryMethodFile *df = (DirectoryMethodFile *) f;
        size_t      compressed;
@@ -526,7 +542,7 @@ dir_sync(Walfile *f)
        compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
        if (LZ4F_isError(compressed))
        {
-           dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+           f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
            return -1;
        }
 
@@ -534,7 +550,7 @@ dir_sync(Walfile *f)
        if (write(df->fd, df->lz4buf, compressed) != compressed)
        {
            /* If write didn't set errno, assume problem is no disk space */
-           dir_data->lasterrno = errno ? errno : ENOSPC;
+           f->wwmethod->lasterrno = errno ? errno : ENOSPC;
            return -1;
        }
    }
@@ -542,13 +558,14 @@ dir_sync(Walfile *f)
 
    r = fsync(((DirectoryMethodFile *) f)->fd);
    if (r < 0)
-       dir_data->lasterrno = errno;
+       f->wwmethod->lasterrno = errno;
    return r;
 }
 
 static ssize_t
-dir_get_file_size(const char *pathname)
+dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
 {
+   DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
    struct stat statbuf;
    char        tmppath[MAXPGPATH];
 
@@ -557,26 +574,21 @@ dir_get_file_size(const char *pathname)
 
    if (stat(tmppath, &statbuf) != 0)
    {
-       dir_data->lasterrno = errno;
+       wwmethod->lasterrno = errno;
        return -1;
    }
 
    return statbuf.st_size;
 }
 
-static pg_compress_algorithm
-dir_compression_algorithm(void)
-{
-   return dir_data->compression_algorithm;
-}
-
 static bool
-dir_existsfile(const char *pathname)
+dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
 {
+   DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
    char        tmppath[MAXPGPATH];
    int         fd;
 
-   dir_clear_error();
+   clear_error(wwmethod);
 
    snprintf(tmppath, sizeof(tmppath), "%s/%s",
             dir_data->basedir, pathname);
@@ -589,60 +601,54 @@ dir_existsfile(const char *pathname)
 }
 
 static bool
-dir_finish(void)
+dir_finish(WalWriteMethod *wwmethod)
 {
-   dir_clear_error();
+   clear_error(wwmethod);
 
-   if (dir_data->sync)
+   if (wwmethod->sync)
    {
+       DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
        /*
         * Files are fsynced when they are closed, but we need to fsync the
         * directory entry here as well.
         */
        if (fsync_fname(dir_data->basedir, true) != 0)
        {
-           dir_data->lasterrno = errno;
+           wwmethod->lasterrno = errno;
            return false;
        }
    }
    return true;
 }
 
+static void
+dir_free(WalWriteMethod *wwmethod)
+{
+   DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
+   pg_free(dir_data->basedir);
+   pg_free(wwmethod);
+}
+
 
 WalWriteMethod *
 CreateWalDirectoryMethod(const char *basedir,
                         pg_compress_algorithm compression_algorithm,
                         int compression_level, bool sync)
 {
-   WalWriteMethod *method;
-
-   method = pg_malloc0(sizeof(WalWriteMethod));
-   method->open_for_write = dir_open_for_write;
-   method->write = dir_write;
-   method->get_file_size = dir_get_file_size;
-   method->get_file_name = dir_get_file_name;
-   method->compression_algorithm = dir_compression_algorithm;
-   method->close = dir_close;
-   method->sync = dir_sync;
-   method->existsfile = dir_existsfile;
-   method->finish = dir_finish;
-   method->getlasterror = dir_getlasterror;
-
-   dir_data = pg_malloc0(sizeof(DirectoryMethodData));
-   dir_data->compression_algorithm = compression_algorithm;
-   dir_data->compression_level = compression_level;
-   dir_data->basedir = pg_strdup(basedir);
-   dir_data->sync = sync;
-
-   return method;
-}
-
-void
-FreeWalDirectoryMethod(void)
-{
-   pg_free(dir_data->basedir);
-   pg_free(dir_data);
-   dir_data = NULL;
+   DirectoryMethodData *wwmethod;
+
+   wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
+   *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+       &WalDirectoryMethodOps;
+   wwmethod->base.compression_algorithm = compression_algorithm;
+   wwmethod->base.compression_level = compression_level;
+   wwmethod->base.sync = sync;
+   clear_error(&wwmethod->base);
+   wwmethod->basedir = pg_strdup(basedir);
+
+   return &wwmethod->base;
 }
 
 
@@ -651,6 +657,33 @@ FreeWalDirectoryMethod(void)
  *-------------------------------------------------------------------------
  */
 
+static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
+                                  const char *pathname,
+                                  const char *temp_suffix,
+                                  size_t pad_to_size);
+static int tar_close(Walfile *f, WalCloseMethod method);
+static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
+                                const char *pathname);
+static char *tar_get_file_name(WalWriteMethod *wwmethod,
+                              const char *pathname, const char *temp_suffix);
+static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
+static int tar_sync(Walfile *f);
+static bool tar_finish(WalWriteMethod *wwmethod);
+static void tar_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalTarMethodOps = {
+   .open_for_write = tar_open_for_write,
+   .close = tar_close,
+   .existsfile = tar_existsfile,
+   .get_file_size = tar_get_file_size,
+   .get_file_name = tar_get_file_name,
+   .write = tar_write,
+   .sync = tar_sync,
+   .finish = tar_finish,
+   .free = tar_free
+};
+
 typedef struct TarMethodFile
 {
    Walfile     base;
@@ -661,37 +694,20 @@ typedef struct TarMethodFile
 
 typedef struct TarMethodData
 {
+   WalWriteMethod  base;
    char       *tarfilename;
    int         fd;
-   pg_compress_algorithm compression_algorithm;
-   int         compression_level;
-   bool        sync;
    TarMethodFile *currentfile;
-   const char *lasterrstring;  /* if set, takes precedence over lasterrno */
-   int         lasterrno;
 #ifdef HAVE_LIBZ
    z_streamp   zp;
    void       *zlibOut;
 #endif
 } TarMethodData;
-static TarMethodData *tar_data = NULL;
-
-#define tar_clear_error() \
-   (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0)
-#define tar_set_error(msg) \
-   (tar_data->lasterrstring = _(msg))
-
-static const char *
-tar_getlasterror(void)
-{
-   if (tar_data->lasterrstring)
-       return tar_data->lasterrstring;
-   return strerror(tar_data->lasterrno);
-}
 
 #ifdef HAVE_LIBZ
 static bool
-tar_write_compressed_data(void *buf, size_t count, bool flush)
+tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count,
+                         bool flush)
 {
    tar_data->zp->next_in = buf;
    tar_data->zp->avail_in = count;
@@ -703,7 +719,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
        r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
        if (r == Z_STREAM_ERROR)
        {
-           tar_set_error("could not compress data");
+           tar_data->base.lasterrstring = "could not compress data";
            return false;
        }
 
@@ -715,7 +731,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
            if (write(tar_data->fd, tar_data->zlibOut, len) != len)
            {
                /* If write didn't set errno, assume problem is no disk space */
-               tar_data->lasterrno = errno ? errno : ENOSPC;
+               tar_data->base.lasterrno = errno ? errno : ENOSPC;
                return false;
            }
 
@@ -732,7 +748,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
        /* Reset the stream for writing */
        if (deflateReset(tar_data->zp) != Z_OK)
        {
-           tar_set_error("could not reset compression stream");
+           tar_data->base.lasterrstring = "could not reset compression stream";
            return false;
        }
    }
@@ -744,29 +760,31 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
 static ssize_t
 tar_write(Walfile *f, const void *buf, size_t count)
 {
+   TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
    ssize_t     r;
 
    Assert(f != NULL);
-   tar_clear_error();
+   clear_error(f->wwmethod);
 
    /* Tarfile will always be positioned at the end */
-   if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
    {
        errno = 0;
        r = write(tar_data->fd, buf, count);
        if (r != count)
        {
            /* If write didn't set errno, assume problem is no disk space */
-           tar_data->lasterrno = errno ? errno : ENOSPC;
+           f->wwmethod->lasterrno = errno ? errno : ENOSPC;
            return -1;
        }
        f->currpos += r;
        return r;
    }
 #ifdef HAVE_LIBZ
-   else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
-       if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
+       if (!tar_write_compressed_data(tar_data, unconstify(void *, buf),
+                                      count, false))
            return -1;
        f->currpos += count;
        return count;
@@ -775,7 +793,7 @@ tar_write(Walfile *f, const void *buf, size_t count)
    else
    {
        /* Can't happen - compression enabled with no method set */
-       tar_data->lasterrno = ENOSYS;
+       f->wwmethod->lasterrno = ENOSYS;
        return -1;
    }
 }
@@ -801,7 +819,8 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes)
 }
 
 static char *
-tar_get_file_name(const char *pathname, const char *temp_suffix)
+tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
+                 const char *temp_suffix)
 {
    char       *filename = pg_malloc0(MAXPGPATH * sizeof(char));
 
@@ -812,11 +831,13 @@ tar_get_file_name(const char *pathname, const char *temp_suffix)
 }
 
 static Walfile *
-tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+                  const char *temp_suffix, size_t pad_to_size)
 {
+   TarMethodData *tar_data = (TarMethodData *) wwmethod;
    char       *tmppath;
 
-   tar_clear_error();
+   clear_error(wwmethod);
 
    if (tar_data->fd < 0)
    {
@@ -828,12 +849,12 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                            pg_file_create_mode);
        if (tar_data->fd < 0)
        {
-           tar_data->lasterrno = errno;
+           wwmethod->lasterrno = errno;
            return NULL;
        }
 
 #ifdef HAVE_LIBZ
-       if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+       if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
        {
            tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
            tar_data->zp->zalloc = Z_NULL;
@@ -847,12 +868,13 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
             * default 15 for the windowBits parameter makes the output be
             * gzip instead of zlib.
             */
-           if (deflateInit2(tar_data->zp, tar_data->compression_level,
+           if (deflateInit2(tar_data->zp, wwmethod->compression_level,
                             Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
            {
                pg_free(tar_data->zp);
                tar_data->zp = NULL;
-               tar_set_error("could not initialize compression library");
+               wwmethod->lasterrstring =
+                   "could not initialize compression library";
                return NULL;
            }
        }
@@ -863,13 +885,15 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 
    if (tar_data->currentfile != NULL)
    {
-       tar_set_error("implementation error: tar files can't have more than one open file");
+       wwmethod->lasterrstring =
+           "implementation error: tar files can't have more than one open file";
        return NULL;
    }
 
    tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+   tar_data->currentfile->base.wwmethod = wwmethod;
 
-   tmppath = tar_get_file_name(pathname, temp_suffix);
+   tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
 
    /* Create a header with size set to 0 - we will fill out the size on close */
    if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
@@ -877,23 +901,24 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
        pg_free(tar_data->currentfile);
        pg_free(tmppath);
        tar_data->currentfile = NULL;
-       tar_set_error("could not create tar header");
+       wwmethod->lasterrstring = "could not create tar header";
        return NULL;
    }
 
    pg_free(tmppath);
 
 #ifdef HAVE_LIBZ
-   if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
        /* Flush existing data */
-       if (!tar_write_compressed_data(NULL, 0, true))
+       if (!tar_write_compressed_data(tar_data, NULL, 0, true))
            return NULL;
 
        /* Turn off compression for header */
        if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
        {
-           tar_set_error("could not change compression parameters");
+           wwmethod->lasterrstring =
+               "could not change compression parameters";
            return NULL;
        }
    }
@@ -902,39 +927,39 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
    tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
    if (tar_data->currentfile->ofs_start == -1)
    {
-       tar_data->lasterrno = errno;
+       wwmethod->lasterrno = errno;
        pg_free(tar_data->currentfile);
        tar_data->currentfile = NULL;
        return NULL;
    }
    tar_data->currentfile->base.currpos = 0;
 
-   if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+   if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
    {
        errno = 0;
        if (write(tar_data->fd, tar_data->currentfile->header,
                  TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
        {
            /* If write didn't set errno, assume problem is no disk space */
-           tar_data->lasterrno = errno ? errno : ENOSPC;
+           wwmethod->lasterrno = errno ? errno : ENOSPC;
            pg_free(tar_data->currentfile);
            tar_data->currentfile = NULL;
            return NULL;
        }
    }
 #ifdef HAVE_LIBZ
-   else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
        /* Write header through the zlib APIs but with no compression */
-       if (!tar_write_compressed_data(tar_data->currentfile->header,
+       if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
                                       TAR_BLOCK_SIZE, true))
            return NULL;
 
        /* Re-enable compression for the rest of the file */
-       if (deflateParams(tar_data->zp, tar_data->compression_level,
+       if (deflateParams(tar_data->zp, wwmethod->compression_level,
                          Z_DEFAULT_STRATEGY) != Z_OK)
        {
-           tar_set_error("could not change compression parameters");
+           wwmethod->lasterrstring = "could not change compression parameters";
            return NULL;
        }
    }
@@ -954,7 +979,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
    if (pad_to_size)
    {
        tar_data->currentfile->pad_to_size = pad_to_size;
-       if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+       if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
        {
            /* Uncompressed, so pad now */
            if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
@@ -964,7 +989,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                      tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
                      SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
            {
-               tar_data->lasterrno = errno;
+               wwmethod->lasterrno = errno;
                return NULL;
            }
 
@@ -976,42 +1001,37 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 }
 
 static ssize_t
-tar_get_file_size(const char *pathname)
+tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
 {
-   tar_clear_error();
+   clear_error(wwmethod);
 
    /* Currently not used, so not supported */
-   tar_data->lasterrno = ENOSYS;
+   wwmethod->lasterrno = ENOSYS;
    return -1;
 }
 
-static pg_compress_algorithm
-tar_compression_algorithm(void)
-{
-   return tar_data->compression_algorithm;
-}
-
 static int
 tar_sync(Walfile *f)
 {
+   TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
    int         r;
 
    Assert(f != NULL);
-   tar_clear_error();
+   clear_error(f->wwmethod);
 
-   if (!tar_data->sync)
+   if (!f->wwmethod->sync)
        return 0;
 
    /*
     * Always sync the whole tarfile, because that's all we can do. This makes
     * no sense on compressed files, so just ignore those.
     */
-   if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+   if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
        return 0;
 
    r = fsync(tar_data->fd);
    if (r < 0)
-       tar_data->lasterrno = errno;
+       f->wwmethod->lasterrno = errno;
    return r;
 }
 
@@ -1020,16 +1040,17 @@ tar_close(Walfile *f, WalCloseMethod method)
 {
    ssize_t     filesize;
    int         padding;
+   TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
    TarMethodFile *tf = (TarMethodFile *) f;
 
    Assert(f != NULL);
-   tar_clear_error();
+   clear_error(f->wwmethod);
 
    if (method == CLOSE_UNLINK)
    {
-       if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+       if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
        {
-           tar_set_error("unlink not supported with compression");
+           f->wwmethod->lasterrstring = "unlink not supported with compression";
            return -1;
        }
 
@@ -1040,7 +1061,7 @@ tar_close(Walfile *f, WalCloseMethod method)
         */
        if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
        {
-           tar_data->lasterrno = errno;
+           f->wwmethod->lasterrno = errno;
            return -1;
        }
 
@@ -1058,7 +1079,7 @@ tar_close(Walfile *f, WalCloseMethod method)
     */
    if (tf->pad_to_size)
    {
-       if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+       if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
        {
            /*
             * A compressed tarfile is padded on close since we cannot know
@@ -1098,10 +1119,10 @@ tar_close(Walfile *f, WalCloseMethod method)
 
 
 #ifdef HAVE_LIBZ
-   if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
        /* Flush the current buffer */
-       if (!tar_write_compressed_data(NULL, 0, true))
+       if (!tar_write_compressed_data(tar_data, NULL, 0, true))
            return -1;
    }
 #endif
@@ -1124,39 +1145,39 @@ tar_close(Walfile *f, WalCloseMethod method)
    print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
    if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
    {
-       tar_data->lasterrno = errno;
+       f->wwmethod->lasterrno = errno;
        return -1;
    }
-   if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+   if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
    {
        errno = 0;
        if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
        {
            /* If write didn't set errno, assume problem is no disk space */
-           tar_data->lasterrno = errno ? errno : ENOSPC;
+           f->wwmethod->lasterrno = errno ? errno : ENOSPC;
            return -1;
        }
    }
 #ifdef HAVE_LIBZ
-   else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
        /* Turn off compression */
        if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
        {
-           tar_set_error("could not change compression parameters");
+           f->wwmethod->lasterrstring = "could not change compression parameters";
            return -1;
        }
 
        /* Overwrite the header, assuming the size will be the same */
-       if (!tar_write_compressed_data(tar_data->currentfile->header,
+       if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
                                       TAR_BLOCK_SIZE, true))
            return -1;
 
        /* Turn compression back on */
-       if (deflateParams(tar_data->zp, tar_data->compression_level,
+       if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
                          Z_DEFAULT_STRATEGY) != Z_OK)
        {
-           tar_set_error("could not change compression parameters");
+           f->wwmethod->lasterrstring = "could not change compression parameters";
            return -1;
        }
    }
@@ -1170,7 +1191,7 @@ tar_close(Walfile *f, WalCloseMethod method)
    /* Move file pointer back down to end, so we can write the next file */
    if (lseek(tar_data->fd, 0, SEEK_END) < 0)
    {
-       tar_data->lasterrno = errno;
+       f->wwmethod->lasterrno = errno;
        return -1;
    }
 
@@ -1179,7 +1200,7 @@ tar_close(Walfile *f, WalCloseMethod method)
    {
        /* XXX this seems pretty bogus; why is only this case fatal? */
        pg_fatal("could not fsync file \"%s\": %s",
-                tf->base.pathname, tar_getlasterror());
+                tf->base.pathname, GetLastWalMethodError(f->wwmethod));
    }
 
    /* Clean up and done */
@@ -1191,19 +1212,20 @@ tar_close(Walfile *f, WalCloseMethod method)
 }
 
 static bool
-tar_existsfile(const char *pathname)
+tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
 {
-   tar_clear_error();
+   clear_error(wwmethod);
    /* We only deal with new tarfiles, so nothing externally created exists */
    return false;
 }
 
 static bool
-tar_finish(void)
+tar_finish(WalWriteMethod *wwmethod)
 {
+   TarMethodData *tar_data = (TarMethodData *) wwmethod;
    char        zerobuf[1024] = {0};
 
-   tar_clear_error();
+   clear_error(wwmethod);
 
    if (tar_data->currentfile)
    {
@@ -1212,20 +1234,21 @@ tar_finish(void)
    }
 
    /* A tarfile always ends with two empty blocks */
-   if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+   if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
    {
        errno = 0;
        if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
        {
            /* If write didn't set errno, assume problem is no disk space */
-           tar_data->lasterrno = errno ? errno : ENOSPC;
+           wwmethod->lasterrno = errno ? errno : ENOSPC;
            return false;
        }
    }
 #ifdef HAVE_LIBZ
-   else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+   else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
    {
-       if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+       if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
+                                      false))
            return false;
 
        /* Also flush all data to make sure the gzip stream is finished */
@@ -1239,7 +1262,7 @@ tar_finish(void)
 
            if (r == Z_STREAM_ERROR)
            {
-               tar_set_error("could not compress data");
+               wwmethod->lasterrstring = "could not compress data";
                return false;
            }
            if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
@@ -1253,7 +1276,7 @@ tar_finish(void)
                     * If write didn't set errno, assume problem is no disk
                     * space.
                     */
-                   tar_data->lasterrno = errno ? errno : ENOSPC;
+                   wwmethod->lasterrno = errno ? errno : ENOSPC;
                    return false;
                }
            }
@@ -1263,7 +1286,7 @@ tar_finish(void)
 
        if (deflateEnd(tar_data->zp) != Z_OK)
        {
-           tar_set_error("could not close compression stream");
+           wwmethod->lasterrstring = "could not close compression stream";
            return false;
        }
    }
@@ -1275,29 +1298,29 @@ tar_finish(void)
    }
 
    /* sync the empty blocks as well, since they're after the last file */
-   if (tar_data->sync)
+   if (wwmethod->sync)
    {
        if (fsync(tar_data->fd) != 0)
        {
-           tar_data->lasterrno = errno;
+           wwmethod->lasterrno = errno;
            return false;
        }
    }
 
    if (close(tar_data->fd) != 0)
    {
-       tar_data->lasterrno = errno;
+       wwmethod->lasterrno = errno;
        return false;
    }
 
    tar_data->fd = -1;
 
-   if (tar_data->sync)
+   if (wwmethod->sync)
    {
        if (fsync_fname(tar_data->tarfilename, false) != 0 ||
            fsync_parent_path(tar_data->tarfilename) != 0)
        {
-           tar_data->lasterrno = errno;
+           wwmethod->lasterrno = errno;
            return false;
        }
    }
@@ -1305,6 +1328,19 @@ tar_finish(void)
    return true;
 }
 
+static void
+tar_free(WalWriteMethod *wwmethod)
+{
+   TarMethodData *tar_data = (TarMethodData *) wwmethod;
+
+   pg_free(tar_data->tarfilename);
+#ifdef HAVE_LIBZ
+   if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
+       pg_free(tar_data->zlibOut);
+#endif
+   pg_free(wwmethod);
+}
+
 /*
  * The argument compression_algorithm is currently ignored. It is in place for
  * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
@@ -1316,45 +1352,33 @@ CreateWalTarMethod(const char *tarbase,
                   pg_compress_algorithm compression_algorithm,
                   int compression_level, bool sync)
 {
-   WalWriteMethod *method;
+   TarMethodData *wwmethod;
    const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
    ".tar.gz" : ".tar";
 
-   method = pg_malloc0(sizeof(WalWriteMethod));
-   method->open_for_write = tar_open_for_write;
-   method->write = tar_write;
-   method->get_file_size = tar_get_file_size;
-   method->get_file_name = tar_get_file_name;
-   method->compression_algorithm = tar_compression_algorithm;
-   method->close = tar_close;
-   method->sync = tar_sync;
-   method->existsfile = tar_existsfile;
-   method->finish = tar_finish;
-   method->getlasterror = tar_getlasterror;
-
-   tar_data = pg_malloc0(sizeof(TarMethodData));
-   tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
-   sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
-   tar_data->fd = -1;
-   tar_data->compression_algorithm = compression_algorithm;
-   tar_data->compression_level = compression_level;
-   tar_data->sync = sync;
+   wwmethod = pg_malloc0(sizeof(TarMethodData));
+   *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+       &WalTarMethodOps;
+   wwmethod->base.compression_algorithm = compression_algorithm;
+   wwmethod->base.compression_level = compression_level;
+   wwmethod->base.sync = sync;
+   clear_error(&wwmethod->base);
+
+   wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+   sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
+   wwmethod->fd = -1;
 #ifdef HAVE_LIBZ
    if (compression_algorithm == PG_COMPRESSION_GZIP)
-       tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+       wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
 #endif
 
-   return method;
+   return &wwmethod->base;
 }
 
-void
-FreeWalTarMethod(void)
+const char *
+GetLastWalMethodError(WalWriteMethod *wwmethod)
 {
-   pg_free(tar_data->tarfilename);
-#ifdef HAVE_LIBZ
-   if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
-       pg_free(tar_data->zlibOut);
-#endif
-   pg_free(tar_data);
-   tar_data = NULL;
+   if (wwmethod->lasterrstring)
+       return wwmethod->lasterrstring;
+   return strerror(wwmethod->lasterrno);
 }
index cf5ed87fbe8a50e190f77071f5dda903cd37f7cc..0d7728d0864f4c5dacac43cd521510774368de41 100644 (file)
@@ -16,6 +16,7 @@ typedef struct WalWriteMethod WalWriteMethod;
 
 typedef struct
 {
+   WalWriteMethod *wwmethod;
    off_t       currpos;
    char       *pathname;
    /*
@@ -34,16 +35,9 @@ typedef enum
 } WalCloseMethod;
 
 /*
- * A WalWriteMethod structure represents the different methods used
- * to write the streaming WAL as it's received.
- *
- * All methods that have a failure return indicator will set state
- * allowing the getlasterror() method to return a suitable message.
- * Commonly, errno is this state (or part of it); so callers must take
- * care not to clobber errno between a failed method call and use of
- * getlasterror() to retrieve the message.
+ * Table of callbacks for a WalWriteMethod.
  */
-struct WalWriteMethod
+typedef struct WalWriteMethodOps
 {
    /*
     * Open a target file. Returns Walfile, or NULL if open failed. If a temp
@@ -51,7 +45,7 @@ struct WalWriteMethod
     * automatically renamed in close(). If pad_to_size is specified, the file
     * will be padded with NUL up to that size, if supported by the Walmethod.
     */
-   Walfile    *(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+   Walfile    *(*open_for_write) (WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size);
 
    /*
     * Close an open Walfile, using one or more methods for handling automatic
@@ -60,19 +54,16 @@ struct WalWriteMethod
    int         (*close) (Walfile *f, WalCloseMethod method);
 
    /* Check if a file exist */
-   bool        (*existsfile) (const char *pathname);
+   bool        (*existsfile) (WalWriteMethod *wwmethod, const char *pathname);
 
    /* Return the size of a file, or -1 on failure. */
-   ssize_t     (*get_file_size) (const char *pathname);
+   ssize_t     (*get_file_size) (WalWriteMethod *wwmethod, const char *pathname);
 
    /*
     * Return the name of the current file to work on in pg_malloc()'d string,
     * without the base directory.  This is useful for logging.
     */
-   char       *(*get_file_name) (const char *pathname, const char *temp_suffix);
-
-   /* Returns the compression method */
-   pg_compress_algorithm (*compression_algorithm) (void);
+   char       *(*get_file_name) (WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix);
 
    /*
     * Write count number of bytes to the file, and return the number of bytes
@@ -91,10 +82,37 @@ struct WalWriteMethod
     * close/write/sync of shared resources succeeded, otherwise returns false
     * (but the resources are still closed).
     */
-   bool        (*finish) (void);
+   bool        (*finish) (WalWriteMethod *wwmethod);
 
-   /* Return a text for the last error in this Walfile */
-   const char *(*getlasterror) (void);
+   /*
+    * Free subsidiary data associated with the WalWriteMethod, and the
+    * WalWriteMethod itself.
+    */
+   void        (*free) (WalWriteMethod *wwmethod);
+} WalWriteMethodOps;
+
+/*
+ * A WalWriteMethod structure represents a way of writing streaming WAL as
+ * it's received.
+ *
+ * All methods that have a failure return indicator will set lasterrstring
+ * or lasterrno (the former takes precedence) so that the caller can signal
+ * a suitable error.
+ */
+struct WalWriteMethod
+{
+   const WalWriteMethodOps *ops;
+   pg_compress_algorithm compression_algorithm;
+   int         compression_level;
+   bool        sync;
+   const char *lasterrstring;  /* if set, takes precedence over lasterrno */
+   int         lasterrno;
+   /*
+    * MORE DATA FOLLOWS AT END OF STRUCT
+    *
+    * Each WalWriteMethod is expected to embed this as the first member of
+    * a larger struct with method-specific fields following.
+    */
 };
 
 /*
@@ -111,6 +129,4 @@ WalWriteMethod *CreateWalTarMethod(const char *tarbase,
                                   pg_compress_algorithm compression_algo,
                                   int compression, bool sync);
 
-/* Cleanup routines for previously-created methods */
-void       FreeWalDirectoryMethod(void);
-void       FreeWalTarMethod(void);
+const char *GetLastWalMethodError(WalWriteMethod *wwmethod);