snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
fname);
- f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+ f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
+ NULL, 0);
if (f == NULL)
{
pg_log_error("could not create archive status file \"%s\": %s",
- tmppath, stream->walmethod->getlasterror());
+ tmppath, GetLastWalMethodError(stream->walmethod));
return false;
}
- if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
+ if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
{
pg_log_error("could not close archive status file \"%s\": %s",
- tmppath, stream->walmethod->getlasterror());
+ tmppath, GetLastWalMethodError(stream->walmethod));
return false;
}
XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
/* Note that this considers the compression used if necessary */
- fn = stream->walmethod->get_file_name(walfile_name,
- stream->partial_suffix);
+ fn = stream->walmethod->ops->get_file_name(stream->walmethod,
+ walfile_name,
+ stream->partial_suffix);
/*
* When streaming to files, if an existing file exists we verify that it's
* When streaming to tar, no file with this name will exist before, so we
* never have to verify a size.
*/
- if (stream->walmethod->compression_algorithm() == PG_COMPRESSION_NONE &&
- stream->walmethod->existsfile(fn))
+ if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
+ stream->walmethod->ops->existsfile(stream->walmethod, fn))
{
- size = stream->walmethod->get_file_size(fn);
+ size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
if (size < 0)
{
pg_log_error("could not get size of write-ahead log file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
+ fn, GetLastWalMethodError(stream->walmethod));
pg_free(fn);
return false;
}
if (size == WalSegSz)
{
/* Already padded file. Open it for use */
- f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, 0);
+ f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
if (f == NULL)
{
pg_log_error("could not open existing write-ahead log file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
+ fn, GetLastWalMethodError(stream->walmethod));
pg_free(fn);
return false;
}
/* fsync file in case of a previous crash */
- if (stream->walmethod->sync(f) != 0)
+ if (stream->walmethod->ops->sync(f) != 0)
{
pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
- stream->walmethod->close(f, CLOSE_UNLINK);
+ fn, GetLastWalMethodError(stream->walmethod));
+ stream->walmethod->ops->close(f, CLOSE_UNLINK);
exit(1);
}
/* No file existed, so create one */
- f = stream->walmethod->open_for_write(walfile_name,
- stream->partial_suffix, WalSegSz);
+ f = stream->walmethod->ops->open_for_write(stream->walmethod,
+ walfile_name,
+ stream->partial_suffix,
+ WalSegSz);
if (f == NULL)
{
pg_log_error("could not open write-ahead log file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
+ fn, GetLastWalMethodError(stream->walmethod));
pg_free(fn);
return false;
}
currpos = walfile->currpos;
/* Note that this considers the compression used if necessary */
- fn = stream->walmethod->get_file_name(walfile_name,
- stream->partial_suffix);
+ fn = stream->walmethod->ops->get_file_name(stream->walmethod,
+ walfile_name,
+ stream->partial_suffix);
if (stream->partial_suffix)
{
if (currpos == WalSegSz)
- r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+ r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
else
{
pg_log_info("not renaming \"%s\", segment is not complete", fn);
- r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+ r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
}
}
else
- r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+ r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
walfile = NULL;
if (r != 0)
{
pg_log_error("could not close file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
+ fn, GetLastWalMethodError(stream->walmethod));
pg_free(fn);
return false;
TLHistoryFileName(histfname, stream->timeline);
- return stream->walmethod->existsfile(histfname);
+ return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
}
static bool
return false;
}
- f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+ f = stream->walmethod->ops->open_for_write(stream->walmethod,
+ histfname, ".tmp", 0);
if (f == NULL)
{
pg_log_error("could not create timeline history file \"%s\": %s",
- histfname, stream->walmethod->getlasterror());
+ histfname, GetLastWalMethodError(stream->walmethod));
return false;
}
- if ((int) stream->walmethod->write(f, content, size) != size)
+ if ((int) stream->walmethod->ops->write(f, content, size) != size)
{
pg_log_error("could not write timeline history file \"%s\": %s",
- histfname, stream->walmethod->getlasterror());
+ histfname, GetLastWalMethodError(stream->walmethod));
/*
* If we fail to make the file, delete it to release disk space
*/
- stream->walmethod->close(f, CLOSE_UNLINK);
+ stream->walmethod->ops->close(f, CLOSE_UNLINK);
return false;
}
- if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
+ if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
{
pg_log_error("could not close file \"%s\": %s",
- histfname, stream->walmethod->getlasterror());
+ histfname, GetLastWalMethodError(stream->walmethod));
return false;
}
}
error:
- if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
+ if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
pg_log_error("could not close file \"%s\": %s",
- walfile->pathname, stream->walmethod->getlasterror());
+ walfile->pathname, GetLastWalMethodError(stream->walmethod));
walfile = NULL;
return false;
}
*/
if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
{
- if (stream->walmethod->sync(walfile) != 0)
+ if (stream->walmethod->ops->sync(walfile) != 0)
pg_fatal("could not fsync file \"%s\": %s",
- walfile->pathname, stream->walmethod->getlasterror());
+ walfile->pathname, GetLastWalMethodError(stream->walmethod));
lastFlushPosition = blockpos;
/*
* data has been successfully replicated or not, at the normal
* shutdown of the server.
*/
- if (stream->walmethod->sync(walfile) != 0)
+ if (stream->walmethod->ops->sync(walfile) != 0)
pg_fatal("could not fsync file \"%s\": %s",
- walfile->pathname, stream->walmethod->getlasterror());
+ walfile->pathname, GetLastWalMethodError(stream->walmethod));
lastFlushPosition = blockpos;
}
}
}
- if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
- bytes_to_write) != bytes_to_write)
+ if (stream->walmethod->ops->write(walfile,
+ copybuf + hdr_len + bytes_written,
+ bytes_to_write) != bytes_to_write)
{
pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
bytes_to_write, walfile->pathname,
- stream->walmethod->getlasterror());
+ GetLastWalMethodError(stream->walmethod));
return false;
}
*
* walmethods.c - implementations of different ways to write received wal
*
- * NOTE! The caller must ensure that only one method is instantiated in
- * any given program, and that it's only instantiated once!
- *
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
*-------------------------------------------------------------------------
*/
+static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
+ const char *pathname,
+ const char *temp_suffix,
+ size_t pad_to_size);
+static int dir_close(Walfile *f, WalCloseMethod method);
+static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
+ const char *pathname);
+static char *dir_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix);
+static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
+static int dir_sync(Walfile *f);
+static bool dir_finish(WalWriteMethod *wwmethod);
+static void dir_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalDirectoryMethodOps = {
+ .open_for_write = dir_open_for_write,
+ .close = dir_close,
+ .existsfile = dir_existsfile,
+ .get_file_size = dir_get_file_size,
+ .get_file_name = dir_get_file_name,
+ .write = dir_write,
+ .sync = dir_sync,
+ .finish = dir_finish,
+ .free = dir_free
+};
+
/*
* Global static data for this method
*/
typedef struct DirectoryMethodData
{
+ WalWriteMethod base;
char *basedir;
- pg_compress_algorithm compression_algorithm;
- int compression_level;
- bool sync;
- const char *lasterrstring; /* if set, takes precedence over lasterrno */
- int lasterrno;
} DirectoryMethodData;
-static DirectoryMethodData *dir_data = NULL;
/*
* Local file handle
#endif
} DirectoryMethodFile;
-#define dir_clear_error() \
- (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0)
-#define dir_set_error(msg) \
- (dir_data->lasterrstring = _(msg))
-
-static const char *
-dir_getlasterror(void)
-{
- if (dir_data->lasterrstring)
- return dir_data->lasterrstring;
- return strerror(dir_data->lasterrno);
-}
+#define clear_error(wwmethod) \
+ ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
static char *
-dir_get_file_name(const char *pathname, const char *temp_suffix)
+dir_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix)
{
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
snprintf(filename, MAXPGPATH, "%s%s%s",
pathname,
- dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
- dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
+ wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
+ wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
temp_suffix ? temp_suffix : "");
return filename;
}
static Walfile *
-dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix, size_t pad_to_size)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
char tmppath[MAXPGPATH];
char *filename;
int fd;
void *lz4buf = NULL;
#endif
- dir_clear_error();
+ clear_error(wwmethod);
- filename = dir_get_file_name(pathname, temp_suffix);
+ filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
if (fd < 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
gzfp = gzdopen(fd, "wb");
if (gzfp == NULL)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
close(fd);
return NULL;
}
- if (gzsetparams(gzfp, dir_data->compression_level,
+ if (gzsetparams(gzfp, wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
gzclose(gzfp);
return NULL;
}
}
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t ctx_out;
size_t header_size;
ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(ctx_out))
{
- dir_data->lasterrstring = LZ4F_getErrorName(ctx_out);
+ wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
close(fd);
return NULL;
}
/* assign the compression level, default is 0 */
memset(&prefs, 0, sizeof(prefs));
- prefs.compressionLevel = dir_data->compression_level;
+ prefs.compressionLevel = wwmethod->compression_level;
/* add the header */
header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
if (LZ4F_isError(header_size))
{
- dir_data->lasterrstring = LZ4F_getErrorName(header_size);
+ wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
(void) LZ4F_freeCompressionContext(ctx);
pg_free(lz4buf);
close(fd);
if (write(fd, lz4buf, header_size) != header_size)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
(void) LZ4F_freeCompressionContext(ctx);
pg_free(lz4buf);
close(fd);
#endif
/* Do pre-padding on non-compressed files */
- if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
PGAlignedXLogBlock zerobuf;
int bytes;
if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
close(fd);
return NULL;
}
if (lseek(fd, 0, SEEK_SET) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
close(fd);
return NULL;
}
* important when using synchronous mode, where the file is modified and
* fsynced in-place, without a directory fsync.
*/
- if (dir_data->sync)
+ if (wwmethod->sync)
{
if (fsync_fname(tmppath, false) != 0 ||
fsync_parent_path(tmppath) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
gzclose(gzfp);
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
(void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
(void) LZ4F_freeCompressionContext(ctx);
f = pg_malloc0(sizeof(DirectoryMethodFile));
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
f->gzfp = gzfp;
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
f->ctx = ctx;
f->lz4buf = lz4buf;
}
#endif
+ f->base.wwmethod = wwmethod;
f->base.currpos = 0;
f->base.pathname = pg_strdup(pathname);
f->fd = fd;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
errno = 0;
r = (ssize_t) gzwrite(df->gzfp, buf, count);
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
}
}
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t chunk;
size_t remaining;
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
}
}
if (r > 0)
{
int r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod;
char tmppath[MAXPGPATH];
char tmppath2[MAXPGPATH];
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
errno = 0; /* in case gzclose() doesn't set it */
r = gzclose(df->gzfp);
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t compressed;
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
* If we have a temp prefix, normal operation is to rename the
* file.
*/
- filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+ filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+ df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
/* permanent name, so no need for the prefix */
- filename2 = dir_get_file_name(df->base.pathname, NULL);
+ filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
dir_data->basedir, filename2);
pg_free(filename2);
- if (dir_data->sync)
+ if (f->wwmethod->sync)
r = durable_rename(tmppath, tmppath2);
else
{
char *filename;
/* Unlink the file once it's closed */
- filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+ filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+ df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
* CLOSE_NO_RENAME. In this case, fsync the file and containing
* directory if sync mode is requested.
*/
- if (dir_data->sync)
+ if (f->wwmethod->sync)
{
r = fsync_fname(df->fullpath, false);
if (r == 0)
}
if (r != 0)
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
#ifdef USE_LZ4
pg_free(df->lz4buf);
int r;
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
- if (!dir_data->sync)
+ if (!f->wwmethod->sync)
return 0;
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
{
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
}
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
size_t compressed;
compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
}
r = fsync(((DirectoryMethodFile *) f)->fd);
if (r < 0)
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return r;
}
static ssize_t
-dir_get_file_size(const char *pathname)
+dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
struct stat statbuf;
char tmppath[MAXPGPATH];
if (stat(tmppath, &statbuf) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return -1;
}
return statbuf.st_size;
}
-static pg_compress_algorithm
-dir_compression_algorithm(void)
-{
- return dir_data->compression_algorithm;
-}
-
static bool
-dir_existsfile(const char *pathname)
+dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
char tmppath[MAXPGPATH];
int fd;
- dir_clear_error();
+ clear_error(wwmethod);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
}
static bool
-dir_finish(void)
+dir_finish(WalWriteMethod *wwmethod)
{
- dir_clear_error();
+ clear_error(wwmethod);
- if (dir_data->sync)
+ if (wwmethod->sync)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
/*
* Files are fsynced when they are closed, but we need to fsync the
* directory entry here as well.
*/
if (fsync_fname(dir_data->basedir, true) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
return true;
}
+static void
+dir_free(WalWriteMethod *wwmethod)
+{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
+ pg_free(dir_data->basedir);
+ pg_free(wwmethod);
+}
+
WalWriteMethod *
CreateWalDirectoryMethod(const char *basedir,
pg_compress_algorithm compression_algorithm,
int compression_level, bool sync)
{
- WalWriteMethod *method;
-
- method = pg_malloc0(sizeof(WalWriteMethod));
- method->open_for_write = dir_open_for_write;
- method->write = dir_write;
- method->get_file_size = dir_get_file_size;
- method->get_file_name = dir_get_file_name;
- method->compression_algorithm = dir_compression_algorithm;
- method->close = dir_close;
- method->sync = dir_sync;
- method->existsfile = dir_existsfile;
- method->finish = dir_finish;
- method->getlasterror = dir_getlasterror;
-
- dir_data = pg_malloc0(sizeof(DirectoryMethodData));
- dir_data->compression_algorithm = compression_algorithm;
- dir_data->compression_level = compression_level;
- dir_data->basedir = pg_strdup(basedir);
- dir_data->sync = sync;
-
- return method;
-}
-
-void
-FreeWalDirectoryMethod(void)
-{
- pg_free(dir_data->basedir);
- pg_free(dir_data);
- dir_data = NULL;
+ DirectoryMethodData *wwmethod;
+
+ wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
+ *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+ &WalDirectoryMethodOps;
+ wwmethod->base.compression_algorithm = compression_algorithm;
+ wwmethod->base.compression_level = compression_level;
+ wwmethod->base.sync = sync;
+ clear_error(&wwmethod->base);
+ wwmethod->basedir = pg_strdup(basedir);
+
+ return &wwmethod->base;
}
*-------------------------------------------------------------------------
*/
+static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
+ const char *pathname,
+ const char *temp_suffix,
+ size_t pad_to_size);
+static int tar_close(Walfile *f, WalCloseMethod method);
+static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
+ const char *pathname);
+static char *tar_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix);
+static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
+static int tar_sync(Walfile *f);
+static bool tar_finish(WalWriteMethod *wwmethod);
+static void tar_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalTarMethodOps = {
+ .open_for_write = tar_open_for_write,
+ .close = tar_close,
+ .existsfile = tar_existsfile,
+ .get_file_size = tar_get_file_size,
+ .get_file_name = tar_get_file_name,
+ .write = tar_write,
+ .sync = tar_sync,
+ .finish = tar_finish,
+ .free = tar_free
+};
+
typedef struct TarMethodFile
{
Walfile base;
typedef struct TarMethodData
{
+ WalWriteMethod base;
char *tarfilename;
int fd;
- pg_compress_algorithm compression_algorithm;
- int compression_level;
- bool sync;
TarMethodFile *currentfile;
- const char *lasterrstring; /* if set, takes precedence over lasterrno */
- int lasterrno;
#ifdef HAVE_LIBZ
z_streamp zp;
void *zlibOut;
#endif
} TarMethodData;
-static TarMethodData *tar_data = NULL;
-
-#define tar_clear_error() \
- (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0)
-#define tar_set_error(msg) \
- (tar_data->lasterrstring = _(msg))
-
-static const char *
-tar_getlasterror(void)
-{
- if (tar_data->lasterrstring)
- return tar_data->lasterrstring;
- return strerror(tar_data->lasterrno);
-}
#ifdef HAVE_LIBZ
static bool
-tar_write_compressed_data(void *buf, size_t count, bool flush)
+tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count,
+ bool flush)
{
tar_data->zp->next_in = buf;
tar_data->zp->avail_in = count;
r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
if (r == Z_STREAM_ERROR)
{
- tar_set_error("could not compress data");
+ tar_data->base.lasterrstring = "could not compress data";
return false;
}
if (write(tar_data->fd, tar_data->zlibOut, len) != len)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ tar_data->base.lasterrno = errno ? errno : ENOSPC;
return false;
}
/* Reset the stream for writing */
if (deflateReset(tar_data->zp) != Z_OK)
{
- tar_set_error("could not reset compression stream");
+ tar_data->base.lasterrstring = "could not reset compression stream";
return false;
}
}
static ssize_t
tar_write(Walfile *f, const void *buf, size_t count)
{
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
ssize_t r;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
/* Tarfile will always be positioned at the end */
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
r = write(tar_data->fd, buf, count);
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
f->currpos += r;
return r;
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
- if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
+ if (!tar_write_compressed_data(tar_data, unconstify(void *, buf),
+ count, false))
return -1;
f->currpos += count;
return count;
else
{
/* Can't happen - compression enabled with no method set */
- tar_data->lasterrno = ENOSYS;
+ f->wwmethod->lasterrno = ENOSYS;
return -1;
}
}
}
static char *
-tar_get_file_name(const char *pathname, const char *temp_suffix)
+tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix)
{
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
}
static Walfile *
-tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix, size_t pad_to_size)
{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
char *tmppath;
- tar_clear_error();
+ clear_error(wwmethod);
if (tar_data->fd < 0)
{
pg_file_create_mode);
if (tar_data->fd < 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
tar_data->zp->zalloc = Z_NULL;
* default 15 for the windowBits parameter makes the output be
* gzip instead of zlib.
*/
- if (deflateInit2(tar_data->zp, tar_data->compression_level,
+ if (deflateInit2(tar_data->zp, wwmethod->compression_level,
Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
{
pg_free(tar_data->zp);
tar_data->zp = NULL;
- tar_set_error("could not initialize compression library");
+ wwmethod->lasterrstring =
+ "could not initialize compression library";
return NULL;
}
}
if (tar_data->currentfile != NULL)
{
- tar_set_error("implementation error: tar files can't have more than one open file");
+ wwmethod->lasterrstring =
+ "implementation error: tar files can't have more than one open file";
return NULL;
}
tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+ tar_data->currentfile->base.wwmethod = wwmethod;
- tmppath = tar_get_file_name(pathname, temp_suffix);
+ tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
/* Create a header with size set to 0 - we will fill out the size on close */
if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
pg_free(tar_data->currentfile);
pg_free(tmppath);
tar_data->currentfile = NULL;
- tar_set_error("could not create tar header");
+ wwmethod->lasterrstring = "could not create tar header";
return NULL;
}
pg_free(tmppath);
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Flush existing data */
- if (!tar_write_compressed_data(NULL, 0, true))
+ if (!tar_write_compressed_data(tar_data, NULL, 0, true))
return NULL;
/* Turn off compression for header */
if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ wwmethod->lasterrstring =
+ "could not change compression parameters";
return NULL;
}
}
tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
if (tar_data->currentfile->ofs_start == -1)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
pg_free(tar_data->currentfile);
tar_data->currentfile = NULL;
return NULL;
}
tar_data->currentfile->base.currpos = 0;
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, tar_data->currentfile->header,
TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
pg_free(tar_data->currentfile);
tar_data->currentfile = NULL;
return NULL;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Write header through the zlib APIs but with no compression */
- if (!tar_write_compressed_data(tar_data->currentfile->header,
+ if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
TAR_BLOCK_SIZE, true))
return NULL;
/* Re-enable compression for the rest of the file */
- if (deflateParams(tar_data->zp, tar_data->compression_level,
+ if (deflateParams(tar_data->zp, wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ wwmethod->lasterrstring = "could not change compression parameters";
return NULL;
}
}
if (pad_to_size)
{
tar_data->currentfile->pad_to_size = pad_to_size;
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
/* Uncompressed, so pad now */
if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
}
static ssize_t
-tar_get_file_size(const char *pathname)
+tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
{
- tar_clear_error();
+ clear_error(wwmethod);
/* Currently not used, so not supported */
- tar_data->lasterrno = ENOSYS;
+ wwmethod->lasterrno = ENOSYS;
return -1;
}
-static pg_compress_algorithm
-tar_compression_algorithm(void)
-{
- return tar_data->compression_algorithm;
-}
-
static int
tar_sync(Walfile *f)
{
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
int r;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
- if (!tar_data->sync)
+ if (!f->wwmethod->sync)
return 0;
/*
* Always sync the whole tarfile, because that's all we can do. This makes
* no sense on compressed files, so just ignore those.
*/
- if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
return 0;
r = fsync(tar_data->fd);
if (r < 0)
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return r;
}
{
ssize_t filesize;
int padding;
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
TarMethodFile *tf = (TarMethodFile *) f;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
if (method == CLOSE_UNLINK)
{
- if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
{
- tar_set_error("unlink not supported with compression");
+ f->wwmethod->lasterrstring = "unlink not supported with compression";
return -1;
}
*/
if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
*/
if (tf->pad_to_size)
{
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/*
* A compressed tarfile is padded on close since we cannot know
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Flush the current buffer */
- if (!tar_write_compressed_data(NULL, 0, true))
+ if (!tar_write_compressed_data(tar_data, NULL, 0, true))
return -1;
}
#endif
print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Turn off compression */
if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ f->wwmethod->lasterrstring = "could not change compression parameters";
return -1;
}
/* Overwrite the header, assuming the size will be the same */
- if (!tar_write_compressed_data(tar_data->currentfile->header,
+ if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
TAR_BLOCK_SIZE, true))
return -1;
/* Turn compression back on */
- if (deflateParams(tar_data->zp, tar_data->compression_level,
+ if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ f->wwmethod->lasterrstring = "could not change compression parameters";
return -1;
}
}
/* Move file pointer back down to end, so we can write the next file */
if (lseek(tar_data->fd, 0, SEEK_END) < 0)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
{
/* XXX this seems pretty bogus; why is only this case fatal? */
pg_fatal("could not fsync file \"%s\": %s",
- tf->base.pathname, tar_getlasterror());
+ tf->base.pathname, GetLastWalMethodError(f->wwmethod));
}
/* Clean up and done */
}
static bool
-tar_existsfile(const char *pathname)
+tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
{
- tar_clear_error();
+ clear_error(wwmethod);
/* We only deal with new tarfiles, so nothing externally created exists */
return false;
}
static bool
-tar_finish(void)
+tar_finish(WalWriteMethod *wwmethod)
{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
char zerobuf[1024] = {0};
- tar_clear_error();
+ clear_error(wwmethod);
if (tar_data->currentfile)
{
}
/* A tarfile always ends with two empty blocks */
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
return false;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
- if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+ if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
+ false))
return false;
/* Also flush all data to make sure the gzip stream is finished */
if (r == Z_STREAM_ERROR)
{
- tar_set_error("could not compress data");
+ wwmethod->lasterrstring = "could not compress data";
return false;
}
if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
* If write didn't set errno, assume problem is no disk
* space.
*/
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
return false;
}
}
if (deflateEnd(tar_data->zp) != Z_OK)
{
- tar_set_error("could not close compression stream");
+ wwmethod->lasterrstring = "could not close compression stream";
return false;
}
}
}
/* sync the empty blocks as well, since they're after the last file */
- if (tar_data->sync)
+ if (wwmethod->sync)
{
if (fsync(tar_data->fd) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
if (close(tar_data->fd) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
tar_data->fd = -1;
- if (tar_data->sync)
+ if (wwmethod->sync)
{
if (fsync_fname(tar_data->tarfilename, false) != 0 ||
fsync_parent_path(tar_data->tarfilename) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
return true;
}
+static void
+tar_free(WalWriteMethod *wwmethod)
+{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
+
+ pg_free(tar_data->tarfilename);
+#ifdef HAVE_LIBZ
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
+ pg_free(tar_data->zlibOut);
+#endif
+ pg_free(wwmethod);
+}
+
/*
* The argument compression_algorithm is currently ignored. It is in place for
* symmetry with CreateWalDirectoryMethod which uses it for distinguishing
pg_compress_algorithm compression_algorithm,
int compression_level, bool sync)
{
- WalWriteMethod *method;
+ TarMethodData *wwmethod;
const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
".tar.gz" : ".tar";
- method = pg_malloc0(sizeof(WalWriteMethod));
- method->open_for_write = tar_open_for_write;
- method->write = tar_write;
- method->get_file_size = tar_get_file_size;
- method->get_file_name = tar_get_file_name;
- method->compression_algorithm = tar_compression_algorithm;
- method->close = tar_close;
- method->sync = tar_sync;
- method->existsfile = tar_existsfile;
- method->finish = tar_finish;
- method->getlasterror = tar_getlasterror;
-
- tar_data = pg_malloc0(sizeof(TarMethodData));
- tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
- sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
- tar_data->fd = -1;
- tar_data->compression_algorithm = compression_algorithm;
- tar_data->compression_level = compression_level;
- tar_data->sync = sync;
+ wwmethod = pg_malloc0(sizeof(TarMethodData));
+ *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+ &WalTarMethodOps;
+ wwmethod->base.compression_algorithm = compression_algorithm;
+ wwmethod->base.compression_level = compression_level;
+ wwmethod->base.sync = sync;
+ clear_error(&wwmethod->base);
+
+ wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+ sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
+ wwmethod->fd = -1;
#ifdef HAVE_LIBZ
if (compression_algorithm == PG_COMPRESSION_GZIP)
- tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+ wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
#endif
- return method;
+ return &wwmethod->base;
}
-void
-FreeWalTarMethod(void)
+const char *
+GetLastWalMethodError(WalWriteMethod *wwmethod)
{
- pg_free(tar_data->tarfilename);
-#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
- pg_free(tar_data->zlibOut);
-#endif
- pg_free(tar_data);
- tar_data = NULL;
+ if (wwmethod->lasterrstring)
+ return wwmethod->lasterrstring;
+ return strerror(wwmethod->lasterrno);
}