walmethods.c/h: Make Walfile a struct, rather than a void *
authorRobert Haas <rhaas@postgresql.org>
Mon, 19 Sep 2022 15:20:18 +0000 (11:20 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 19 Sep 2022 15:20:18 +0000 (11:20 -0400)
This makes the curent file position and pathname visible in a generic
way, so we no longer need current_walfile_name global variable or the
get_current_pos() method. Since that purported to be able to fail but
never actually did, this also lets us get rid of some unnecessary
error-handling code.

One risk of this change is that the get_current_pos() method
previously cleared the error indicator, and that will no longer happen
with the new approach. I looked for a way that this could cause problems
and did not find one.

The previous code was confused about whether "Walfile" was the
implementation-dependent structure representing a WAL file or
whether it was a pointer to that stucture. Some of the code used it
one way, and some in the other. The compiler tolerated that because
void * is interchangeable with void **, but now that Walfile is a
struct, it's necessary to be consistent. Hence, some references to
"Walfile" have been converted to "Walfile *".

Discussion: http://postgr.es/m/CA+TgmoZS0Kw98fOoAcGz8B9iDhdqB4Be4e=vDZaJZ5A-xMYBqA@mail.gmail.com

src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/walmethods.c
src/bin/pg_basebackup/walmethods.h

index 5f6fd3201f38926163595a1ccef0264a3d3bb71b..a619176511fb77b8c120138da5fdcb5aba5f4d2b 100644 (file)
@@ -25,9 +25,8 @@
 #include "receivelog.h"
 #include "streamutil.h"
 
-/* fd and filename for currently open WAL file */
+/* currently open WAL file */
 static Walfile *walfile = NULL;
-static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
@@ -82,8 +81,7 @@ mark_file_as_archived(StreamCtl *stream, const char *fname)
  * Open a new WAL file in the specified directory.
  *
  * Returns true if OK; on failure, returns false after printing an error msg.
- * On success, 'walfile' is set to the FD for the file, and the base filename
- * (without partial_suffix) is stored in 'current_walfile_name'.
+ * On success, 'walfile' is set to the opened WAL file.
  *
  * The file will be padded to 16Mb with zeroes.
  */
@@ -94,12 +92,13 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
        char       *fn;
        ssize_t         size;
        XLogSegNo       segno;
+       char            walfile_name[MAXPGPATH];
 
        XLByteToSeg(startpoint, segno, WalSegSz);
-       XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
+       XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
 
        /* Note that this considers the compression used if necessary */
-       fn = stream->walmethod->get_file_name(current_walfile_name,
+       fn = stream->walmethod->get_file_name(walfile_name,
                                                                                  stream->partial_suffix);
 
        /*
@@ -126,7 +125,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
                if (size == WalSegSz)
                {
                        /* Already padded file. Open it for use */
-                       f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
+                       f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, 0);
                        if (f == NULL)
                        {
                                pg_log_error("could not open existing write-ahead log file \"%s\": %s",
@@ -165,7 +164,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
 
        /* No file existed, so create one */
 
-       f = stream->walmethod->open_for_write(current_walfile_name,
+       f = stream->walmethod->open_for_write(walfile_name,
                                                                                  stream->partial_suffix, WalSegSz);
        if (f == NULL)
        {
@@ -191,27 +190,18 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
        char       *fn;
        off_t           currpos;
        int                     r;
+       char            walfile_name[MAXPGPATH];
 
        if (walfile == NULL)
                return true;
 
+       strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
+       currpos = walfile->currpos;
+
        /* Note that this considers the compression used if necessary */
-       fn = stream->walmethod->get_file_name(current_walfile_name,
+       fn = stream->walmethod->get_file_name(walfile_name,
                                                                                  stream->partial_suffix);
 
-       currpos = stream->walmethod->get_current_pos(walfile);
-
-       if (currpos == -1)
-       {
-               pg_log_error("could not determine seek position in file \"%s\": %s",
-                                        fn, stream->walmethod->getlasterror());
-               stream->walmethod->close(walfile, CLOSE_UNLINK);
-               walfile = NULL;
-
-               pg_free(fn);
-               return false;
-       }
-
        if (stream->partial_suffix)
        {
                if (currpos == WalSegSz)
@@ -247,7 +237,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
        if (currpos == WalSegSz && stream->mark_done)
        {
                /* writes error message if failed */
-               if (!mark_file_as_archived(stream, current_walfile_name))
+               if (!mark_file_as_archived(stream, walfile_name))
                        return false;
        }
 
@@ -690,7 +680,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 error:
        if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
                pg_log_error("could not close file \"%s\": %s",
-                                        current_walfile_name, stream->walmethod->getlasterror());
+                                        walfile->pathname, stream->walmethod->getlasterror());
        walfile = NULL;
        return false;
 }
@@ -777,7 +767,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
                {
                        if (stream->walmethod->sync(walfile) != 0)
                                pg_fatal("could not fsync file \"%s\": %s",
-                                                current_walfile_name, stream->walmethod->getlasterror());
+                                                walfile->pathname, stream->walmethod->getlasterror());
                        lastFlushPosition = blockpos;
 
                        /*
@@ -1024,7 +1014,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                         */
                        if (stream->walmethod->sync(walfile) != 0)
                                pg_fatal("could not fsync file \"%s\": %s",
-                                                current_walfile_name, stream->walmethod->getlasterror());
+                                                walfile->pathname, stream->walmethod->getlasterror());
                        lastFlushPosition = blockpos;
                }
 
@@ -1092,10 +1082,10 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
        else
        {
                /* More data in existing segment */
-               if (stream->walmethod->get_current_pos(walfile) != xlogoff)
+               if (walfile->currpos != xlogoff)
                {
                        pg_log_error("got WAL data offset %08x, expected %08x",
-                                                xlogoff, (int) stream->walmethod->get_current_pos(walfile));
+                                                xlogoff, (int) walfile->currpos);
                        return false;
                }
        }
@@ -1129,7 +1119,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                                                                         bytes_to_write) != bytes_to_write)
                {
                        pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
-                                                bytes_to_write, current_walfile_name,
+                                                bytes_to_write, walfile->pathname,
                                                 stream->walmethod->getlasterror());
                        return false;
                }
index 4d4bc63fbe71e47fb4681cf3d7784582aac27219..d98a2681b9012b4860d3fd69fe0c7e93d23972bb 100644 (file)
@@ -62,9 +62,8 @@ static DirectoryMethodData *dir_data = NULL;
  */
 typedef struct DirectoryMethodFile
 {
+       Walfile         base;
        int                     fd;
-       off_t           currpos;
-       char       *pathname;
        char       *fullpath;
        char       *temp_suffix;
 #ifdef HAVE_LIBZ
@@ -104,7 +103,7 @@ dir_get_file_name(const char *pathname, const char *temp_suffix)
        return filename;
 }
 
-static Walfile
+static Walfile *
 dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
 {
        char            tmppath[MAXPGPATH];
@@ -279,18 +278,18 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
        }
 #endif
 
+       f->base.currpos = 0;
+       f->base.pathname = pg_strdup(pathname);
        f->fd = fd;
-       f->currpos = 0;
-       f->pathname = pg_strdup(pathname);
        f->fullpath = pg_strdup(tmppath);
        if (temp_suffix)
                f->temp_suffix = pg_strdup(temp_suffix);
 
-       return f;
+       return &f->base;
 }
 
 static ssize_t
-dir_write(Walfile f, const void *buf, size_t count)
+dir_write(Walfile *f, const void *buf, size_t count)
 {
        ssize_t         r;
        DirectoryMethodFile *df = (DirectoryMethodFile *) f;
@@ -366,22 +365,12 @@ dir_write(Walfile f, const void *buf, size_t count)
                }
        }
        if (r > 0)
-               df->currpos += r;
+               df->base.currpos += r;
        return r;
 }
 
-static off_t
-dir_get_current_pos(Walfile f)
-{
-       Assert(f != NULL);
-       dir_clear_error();
-
-       /* Use a cached value to prevent lots of reseeks */
-       return ((DirectoryMethodFile *) f)->currpos;
-}
-
 static int
-dir_close(Walfile f, WalCloseMethod method)
+dir_close(Walfile *f, WalCloseMethod method)
 {
        int                     r;
        DirectoryMethodFile *df = (DirectoryMethodFile *) f;
@@ -440,13 +429,13 @@ dir_close(Walfile f, WalCloseMethod method)
                         * If we have a temp prefix, normal operation is to rename the
                         * file.
                         */
-                       filename = dir_get_file_name(df->pathname, df->temp_suffix);
+                       filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
                        snprintf(tmppath, sizeof(tmppath), "%s/%s",
                                         dir_data->basedir, filename);
                        pg_free(filename);
 
                        /* permanent name, so no need for the prefix */
-                       filename2 = dir_get_file_name(df->pathname, NULL);
+                       filename2 = dir_get_file_name(df->base.pathname, NULL);
                        snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
                                         dir_data->basedir, filename2);
                        pg_free(filename2);
@@ -467,7 +456,7 @@ dir_close(Walfile f, WalCloseMethod method)
                        char       *filename;
 
                        /* Unlink the file once it's closed */
-                       filename = dir_get_file_name(df->pathname, df->temp_suffix);
+                       filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
                        snprintf(tmppath, sizeof(tmppath), "%s/%s",
                                         dir_data->basedir, filename);
                        pg_free(filename);
@@ -498,7 +487,7 @@ dir_close(Walfile f, WalCloseMethod method)
        LZ4F_freeCompressionContext(df->ctx);
 #endif
 
-       pg_free(df->pathname);
+       pg_free(df->base.pathname);
        pg_free(df->fullpath);
        pg_free(df->temp_suffix);
        pg_free(df);
@@ -507,7 +496,7 @@ dir_close(Walfile f, WalCloseMethod method)
 }
 
 static int
-dir_sync(Walfile f)
+dir_sync(Walfile *f)
 {
        int                     r;
 
@@ -630,7 +619,6 @@ CreateWalDirectoryMethod(const char *basedir,
        method = pg_malloc0(sizeof(WalWriteMethod));
        method->open_for_write = dir_open_for_write;
        method->write = dir_write;
-       method->get_current_pos = dir_get_current_pos;
        method->get_file_size = dir_get_file_size;
        method->get_file_name = dir_get_file_name;
        method->compression_algorithm = dir_compression_algorithm;
@@ -665,10 +653,9 @@ FreeWalDirectoryMethod(void)
 
 typedef struct TarMethodFile
 {
+       Walfile         base;
        off_t           ofs_start;              /* Where does the *header* for this file start */
-       off_t           currpos;
        char            header[TAR_BLOCK_SIZE];
-       char       *pathname;
        size_t          pad_to_size;
 } TarMethodFile;
 
@@ -755,7 +742,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
 #endif
 
 static ssize_t
-tar_write(Walfile f, const void *buf, size_t count)
+tar_write(Walfile *f, const void *buf, size_t count)
 {
        ssize_t         r;
 
@@ -773,7 +760,7 @@ tar_write(Walfile f, const void *buf, size_t count)
                        tar_data->lasterrno = errno ? errno : ENOSPC;
                        return -1;
                }
-               ((TarMethodFile *) f)->currpos += r;
+               f->currpos += r;
                return r;
        }
 #ifdef HAVE_LIBZ
@@ -781,7 +768,7 @@ tar_write(Walfile f, const void *buf, size_t count)
        {
                if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
                        return -1;
-               ((TarMethodFile *) f)->currpos += count;
+               f->currpos += count;
                return count;
        }
 #endif
@@ -803,7 +790,7 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes)
        while (bytesleft)
        {
                size_t          bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
-               ssize_t         r = tar_write(f, zerobuf.data, bytestowrite);
+               ssize_t         r = tar_write(&f->base, zerobuf.data, bytestowrite);
 
                if (r < 0)
                        return false;
@@ -824,7 +811,7 @@ tar_get_file_name(const char *pathname, const char *temp_suffix)
        return filename;
 }
 
-static Walfile
+static Walfile *
 tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
 {
        char       *tmppath;
@@ -920,7 +907,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                tar_data->currentfile = NULL;
                return NULL;
        }
-       tar_data->currentfile->currpos = 0;
+       tar_data->currentfile->base.currpos = 0;
 
        if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
        {
@@ -958,7 +945,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                Assert(false);
        }
 
-       tar_data->currentfile->pathname = pg_strdup(pathname);
+       tar_data->currentfile->base.pathname = pg_strdup(pathname);
 
        /*
         * Uncompressed files are padded on creation, but for compression we can't
@@ -981,11 +968,11 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
                                return NULL;
                        }
 
-                       tar_data->currentfile->currpos = 0;
+                       tar_data->currentfile->base.currpos = 0;
                }
        }
 
-       return tar_data->currentfile;
+       return &tar_data->currentfile->base;
 }
 
 static ssize_t
@@ -1004,17 +991,8 @@ tar_compression_algorithm(void)
        return tar_data->compression_algorithm;
 }
 
-static off_t
-tar_get_current_pos(Walfile f)
-{
-       Assert(f != NULL);
-       tar_clear_error();
-
-       return ((TarMethodFile *) f)->currpos;
-}
-
 static int
-tar_sync(Walfile f)
+tar_sync(Walfile *f)
 {
        int                     r;
 
@@ -1038,7 +1016,7 @@ tar_sync(Walfile f)
 }
 
 static int
-tar_close(Walfile f, WalCloseMethod method)
+tar_close(Walfile *f, WalCloseMethod method)
 {
        ssize_t         filesize;
        int                     padding;
@@ -1066,7 +1044,7 @@ tar_close(Walfile f, WalCloseMethod method)
                        return -1;
                }
 
-               pg_free(tf->pathname);
+               pg_free(tf->base.pathname);
                pg_free(tf);
                tar_data->currentfile = NULL;
 
@@ -1086,7 +1064,7 @@ tar_close(Walfile f, WalCloseMethod method)
                         * A compressed tarfile is padded on close since we cannot know
                         * the size of the compressed output until the end.
                         */
-                       size_t          sizeleft = tf->pad_to_size - tf->currpos;
+                       size_t          sizeleft = tf->pad_to_size - tf->base.currpos;
 
                        if (sizeleft)
                        {
@@ -1100,7 +1078,7 @@ tar_close(Walfile f, WalCloseMethod method)
                         * An uncompressed tarfile was padded on creation, so just adjust
                         * the current position as if we seeked to the end.
                         */
-                       tf->currpos = tf->pad_to_size;
+                       tf->base.currpos = tf->pad_to_size;
                }
        }
 
@@ -1108,7 +1086,7 @@ tar_close(Walfile f, WalCloseMethod method)
         * Get the size of the file, and pad out to a multiple of the tar block
         * size.
         */
-       filesize = tar_get_current_pos(f);
+       filesize = f->currpos;
        padding = tarPaddingBytesRequired(filesize);
        if (padding)
        {
@@ -1141,7 +1119,7 @@ tar_close(Walfile f, WalCloseMethod method)
                 * We overwrite it with what it was before if we have no tempname,
                 * since we're going to write the buffer anyway.
                 */
-               strlcpy(&(tf->header[0]), tf->pathname, 100);
+               strlcpy(&(tf->header[0]), tf->base.pathname, 100);
 
        print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
        if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
@@ -1201,11 +1179,11 @@ tar_close(Walfile f, WalCloseMethod method)
        {
                /* XXX this seems pretty bogus; why is only this case fatal? */
                pg_fatal("could not fsync file \"%s\": %s",
-                                tf->pathname, tar_getlasterror());
+                                tf->base.pathname, tar_getlasterror());
        }
 
        /* Clean up and done */
-       pg_free(tf->pathname);
+       pg_free(tf->base.pathname);
        pg_free(tf);
        tar_data->currentfile = NULL;
 
@@ -1229,7 +1207,7 @@ tar_finish(void)
 
        if (tar_data->currentfile)
        {
-               if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
+               if (tar_close(&tar_data->currentfile->base, CLOSE_NORMAL) != 0)
                        return false;
        }
 
@@ -1345,7 +1323,6 @@ CreateWalTarMethod(const char *tarbase,
        method = pg_malloc0(sizeof(WalWriteMethod));
        method->open_for_write = tar_open_for_write;
        method->write = tar_write;
-       method->get_current_pos = tar_get_current_pos;
        method->get_file_size = tar_get_file_size;
        method->get_file_name = tar_get_file_name;
        method->compression_algorithm = tar_compression_algorithm;
index 76530dc9419c5d811887971c4a533f0c1a609262..cf5ed87fbe8a50e190f77071f5dda903cd37f7cc 100644 (file)
 
 #include "common/compression.h"
 
-typedef void *Walfile;
+struct WalWriteMethod;
+typedef struct WalWriteMethod WalWriteMethod;
+
+typedef struct
+{
+       off_t           currpos;
+       char       *pathname;
+       /*
+        * MORE DATA FOLLOWS AT END OF STRUCT
+        *
+        * Each WalWriteMethod is expected to embed this as the first member of
+        * a larger struct with method-specific fields following.
+        */
+} Walfile;
 
 typedef enum
 {
@@ -30,7 +43,6 @@ typedef enum
  * care not to clobber errno between a failed method call and use of
  * getlasterror() to retrieve the message.
  */
-typedef struct WalWriteMethod WalWriteMethod;
 struct WalWriteMethod
 {
        /*
@@ -39,13 +51,13 @@ struct WalWriteMethod
         * automatically renamed in close(). If pad_to_size is specified, the file
         * will be padded with NUL up to that size, if supported by the Walmethod.
         */
-       Walfile         (*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+       Walfile    *(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
 
        /*
         * Close an open Walfile, using one or more methods for handling automatic
         * unlinking etc. Returns 0 on success, other values for error.
         */
-       int                     (*close) (Walfile f, WalCloseMethod method);
+       int                     (*close) (Walfile *f, WalCloseMethod method);
 
        /* Check if a file exist */
        bool            (*existsfile) (const char *pathname);
@@ -66,15 +78,12 @@ struct WalWriteMethod
         * Write count number of bytes to the file, and return the number of bytes
         * actually written or -1 for error.
         */
-       ssize_t         (*write) (Walfile f, const void *buf, size_t count);
-
-       /* Return the current position in a file or -1 on error */
-       off_t           (*get_current_pos) (Walfile f);
+       ssize_t         (*write) (Walfile *f, const void *buf, size_t count);
 
        /*
         * fsync the contents of the specified file. Returns 0 on success.
         */
-       int                     (*sync) (Walfile f);
+       int                     (*sync) (Walfile *f);
 
        /*
         * Clean up the Walmethod, closing any shared resources. For methods like