summaryrefslogtreecommitdiff
path: root/src/fe_utils
diff options
context:
space:
mode:
authorRobert Haas2024-08-05 15:40:29 +0000
committerRobert Haas2024-08-05 15:41:57 +0000
commitf80b09bac87d6b49f5dbb6131da5fbd9b9773c5c (patch)
tree353896ae8f4a860c269ec705c4f1becddf4f90f1 /src/fe_utils
parent53b2c921a0f9b56465ab65165c1909f9616ffa98 (diff)
Move astreamer (except astreamer_inject) to fe_utils.
This allows the code to be used by other frontend applications. Amul Sul, reviewed by Sravan Kumar, Andres Freund (whose input I specifically solicited regarding the meson.build changes), and me. Discussion: http://postgr.es/m/CAAJ_b94StvLWrc_p4q-f7n3OPfr6GhL8_XuAg2aAaYZp1tF-nw@mail.gmail.com
Diffstat (limited to 'src/fe_utils')
-rw-r--r--src/fe_utils/Makefile5
-rw-r--r--src/fe_utils/astreamer_file.c396
-rw-r--r--src/fe_utils/astreamer_gzip.c364
-rw-r--r--src/fe_utils/astreamer_lz4.c422
-rw-r--r--src/fe_utils/astreamer_tar.c514
-rw-r--r--src/fe_utils/astreamer_zstd.c368
-rw-r--r--src/fe_utils/meson.build5
7 files changed, 2074 insertions, 0 deletions
diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile
index 946c05258f0..2694be4b859 100644
--- a/src/fe_utils/Makefile
+++ b/src/fe_utils/Makefile
@@ -21,6 +21,11 @@ override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
OBJS = \
archive.o \
+ astreamer_file.o \
+ astreamer_gzip.o \
+ astreamer_lz4.o \
+ astreamer_tar.o \
+ astreamer_zstd.o \
cancel.o \
conditional.o \
connect_utils.o \
diff --git a/src/fe_utils/astreamer_file.c b/src/fe_utils/astreamer_file.c
new file mode 100644
index 00000000000..13d1192c6e6
--- /dev/null
+++ b/src/fe_utils/astreamer_file.c
@@ -0,0 +1,396 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer_file.c
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/astreamer_file.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#include "common/file_perm.h"
+#include "common/logging.h"
+#include "common/string.h"
+#include "fe_utils/astreamer.h"
+
+typedef struct astreamer_plain_writer
+{
+ astreamer base;
+ char *pathname;
+ FILE *file;
+ bool should_close_file;
+} astreamer_plain_writer;
+
+typedef struct astreamer_extractor
+{
+ astreamer base;
+ char *basepath;
+ const char *(*link_map) (const char *);
+ void (*report_output_file) (const char *);
+ char filename[MAXPGPATH];
+ FILE *file;
+} astreamer_extractor;
+
+static void astreamer_plain_writer_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_plain_writer_finalize(astreamer *streamer);
+static void astreamer_plain_writer_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_plain_writer_ops = {
+ .content = astreamer_plain_writer_content,
+ .finalize = astreamer_plain_writer_finalize,
+ .free = astreamer_plain_writer_free
+};
+
+static void astreamer_extractor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_extractor_finalize(astreamer *streamer);
+static void astreamer_extractor_free(astreamer *streamer);
+static void extract_directory(const char *filename, mode_t mode);
+static void extract_link(const char *filename, const char *linktarget);
+static FILE *create_file_for_extract(const char *filename, mode_t mode);
+
+static const astreamer_ops astreamer_extractor_ops = {
+ .content = astreamer_extractor_content,
+ .finalize = astreamer_extractor_finalize,
+ .free = astreamer_extractor_free
+};
+
+/*
+ * Create a astreamer that just writes data to a file.
+ *
+ * The caller must specify a pathname and may specify a file. The pathname is
+ * used for error-reporting purposes either way. If file is NULL, the pathname
+ * also identifies the file to which the data should be written: it is opened
+ * for writing and closed when done. If file is not NULL, the data is written
+ * there.
+ */
+astreamer *
+astreamer_plain_writer_new(char *pathname, FILE *file)
+{
+ astreamer_plain_writer *streamer;
+
+ streamer = palloc0(sizeof(astreamer_plain_writer));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_plain_writer_ops;
+
+ streamer->pathname = pstrdup(pathname);
+ streamer->file = file;
+
+ if (file == NULL)
+ {
+ streamer->file = fopen(pathname, "wb");
+ if (streamer->file == NULL)
+ pg_fatal("could not create file \"%s\": %m", pathname);
+ streamer->should_close_file = true;
+ }
+
+ return &streamer->base;
+}
+
+/*
+ * Write archive content to file.
+ */
+static void
+astreamer_plain_writer_content(astreamer *streamer,
+ astreamer_member *member, const char *data,
+ int len, astreamer_archive_context context)
+{
+ astreamer_plain_writer *mystreamer;
+
+ mystreamer = (astreamer_plain_writer *) streamer;
+
+ if (len == 0)
+ return;
+
+ errno = 0;
+ if (fwrite(data, len, 1, mystreamer->file) != 1)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_fatal("could not write to file \"%s\": %m",
+ mystreamer->pathname);
+ }
+}
+
+/*
+ * End-of-archive processing when writing to a plain file consists of closing
+ * the file if we opened it, but not if the caller provided it.
+ */
+static void
+astreamer_plain_writer_finalize(astreamer *streamer)
+{
+ astreamer_plain_writer *mystreamer;
+
+ mystreamer = (astreamer_plain_writer *) streamer;
+
+ if (mystreamer->should_close_file && fclose(mystreamer->file) != 0)
+ pg_fatal("could not close file \"%s\": %m",
+ mystreamer->pathname);
+
+ mystreamer->file = NULL;
+ mystreamer->should_close_file = false;
+}
+
+/*
+ * Free memory associated with this astreamer.
+ */
+static void
+astreamer_plain_writer_free(astreamer *streamer)
+{
+ astreamer_plain_writer *mystreamer;
+
+ mystreamer = (astreamer_plain_writer *) streamer;
+
+ Assert(!mystreamer->should_close_file);
+ Assert(mystreamer->base.bbs_next == NULL);
+
+ pfree(mystreamer->pathname);
+ pfree(mystreamer);
+}
+
+/*
+ * Create a astreamer that extracts an archive.
+ *
+ * All pathnames in the archive are interpreted relative to basepath.
+ *
+ * Unlike e.g. astreamer_plain_writer_new() we can't do anything useful here
+ * with untyped chunks; we need typed chunks which follow the rules described
+ * in astreamer.h. Assuming we have that, we don't need to worry about the
+ * original archive format; it's enough to just look at the member information
+ * provided and write to the corresponding file.
+ *
+ * 'link_map' is a function that will be applied to the target of any
+ * symbolic link, and which should return a replacement pathname to be used
+ * in its place. If NULL, the symbolic link target is used without
+ * modification.
+ *
+ * 'report_output_file' is a function that will be called each time we open a
+ * new output file. The pathname to that file is passed as an argument. If
+ * NULL, the call is skipped.
+ */
+astreamer *
+astreamer_extractor_new(const char *basepath,
+ const char *(*link_map) (const char *),
+ void (*report_output_file) (const char *))
+{
+ astreamer_extractor *streamer;
+
+ streamer = palloc0(sizeof(astreamer_extractor));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_extractor_ops;
+ streamer->basepath = pstrdup(basepath);
+ streamer->link_map = link_map;
+ streamer->report_output_file = report_output_file;
+
+ return &streamer->base;
+}
+
+/*
+ * Extract archive contents to the filesystem.
+ */
+static void
+astreamer_extractor_content(astreamer *streamer, astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_extractor *mystreamer = (astreamer_extractor *) streamer;
+ int fnamelen;
+
+ Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER);
+ Assert(context != ASTREAMER_UNKNOWN);
+
+ switch (context)
+ {
+ case ASTREAMER_MEMBER_HEADER:
+ Assert(mystreamer->file == NULL);
+
+ /* Prepend basepath. */
+ snprintf(mystreamer->filename, sizeof(mystreamer->filename),
+ "%s/%s", mystreamer->basepath, member->pathname);
+
+ /* Remove any trailing slash. */
+ fnamelen = strlen(mystreamer->filename);
+ if (mystreamer->filename[fnamelen - 1] == '/')
+ mystreamer->filename[fnamelen - 1] = '\0';
+
+ /* Dispatch based on file type. */
+ if (member->is_directory)
+ extract_directory(mystreamer->filename, member->mode);
+ else if (member->is_link)
+ {
+ const char *linktarget = member->linktarget;
+
+ if (mystreamer->link_map)
+ linktarget = mystreamer->link_map(linktarget);
+ extract_link(mystreamer->filename, linktarget);
+ }
+ else
+ mystreamer->file =
+ create_file_for_extract(mystreamer->filename,
+ member->mode);
+
+ /* Report output file change. */
+ if (mystreamer->report_output_file)
+ mystreamer->report_output_file(mystreamer->filename);
+ break;
+
+ case ASTREAMER_MEMBER_CONTENTS:
+ if (mystreamer->file == NULL)
+ break;
+
+ errno = 0;
+ if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_fatal("could not write to file \"%s\": %m",
+ mystreamer->filename);
+ }
+ break;
+
+ case ASTREAMER_MEMBER_TRAILER:
+ if (mystreamer->file == NULL)
+ break;
+ fclose(mystreamer->file);
+ mystreamer->file = NULL;
+ break;
+
+ case ASTREAMER_ARCHIVE_TRAILER:
+ break;
+
+ default:
+ /* Shouldn't happen. */
+ pg_fatal("unexpected state while extracting archive");
+ }
+}
+
+/*
+ * Should we tolerate an already-existing directory?
+ *
+ * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will have been
+ * created by the wal receiver process. Also, when the WAL directory location
+ * was specified, pg_wal (or pg_xlog) has already been created as a symbolic
+ * link before starting the actual backup. So just ignore creation failures
+ * on related directories.
+ *
+ * If in-place tablespaces are used, pg_tblspc and subdirectories may already
+ * exist when we get here. So tolerate that case, too.
+ */
+static bool
+should_allow_existing_directory(const char *pathname)
+{
+ const char *filename = last_dir_separator(pathname) + 1;
+
+ if (strcmp(filename, "pg_wal") == 0 ||
+ strcmp(filename, "pg_xlog") == 0 ||
+ strcmp(filename, "archive_status") == 0 ||
+ strcmp(filename, "summaries") == 0 ||
+ strcmp(filename, "pg_tblspc") == 0)
+ return true;
+
+ if (strspn(filename, "0123456789") == strlen(filename))
+ {
+ const char *pg_tblspc = strstr(pathname, "/pg_tblspc/");
+
+ return pg_tblspc != NULL && pg_tblspc + 11 == filename;
+ }
+
+ return false;
+}
+
+/*
+ * Create a directory.
+ */
+static void
+extract_directory(const char *filename, mode_t mode)
+{
+ if (mkdir(filename, pg_dir_create_mode) != 0 &&
+ (errno != EEXIST || !should_allow_existing_directory(filename)))
+ pg_fatal("could not create directory \"%s\": %m",
+ filename);
+
+#ifndef WIN32
+ if (chmod(filename, mode))
+ pg_fatal("could not set permissions on directory \"%s\": %m",
+ filename);
+#endif
+}
+
+/*
+ * Create a symbolic link.
+ *
+ * It's most likely a link in pg_tblspc directory, to the location of a
+ * tablespace. Apply any tablespace mapping given on the command line
+ * (--tablespace-mapping). (We blindly apply the mapping without checking that
+ * the link really is inside pg_tblspc. We don't expect there to be other
+ * symlinks in a data directory, but if there are, you can call it an
+ * undocumented feature that you can map them too.)
+ */
+static void
+extract_link(const char *filename, const char *linktarget)
+{
+ if (symlink(linktarget, filename) != 0)
+ pg_fatal("could not create symbolic link from \"%s\" to \"%s\": %m",
+ filename, linktarget);
+}
+
+/*
+ * Create a regular file.
+ *
+ * Return the resulting handle so we can write the content to the file.
+ */
+static FILE *
+create_file_for_extract(const char *filename, mode_t mode)
+{
+ FILE *file;
+
+ file = fopen(filename, "wb");
+ if (file == NULL)
+ pg_fatal("could not create file \"%s\": %m", filename);
+
+#ifndef WIN32
+ if (chmod(filename, mode))
+ pg_fatal("could not set permissions on file \"%s\": %m",
+ filename);
+#endif
+
+ return file;
+}
+
+/*
+ * End-of-stream processing for extracting an archive.
+ *
+ * There's nothing to do here but sanity checking.
+ */
+static void
+astreamer_extractor_finalize(astreamer *streamer)
+{
+ astreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY
+ = (astreamer_extractor *) streamer;
+
+ Assert(mystreamer->file == NULL);
+}
+
+/*
+ * Free memory.
+ */
+static void
+astreamer_extractor_free(astreamer *streamer)
+{
+ astreamer_extractor *mystreamer = (astreamer_extractor *) streamer;
+
+ pfree(mystreamer->basepath);
+ pfree(mystreamer);
+}
diff --git a/src/fe_utils/astreamer_gzip.c b/src/fe_utils/astreamer_gzip.c
new file mode 100644
index 00000000000..dd28defac7b
--- /dev/null
+++ b/src/fe_utils/astreamer_gzip.c
@@ -0,0 +1,364 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer_gzip.c
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/astreamer_gzip.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "common/file_perm.h"
+#include "common/logging.h"
+#include "common/string.h"
+#include "fe_utils/astreamer.h"
+
+#ifdef HAVE_LIBZ
+typedef struct astreamer_gzip_writer
+{
+ astreamer base;
+ char *pathname;
+ gzFile gzfile;
+} astreamer_gzip_writer;
+
+typedef struct astreamer_gzip_decompressor
+{
+ astreamer base;
+ z_stream zstream;
+ size_t bytes_written;
+} astreamer_gzip_decompressor;
+
+static void astreamer_gzip_writer_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_gzip_writer_finalize(astreamer *streamer);
+static void astreamer_gzip_writer_free(astreamer *streamer);
+static const char *get_gz_error(gzFile gzf);
+
+static const astreamer_ops astreamer_gzip_writer_ops = {
+ .content = astreamer_gzip_writer_content,
+ .finalize = astreamer_gzip_writer_finalize,
+ .free = astreamer_gzip_writer_free
+};
+
+static void astreamer_gzip_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
+static void astreamer_gzip_decompressor_free(astreamer *streamer);
+static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
+static void gzip_pfree(void *opaque, void *address);
+
+static const astreamer_ops astreamer_gzip_decompressor_ops = {
+ .content = astreamer_gzip_decompressor_content,
+ .finalize = astreamer_gzip_decompressor_finalize,
+ .free = astreamer_gzip_decompressor_free
+};
+#endif
+
+/*
+ * Create a astreamer that just compresses data using gzip, and then writes
+ * it to a file.
+ *
+ * As in the case of astreamer_plain_writer_new, pathname is always used
+ * for error reporting purposes; if file is NULL, it is also the opened and
+ * closed so that the data may be written there.
+ */
+astreamer *
+astreamer_gzip_writer_new(char *pathname, FILE *file,
+ pg_compress_specification *compress)
+{
+#ifdef HAVE_LIBZ
+ astreamer_gzip_writer *streamer;
+
+ streamer = palloc0(sizeof(astreamer_gzip_writer));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_gzip_writer_ops;
+
+ streamer->pathname = pstrdup(pathname);
+
+ if (file == NULL)
+ {
+ streamer->gzfile = gzopen(pathname, "wb");
+ if (streamer->gzfile == NULL)
+ pg_fatal("could not create compressed file \"%s\": %m",
+ pathname);
+ }
+ else
+ {
+ int fd = dup(fileno(file));
+
+ if (fd < 0)
+ pg_fatal("could not duplicate stdout: %m");
+
+ streamer->gzfile = gzdopen(fd, "wb");
+ if (streamer->gzfile == NULL)
+ pg_fatal("could not open output file: %m");
+ }
+
+ if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
+ pg_fatal("could not set compression level %d: %s",
+ compress->level, get_gz_error(streamer->gzfile));
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support compression with %s", "gzip");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef HAVE_LIBZ
+/*
+ * Write archive content to gzip file.
+ */
+static void
+astreamer_gzip_writer_content(astreamer *streamer,
+ astreamer_member *member, const char *data,
+ int len, astreamer_archive_context context)
+{
+ astreamer_gzip_writer *mystreamer;
+
+ mystreamer = (astreamer_gzip_writer *) streamer;
+
+ if (len == 0)
+ return;
+
+ errno = 0;
+ if (gzwrite(mystreamer->gzfile, data, len) != len)
+ {
+ /* if write didn't set errno, assume problem is no disk space */
+ if (errno == 0)
+ errno = ENOSPC;
+ pg_fatal("could not write to compressed file \"%s\": %s",
+ mystreamer->pathname, get_gz_error(mystreamer->gzfile));
+ }
+}
+
+/*
+ * End-of-archive processing when writing to a gzip file consists of just
+ * calling gzclose.
+ *
+ * It makes no difference whether we opened the file or the caller did it,
+ * because libz provides no way of avoiding a close on the underlying file
+ * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
+ * work around this issue, so that the behavior from the caller's viewpoint
+ * is the same as for astreamer_plain_writer.
+ */
+static void
+astreamer_gzip_writer_finalize(astreamer *streamer)
+{
+ astreamer_gzip_writer *mystreamer;
+
+ mystreamer = (astreamer_gzip_writer *) streamer;
+
+ errno = 0; /* in case gzclose() doesn't set it */
+ if (gzclose(mystreamer->gzfile) != 0)
+ pg_fatal("could not close compressed file \"%s\": %m",
+ mystreamer->pathname);
+
+ mystreamer->gzfile = NULL;
+}
+
+/*
+ * Free memory associated with this astreamer.
+ */
+static void
+astreamer_gzip_writer_free(astreamer *streamer)
+{
+ astreamer_gzip_writer *mystreamer;
+
+ mystreamer = (astreamer_gzip_writer *) streamer;
+
+ Assert(mystreamer->base.bbs_next == NULL);
+ Assert(mystreamer->gzfile == NULL);
+
+ pfree(mystreamer->pathname);
+ pfree(mystreamer);
+}
+
+/*
+ * Helper function for libz error reporting.
+ */
+static const char *
+get_gz_error(gzFile gzf)
+{
+ int errnum;
+ const char *errmsg;
+
+ errmsg = gzerror(gzf, &errnum);
+ if (errnum == Z_ERRNO)
+ return strerror(errno);
+ else
+ return errmsg;
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of gzip
+ * compressed blocks.
+ */
+astreamer *
+astreamer_gzip_decompressor_new(astreamer *next)
+{
+#ifdef HAVE_LIBZ
+ astreamer_gzip_decompressor *streamer;
+ z_stream *zs;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(astreamer_gzip_decompressor));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_gzip_decompressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+
+ /* Initialize internal stream state for decompression */
+ zs = &streamer->zstream;
+ zs->zalloc = gzip_palloc;
+ zs->zfree = gzip_pfree;
+ zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
+ zs->avail_out = streamer->base.bbs_buffer.maxlen;
+
+ /*
+ * Data compression was initialized using deflateInit2 to request a gzip
+ * header. Similarly, we are using inflateInit2 to initialize data
+ * decompression.
+ *
+ * Per the documentation for inflateInit2, the second argument is
+ * "windowBits" and its value must be greater than or equal to the value
+ * provided while compressing the data, so we are using the maximum
+ * possible value for safety.
+ */
+ if (inflateInit2(zs, 15 + 16) != Z_OK)
+ pg_fatal("could not initialize compression library");
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support compression with %s", "gzip");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef HAVE_LIBZ
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+astreamer_gzip_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_gzip_decompressor *mystreamer;
+ z_stream *zs;
+
+ mystreamer = (astreamer_gzip_decompressor *) streamer;
+
+ zs = &mystreamer->zstream;
+ zs->next_in = (const uint8 *) data;
+ zs->avail_in = len;
+
+ /* Process the current chunk */
+ while (zs->avail_in > 0)
+ {
+ int res;
+
+ Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
+
+ zs->next_out = (uint8 *)
+ mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+ zs->avail_out =
+ mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+
+ /*
+ * This call decompresses data starting at zs->next_in and updates
+ * zs->next_in * and zs->avail_in. It generates output data starting
+ * at zs->next_out and updates zs->next_out and zs->avail_out
+ * accordingly.
+ */
+ res = inflate(zs, Z_NO_FLUSH);
+
+ if (res == Z_STREAM_ERROR)
+ pg_log_error("could not decompress data: %s", zs->msg);
+
+ mystreamer->bytes_written =
+ mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
+
+ /* If output buffer is full then pass data to next streamer */
+ if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
+ {
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen, context);
+ mystreamer->bytes_written = 0;
+ }
+ }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+astreamer_gzip_decompressor_finalize(astreamer *streamer)
+{
+ astreamer_gzip_decompressor *mystreamer;
+
+ mystreamer = (astreamer_gzip_decompressor *) streamer;
+
+ /*
+ * End of the stream, if there is some pending data in output buffers then
+ * we must forward it to next streamer.
+ */
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ ASTREAMER_UNKNOWN);
+
+ astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+astreamer_gzip_decompressor_free(astreamer *streamer)
+{
+ astreamer_free(streamer->bbs_next);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+
+/*
+ * Wrapper function to adjust the signature of palloc to match what libz
+ * expects.
+ */
+static void *
+gzip_palloc(void *opaque, unsigned items, unsigned size)
+{
+ return palloc(items * size);
+}
+
+/*
+ * Wrapper function to adjust the signature of pfree to match what libz
+ * expects.
+ */
+static void
+gzip_pfree(void *opaque, void *address)
+{
+ pfree(address);
+}
+#endif
diff --git a/src/fe_utils/astreamer_lz4.c b/src/fe_utils/astreamer_lz4.c
new file mode 100644
index 00000000000..d8b2a367e47
--- /dev/null
+++ b/src/fe_utils/astreamer_lz4.c
@@ -0,0 +1,422 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer_lz4.c
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/astreamer_lz4.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef USE_LZ4
+#include <lz4frame.h>
+#endif
+
+#include "common/file_perm.h"
+#include "common/logging.h"
+#include "common/string.h"
+#include "fe_utils/astreamer.h"
+
+#ifdef USE_LZ4
+typedef struct astreamer_lz4_frame
+{
+ astreamer base;
+
+ LZ4F_compressionContext_t cctx;
+ LZ4F_decompressionContext_t dctx;
+ LZ4F_preferences_t prefs;
+
+ size_t bytes_written;
+ bool header_written;
+} astreamer_lz4_frame;
+
+static void astreamer_lz4_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_lz4_compressor_finalize(astreamer *streamer);
+static void astreamer_lz4_compressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_lz4_compressor_ops = {
+ .content = astreamer_lz4_compressor_content,
+ .finalize = astreamer_lz4_compressor_finalize,
+ .free = astreamer_lz4_compressor_free
+};
+
+static void astreamer_lz4_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
+static void astreamer_lz4_decompressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_lz4_decompressor_ops = {
+ .content = astreamer_lz4_decompressor_content,
+ .finalize = astreamer_lz4_decompressor_finalize,
+ .free = astreamer_lz4_decompressor_free
+};
+#endif
+
+/*
+ * Create a new base backup streamer that performs lz4 compression of tar
+ * blocks.
+ */
+astreamer *
+astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
+{
+#ifdef USE_LZ4
+ astreamer_lz4_frame *streamer;
+ LZ4F_errorCode_t ctxError;
+ LZ4F_preferences_t *prefs;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(astreamer_lz4_frame));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_lz4_compressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ streamer->header_written = false;
+
+ /* Initialize stream compression preferences */
+ prefs = &streamer->prefs;
+ memset(prefs, 0, sizeof(LZ4F_preferences_t));
+ prefs->frameInfo.blockSizeID = LZ4F_max256KB;
+ prefs->compressionLevel = compress->level;
+
+ ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctxError))
+ pg_log_error("could not create lz4 compression context: %s",
+ LZ4F_getErrorName(ctxError));
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support compression with %s", "LZ4");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_LZ4
+/*
+ * Compress the input data to output buffer.
+ *
+ * Find out the compression bound based on input data length for each
+ * invocation to make sure that output buffer has enough capacity to
+ * accommodate the compressed data. In case if the output buffer
+ * capacity falls short of compression bound then forward the content
+ * of output buffer to next streamer and empty the buffer.
+ */
+static void
+astreamer_lz4_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_lz4_frame *mystreamer;
+ uint8 *next_in,
+ *next_out;
+ size_t out_bound,
+ compressed_size,
+ avail_out;
+
+ mystreamer = (astreamer_lz4_frame *) streamer;
+ next_in = (uint8 *) data;
+
+ /* Write header before processing the first input chunk. */
+ if (!mystreamer->header_written)
+ {
+ compressed_size = LZ4F_compressBegin(mystreamer->cctx,
+ (uint8 *) mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ &mystreamer->prefs);
+
+ if (LZ4F_isError(compressed_size))
+ pg_log_error("could not write lz4 header: %s",
+ LZ4F_getErrorName(compressed_size));
+
+ mystreamer->bytes_written += compressed_size;
+ mystreamer->header_written = true;
+ }
+
+ /*
+ * Update the offset and capacity of output buffer based on number of
+ * bytes written to output buffer.
+ */
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+ avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+
+ /*
+ * Find out the compression bound and make sure that output buffer has the
+ * required capacity for the success of LZ4F_compressUpdate. If needed
+ * forward the content to next streamer and empty the buffer.
+ */
+ out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
+ if (avail_out < out_bound)
+ {
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ context);
+
+ /* Enlarge buffer if it falls short of out bound. */
+ if (mystreamer->base.bbs_buffer.maxlen < out_bound)
+ enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
+
+ avail_out = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->bytes_written = 0;
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+ }
+
+ /*
+ * This call compresses the data starting at next_in and generates the
+ * output starting at next_out. It expects the caller to provide the size
+ * of input buffer and capacity of output buffer by providing parameters
+ * len and avail_out.
+ *
+ * It returns the number of bytes compressed to output buffer.
+ */
+ compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
+ next_out, avail_out,
+ next_in, len, NULL);
+
+ if (LZ4F_isError(compressed_size))
+ pg_log_error("could not compress data: %s",
+ LZ4F_getErrorName(compressed_size));
+
+ mystreamer->bytes_written += compressed_size;
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+astreamer_lz4_compressor_finalize(astreamer *streamer)
+{
+ astreamer_lz4_frame *mystreamer;
+ uint8 *next_out;
+ size_t footer_bound,
+ compressed_size,
+ avail_out;
+
+ mystreamer = (astreamer_lz4_frame *) streamer;
+
+ /* Find out the footer bound and update the output buffer. */
+ footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
+ if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
+ footer_bound)
+ {
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ ASTREAMER_UNKNOWN);
+
+ /* Enlarge buffer if it falls short of footer bound. */
+ if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
+ enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
+
+ avail_out = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->bytes_written = 0;
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+ }
+ else
+ {
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
+ avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+ }
+
+ /*
+ * Finalize the frame and flush whatever data remaining in compression
+ * context.
+ */
+ compressed_size = LZ4F_compressEnd(mystreamer->cctx,
+ next_out, avail_out, NULL);
+
+ if (LZ4F_isError(compressed_size))
+ pg_log_error("could not end lz4 compression: %s",
+ LZ4F_getErrorName(compressed_size));
+
+ mystreamer->bytes_written += compressed_size;
+
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ ASTREAMER_UNKNOWN);
+
+ astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+astreamer_lz4_compressor_free(astreamer *streamer)
+{
+ astreamer_lz4_frame *mystreamer;
+
+ mystreamer = (astreamer_lz4_frame *) streamer;
+ astreamer_free(streamer->bbs_next);
+ LZ4F_freeCompressionContext(mystreamer->cctx);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of lz4
+ * compressed blocks.
+ */
+astreamer *
+astreamer_lz4_decompressor_new(astreamer *next)
+{
+#ifdef USE_LZ4
+ astreamer_lz4_frame *streamer;
+ LZ4F_errorCode_t ctxError;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(astreamer_lz4_frame));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_lz4_decompressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+
+ /* Initialize internal stream state for decompression */
+ ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctxError))
+ pg_fatal("could not initialize compression library: %s",
+ LZ4F_getErrorName(ctxError));
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support compression with %s", "LZ4");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_LZ4
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+astreamer_lz4_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_lz4_frame *mystreamer;
+ uint8 *next_in,
+ *next_out;
+ size_t avail_in,
+ avail_out;
+
+ mystreamer = (astreamer_lz4_frame *) streamer;
+ next_in = (uint8 *) data;
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+ avail_in = len;
+ avail_out = mystreamer->base.bbs_buffer.maxlen;
+
+ while (avail_in > 0)
+ {
+ size_t ret,
+ read_size,
+ out_size;
+
+ read_size = avail_in;
+ out_size = avail_out;
+
+ /*
+ * This call decompresses the data starting at next_in and generates
+ * the output data starting at next_out. It expects the caller to
+ * provide size of the input buffer and total capacity of the output
+ * buffer by providing the read_size and out_size parameters
+ * respectively.
+ *
+ * Per the documentation of LZ4, parameters read_size and out_size
+ * behaves as dual parameters. On return, the number of bytes consumed
+ * from the input buffer will be written back to read_size and the
+ * number of bytes decompressed to output buffer will be written back
+ * to out_size respectively.
+ */
+ ret = LZ4F_decompress(mystreamer->dctx,
+ next_out, &out_size,
+ next_in, &read_size, NULL);
+
+ if (LZ4F_isError(ret))
+ pg_log_error("could not decompress data: %s",
+ LZ4F_getErrorName(ret));
+
+ /* Update input buffer based on number of bytes consumed */
+ avail_in -= read_size;
+ next_in += read_size;
+
+ mystreamer->bytes_written += out_size;
+
+ /*
+ * If output buffer is full then forward the content to next streamer
+ * and update the output buffer.
+ */
+ if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
+ {
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ context);
+
+ avail_out = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->bytes_written = 0;
+ next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
+ }
+ else
+ {
+ avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
+ next_out += mystreamer->bytes_written;
+ }
+ }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+astreamer_lz4_decompressor_finalize(astreamer *streamer)
+{
+ astreamer_lz4_frame *mystreamer;
+
+ mystreamer = (astreamer_lz4_frame *) streamer;
+
+ /*
+ * End of the stream, if there is some pending data in output buffers then
+ * we must forward it to next streamer.
+ */
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ ASTREAMER_UNKNOWN);
+
+ astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+astreamer_lz4_decompressor_free(astreamer *streamer)
+{
+ astreamer_lz4_frame *mystreamer;
+
+ mystreamer = (astreamer_lz4_frame *) streamer;
+ astreamer_free(streamer->bbs_next);
+ LZ4F_freeDecompressionContext(mystreamer->dctx);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+#endif
diff --git a/src/fe_utils/astreamer_tar.c b/src/fe_utils/astreamer_tar.c
new file mode 100644
index 00000000000..f5d3562d280
--- /dev/null
+++ b/src/fe_utils/astreamer_tar.c
@@ -0,0 +1,514 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer_tar.c
+ *
+ * This module implements three types of tar processing. A tar parser
+ * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits
+ * it into labelled chunks (any other value of astreamer_archive_context).
+ * A tar archiver does the reverse: it takes a bunch of labelled chunks
+ * and produces a tarfile, optionally replacing member headers and trailers
+ * so that upstream astreamer objects can perform surgery on the tarfile
+ * contents without knowing the details of the tar format. A tar terminator
+ * just adds two blocks of NUL bytes to the end of the file, since older
+ * server versions produce files with this terminator omitted.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/astreamer_tar.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <time.h>
+
+#include "common/logging.h"
+#include "fe_utils/astreamer.h"
+#include "pgtar.h"
+
+typedef struct astreamer_tar_parser
+{
+ astreamer base;
+ astreamer_archive_context next_context;
+ astreamer_member member;
+ size_t file_bytes_sent;
+ size_t pad_bytes_expected;
+} astreamer_tar_parser;
+
+typedef struct astreamer_tar_archiver
+{
+ astreamer base;
+ bool rearchive_member;
+} astreamer_tar_archiver;
+
+static void astreamer_tar_parser_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_tar_parser_finalize(astreamer *streamer);
+static void astreamer_tar_parser_free(astreamer *streamer);
+static bool astreamer_tar_header(astreamer_tar_parser *mystreamer);
+
+static const astreamer_ops astreamer_tar_parser_ops = {
+ .content = astreamer_tar_parser_content,
+ .finalize = astreamer_tar_parser_finalize,
+ .free = astreamer_tar_parser_free
+};
+
+static void astreamer_tar_archiver_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_tar_archiver_finalize(astreamer *streamer);
+static void astreamer_tar_archiver_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_tar_archiver_ops = {
+ .content = astreamer_tar_archiver_content,
+ .finalize = astreamer_tar_archiver_finalize,
+ .free = astreamer_tar_archiver_free
+};
+
+static void astreamer_tar_terminator_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_tar_terminator_finalize(astreamer *streamer);
+static void astreamer_tar_terminator_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_tar_terminator_ops = {
+ .content = astreamer_tar_terminator_content,
+ .finalize = astreamer_tar_terminator_finalize,
+ .free = astreamer_tar_terminator_free
+};
+
+/*
+ * Create a astreamer that can parse a stream of content as tar data.
+ *
+ * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer
+ * specified by 'next' will receive a series of typed chunks, as per the
+ * conventions described in astreamer.h.
+ */
+astreamer *
+astreamer_tar_parser_new(astreamer *next)
+{
+ astreamer_tar_parser *streamer;
+
+ streamer = palloc0(sizeof(astreamer_tar_parser));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_tar_parser_ops;
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ streamer->next_context = ASTREAMER_MEMBER_HEADER;
+
+ return &streamer->base;
+}
+
+/*
+ * Parse unknown content as tar data.
+ */
+static void
+astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
+ size_t nbytes;
+
+ /* Expect unparsed input. */
+ Assert(member == NULL);
+ Assert(context == ASTREAMER_UNKNOWN);
+
+ while (len > 0)
+ {
+ switch (mystreamer->next_context)
+ {
+ case ASTREAMER_MEMBER_HEADER:
+
+ /*
+ * If we're expecting an archive member header, accumulate a
+ * full block of data before doing anything further.
+ */
+ if (!astreamer_buffer_until(streamer, &data, &len,
+ TAR_BLOCK_SIZE))
+ return;
+
+ /*
+ * Now we can process the header and get ready to process the
+ * file contents; however, we might find out that what we
+ * thought was the next file header is actually the start of
+ * the archive trailer. Switch modes accordingly.
+ */
+ if (astreamer_tar_header(mystreamer))
+ {
+ if (mystreamer->member.size == 0)
+ {
+ /* No content; trailer is zero-length. */
+ astreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ NULL, 0,
+ ASTREAMER_MEMBER_TRAILER);
+
+ /* Expect next header. */
+ mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
+ }
+ else
+ {
+ /* Expect contents. */
+ mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS;
+ }
+ mystreamer->base.bbs_buffer.len = 0;
+ mystreamer->file_bytes_sent = 0;
+ }
+ else
+ mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER;
+ break;
+
+ case ASTREAMER_MEMBER_CONTENTS:
+
+ /*
+ * Send as much content as we have, but not more than the
+ * remaining file length.
+ */
+ Assert(mystreamer->file_bytes_sent < mystreamer->member.size);
+ nbytes = mystreamer->member.size - mystreamer->file_bytes_sent;
+ nbytes = Min(nbytes, len);
+ Assert(nbytes > 0);
+ astreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ data, nbytes,
+ ASTREAMER_MEMBER_CONTENTS);
+ mystreamer->file_bytes_sent += nbytes;
+ data += nbytes;
+ len -= nbytes;
+
+ /*
+ * If we've not yet sent the whole file, then there's more
+ * content to come; otherwise, it's time to expect the file
+ * trailer.
+ */
+ Assert(mystreamer->file_bytes_sent <= mystreamer->member.size);
+ if (mystreamer->file_bytes_sent == mystreamer->member.size)
+ {
+ if (mystreamer->pad_bytes_expected == 0)
+ {
+ /* Trailer is zero-length. */
+ astreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ NULL, 0,
+ ASTREAMER_MEMBER_TRAILER);
+
+ /* Expect next header. */
+ mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
+ }
+ else
+ {
+ /* Trailer is not zero-length. */
+ mystreamer->next_context = ASTREAMER_MEMBER_TRAILER;
+ }
+ mystreamer->base.bbs_buffer.len = 0;
+ }
+ break;
+
+ case ASTREAMER_MEMBER_TRAILER:
+
+ /*
+ * If we're expecting an archive member trailer, accumulate
+ * the expected number of padding bytes before sending
+ * anything onward.
+ */
+ if (!astreamer_buffer_until(streamer, &data, &len,
+ mystreamer->pad_bytes_expected))
+ return;
+
+ /* OK, now we can send it. */
+ astreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ data, mystreamer->pad_bytes_expected,
+ ASTREAMER_MEMBER_TRAILER);
+
+ /* Expect next file header. */
+ mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
+ mystreamer->base.bbs_buffer.len = 0;
+ break;
+
+ case ASTREAMER_ARCHIVE_TRAILER:
+
+ /*
+ * We've seen an end-of-archive indicator, so anything more is
+ * buffered and sent as part of the archive trailer. But we
+ * don't expect more than 2 blocks.
+ */
+ astreamer_buffer_bytes(streamer, &data, &len, len);
+ if (len > 2 * TAR_BLOCK_SIZE)
+ pg_fatal("tar file trailer exceeds 2 blocks");
+ return;
+
+ default:
+ /* Shouldn't happen. */
+ pg_fatal("unexpected state while parsing tar archive");
+ }
+ }
+}
+
+/*
+ * Parse a file header within a tar stream.
+ *
+ * The return value is true if we found a file header and passed it on to the
+ * next astreamer; it is false if we have reached the archive trailer.
+ */
+static bool
+astreamer_tar_header(astreamer_tar_parser *mystreamer)
+{
+ bool has_nonzero_byte = false;
+ int i;
+ astreamer_member *member = &mystreamer->member;
+ char *buffer = mystreamer->base.bbs_buffer.data;
+
+ Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE);
+
+ /* Check whether we've got a block of all zero bytes. */
+ for (i = 0; i < TAR_BLOCK_SIZE; ++i)
+ {
+ if (buffer[i] != '\0')
+ {
+ has_nonzero_byte = true;
+ break;
+ }
+ }
+
+ /*
+ * If the entire block was zeros, this is the end of the archive, not the
+ * start of the next file.
+ */
+ if (!has_nonzero_byte)
+ return false;
+
+ /*
+ * Parse key fields out of the header.
+ */
+ strlcpy(member->pathname, &buffer[TAR_OFFSET_NAME], MAXPGPATH);
+ if (member->pathname[0] == '\0')
+ pg_fatal("tar member has empty name");
+ member->size = read_tar_number(&buffer[TAR_OFFSET_SIZE], 12);
+ member->mode = read_tar_number(&buffer[TAR_OFFSET_MODE], 8);
+ member->uid = read_tar_number(&buffer[TAR_OFFSET_UID], 8);
+ member->gid = read_tar_number(&buffer[TAR_OFFSET_GID], 8);
+ member->is_directory =
+ (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_DIRECTORY);
+ member->is_link =
+ (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_SYMLINK);
+ if (member->is_link)
+ strlcpy(member->linktarget, &buffer[TAR_OFFSET_LINKNAME], 100);
+
+ /* Compute number of padding bytes. */
+ mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size);
+
+ /* Forward the entire header to the next astreamer. */
+ astreamer_content(mystreamer->base.bbs_next, member,
+ buffer, TAR_BLOCK_SIZE,
+ ASTREAMER_MEMBER_HEADER);
+
+ return true;
+}
+
+/*
+ * End-of-stream processing for a tar parser.
+ */
+static void
+astreamer_tar_parser_finalize(astreamer *streamer)
+{
+ astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
+
+ if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER &&
+ (mystreamer->next_context != ASTREAMER_MEMBER_HEADER ||
+ mystreamer->base.bbs_buffer.len > 0))
+ pg_fatal("COPY stream ended before last file was finished");
+
+ /* Send the archive trailer, even if empty. */
+ astreamer_content(streamer->bbs_next, NULL,
+ streamer->bbs_buffer.data, streamer->bbs_buffer.len,
+ ASTREAMER_ARCHIVE_TRAILER);
+
+ /* Now finalize successor. */
+ astreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with a tar parser.
+ */
+static void
+astreamer_tar_parser_free(astreamer *streamer)
+{
+ pfree(streamer->bbs_buffer.data);
+ astreamer_free(streamer->bbs_next);
+}
+
+/*
+ * Create a astreamer that can generate a tar archive.
+ *
+ * This is intended to be usable either for generating a brand-new tar archive
+ * or for modifying one on the fly. The input should be a series of typed
+ * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for
+ * astreamer_tar_parser_content.
+ */
+astreamer *
+astreamer_tar_archiver_new(astreamer *next)
+{
+ astreamer_tar_archiver *streamer;
+
+ streamer = palloc0(sizeof(astreamer_tar_archiver));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_tar_archiver_ops;
+ streamer->base.bbs_next = next;
+
+ return &streamer->base;
+}
+
+/*
+ * Fix up the stream of input chunks to create a valid tar file.
+ *
+ * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
+ * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is
+ * passed through without change. Any other size is a fatal error (and
+ * indicates a bug).
+ *
+ * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the
+ * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from
+ * scratch. Specifically, we construct a block of zero bytes sufficient to
+ * pad out to a block boundary, as required by the tar format. Other
+ * ASTREAMER_MEMBER_TRAILER chunks are passed through without change.
+ *
+ * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change.
+ *
+ * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two
+ * blocks of zero bytes. Not all tar programs require this, but apparently
+ * some do. The server does not supply this trailer. If no archive trailer is
+ * present, one will be added by astreamer_tar_parser_finalize.
+ */
+static void
+astreamer_tar_archiver_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer;
+ char buffer[2 * TAR_BLOCK_SIZE];
+
+ Assert(context != ASTREAMER_UNKNOWN);
+
+ if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
+ {
+ Assert(len == 0);
+
+ /* Replace zero-length tar header with a newly constructed one. */
+ tarCreateHeader(buffer, member->pathname, NULL,
+ member->size, member->mode, member->uid, member->gid,
+ time(NULL));
+ data = buffer;
+ len = TAR_BLOCK_SIZE;
+
+ /* Also make a note to replace padding, in case size changed. */
+ mystreamer->rearchive_member = true;
+ }
+ else if (context == ASTREAMER_MEMBER_TRAILER &&
+ mystreamer->rearchive_member)
+ {
+ int pad_bytes = tarPaddingBytesRequired(member->size);
+
+ /* Also replace padding, if we regenerated the header. */
+ memset(buffer, 0, pad_bytes);
+ data = buffer;
+ len = pad_bytes;
+
+ /* Don't do this again unless we replace another header. */
+ mystreamer->rearchive_member = false;
+ }
+ else if (context == ASTREAMER_ARCHIVE_TRAILER)
+ {
+ /* Trailer should always be two blocks of zero bytes. */
+ memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
+ data = buffer;
+ len = 2 * TAR_BLOCK_SIZE;
+ }
+
+ astreamer_content(streamer->bbs_next, member, data, len, context);
+}
+
+/*
+ * End-of-stream processing for a tar archiver.
+ */
+static void
+astreamer_tar_archiver_finalize(astreamer *streamer)
+{
+ astreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with a tar archiver.
+ */
+static void
+astreamer_tar_archiver_free(astreamer *streamer)
+{
+ astreamer_free(streamer->bbs_next);
+ pfree(streamer);
+}
+
+/*
+ * Create a astreamer that blindly adds two blocks of NUL bytes to the
+ * end of an incomplete tarfile that the server might send us.
+ */
+astreamer *
+astreamer_tar_terminator_new(astreamer *next)
+{
+ astreamer *streamer;
+
+ streamer = palloc0(sizeof(astreamer));
+ *((const astreamer_ops **) &streamer->bbs_ops) =
+ &astreamer_tar_terminator_ops;
+ streamer->bbs_next = next;
+
+ return streamer;
+}
+
+/*
+ * Pass all the content through without change.
+ */
+static void
+astreamer_tar_terminator_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ /* Expect unparsed input. */
+ Assert(member == NULL);
+ Assert(context == ASTREAMER_UNKNOWN);
+
+ /* Just forward it. */
+ astreamer_content(streamer->bbs_next, member, data, len, context);
+}
+
+/*
+ * At the end, blindly add the two blocks of NUL bytes which the server fails
+ * to supply.
+ */
+static void
+astreamer_tar_terminator_finalize(astreamer *streamer)
+{
+ char buffer[2 * TAR_BLOCK_SIZE];
+
+ memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
+ astreamer_content(streamer->bbs_next, NULL, buffer,
+ 2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN);
+ astreamer_finalize(streamer->bbs_next);
+}
+
+/*
+ * Free memory associated with a tar terminator.
+ */
+static void
+astreamer_tar_terminator_free(astreamer *streamer)
+{
+ astreamer_free(streamer->bbs_next);
+ pfree(streamer);
+}
diff --git a/src/fe_utils/astreamer_zstd.c b/src/fe_utils/astreamer_zstd.c
new file mode 100644
index 00000000000..45f6cb67363
--- /dev/null
+++ b/src/fe_utils/astreamer_zstd.c
@@ -0,0 +1,368 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer_zstd.c
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/astreamer_zstd.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef USE_ZSTD
+#include <zstd.h>
+#endif
+
+#include "common/logging.h"
+#include "fe_utils/astreamer.h"
+
+#ifdef USE_ZSTD
+
+typedef struct astreamer_zstd_frame
+{
+ astreamer base;
+
+ ZSTD_CCtx *cctx;
+ ZSTD_DCtx *dctx;
+ ZSTD_outBuffer zstd_outBuf;
+} astreamer_zstd_frame;
+
+static void astreamer_zstd_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_zstd_compressor_finalize(astreamer *streamer);
+static void astreamer_zstd_compressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_zstd_compressor_ops = {
+ .content = astreamer_zstd_compressor_content,
+ .finalize = astreamer_zstd_compressor_finalize,
+ .free = astreamer_zstd_compressor_free
+};
+
+static void astreamer_zstd_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
+static void astreamer_zstd_decompressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_zstd_decompressor_ops = {
+ .content = astreamer_zstd_decompressor_content,
+ .finalize = astreamer_zstd_decompressor_finalize,
+ .free = astreamer_zstd_decompressor_free
+};
+#endif
+
+/*
+ * Create a new base backup streamer that performs zstd compression of tar
+ * blocks.
+ */
+astreamer *
+astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
+{
+#ifdef USE_ZSTD
+ astreamer_zstd_frame *streamer;
+ size_t ret;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(astreamer_zstd_frame));
+
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_zstd_compressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
+
+ streamer->cctx = ZSTD_createCCtx();
+ if (!streamer->cctx)
+ pg_fatal("could not create zstd compression context");
+
+ /* Set compression level */
+ ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+ compress->level);
+ if (ZSTD_isError(ret))
+ pg_fatal("could not set zstd compression level to %d: %s",
+ compress->level, ZSTD_getErrorName(ret));
+
+ /* Set # of workers, if specified */
+ if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
+ {
+ /*
+ * On older versions of libzstd, this option does not exist, and
+ * trying to set it will fail. Similarly for newer versions if they
+ * are compiled without threading support.
+ */
+ ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+ compress->workers);
+ if (ZSTD_isError(ret))
+ pg_fatal("could not set compression worker count to %d: %s",
+ compress->workers, ZSTD_getErrorName(ret));
+ }
+
+ if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
+ {
+ ret = ZSTD_CCtx_setParameter(streamer->cctx,
+ ZSTD_c_enableLongDistanceMatching,
+ compress->long_distance);
+ if (ZSTD_isError(ret))
+ {
+ pg_log_error("could not enable long-distance mode: %s",
+ ZSTD_getErrorName(ret));
+ exit(1);
+ }
+ }
+
+ /* Initialize the ZSTD output buffer. */
+ streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+ streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+ streamer->zstd_outBuf.pos = 0;
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support compression with %s", "ZSTD");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_ZSTD
+/*
+ * Compress the input data to output buffer.
+ *
+ * Find out the compression bound based on input data length for each
+ * invocation to make sure that output buffer has enough capacity to
+ * accommodate the compressed data. In case if the output buffer
+ * capacity falls short of compression bound then forward the content
+ * of output buffer to next streamer and empty the buffer.
+ */
+static void
+astreamer_zstd_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+ ZSTD_inBuffer inBuf = {data, len, 0};
+
+ while (inBuf.pos < inBuf.size)
+ {
+ size_t yet_to_flush;
+ size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
+
+ /*
+ * If the output buffer is not left with enough space, send the
+ * compressed bytes to the next streamer, and empty the buffer.
+ */
+ if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
+ max_needed)
+ {
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ context);
+
+ /* Reset the ZSTD output buffer. */
+ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+ mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->zstd_outBuf.pos = 0;
+ }
+
+ yet_to_flush =
+ ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
+ &inBuf, ZSTD_e_continue);
+
+ if (ZSTD_isError(yet_to_flush))
+ pg_log_error("could not compress data: %s",
+ ZSTD_getErrorName(yet_to_flush));
+ }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+astreamer_zstd_compressor_finalize(astreamer *streamer)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+ size_t yet_to_flush;
+
+ do
+ {
+ ZSTD_inBuffer in = {NULL, 0, 0};
+ size_t max_needed = ZSTD_compressBound(0);
+
+ /*
+ * If the output buffer is not left with enough space, send the
+ * compressed bytes to the next streamer, and empty the buffer.
+ */
+ if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
+ max_needed)
+ {
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ ASTREAMER_UNKNOWN);
+
+ /* Reset the ZSTD output buffer. */
+ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+ mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->zstd_outBuf.pos = 0;
+ }
+
+ yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
+ &mystreamer->zstd_outBuf,
+ &in, ZSTD_e_end);
+
+ if (ZSTD_isError(yet_to_flush))
+ pg_log_error("could not compress data: %s",
+ ZSTD_getErrorName(yet_to_flush));
+
+ } while (yet_to_flush > 0);
+
+ /* Make sure to pass any remaining bytes to the next streamer. */
+ if (mystreamer->zstd_outBuf.pos > 0)
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ ASTREAMER_UNKNOWN);
+
+ astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+astreamer_zstd_compressor_free(astreamer *streamer)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+ astreamer_free(streamer->bbs_next);
+ ZSTD_freeCCtx(mystreamer->cctx);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of zstd
+ * compressed blocks.
+ */
+astreamer *
+astreamer_zstd_decompressor_new(astreamer *next)
+{
+#ifdef USE_ZSTD
+ astreamer_zstd_frame *streamer;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(astreamer_zstd_frame));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_zstd_decompressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
+
+ streamer->dctx = ZSTD_createDCtx();
+ if (!streamer->dctx)
+ pg_fatal("could not create zstd decompression context");
+
+ /* Initialize the ZSTD output buffer. */
+ streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+ streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+ streamer->zstd_outBuf.pos = 0;
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support compression with %s", "ZSTD");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_ZSTD
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+astreamer_zstd_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+ ZSTD_inBuffer inBuf = {data, len, 0};
+
+ while (inBuf.pos < inBuf.size)
+ {
+ size_t ret;
+
+ /*
+ * If output buffer is full then forward the content to next streamer
+ * and update the output buffer.
+ */
+ if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
+ {
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ context);
+
+ /* Reset the ZSTD output buffer. */
+ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+ mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->zstd_outBuf.pos = 0;
+ }
+
+ ret = ZSTD_decompressStream(mystreamer->dctx,
+ &mystreamer->zstd_outBuf, &inBuf);
+
+ if (ZSTD_isError(ret))
+ pg_log_error("could not decompress data: %s",
+ ZSTD_getErrorName(ret));
+ }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+astreamer_zstd_decompressor_finalize(astreamer *streamer)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+ /*
+ * End of the stream, if there is some pending data in output buffers then
+ * we must forward it to next streamer.
+ */
+ if (mystreamer->zstd_outBuf.pos > 0)
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ ASTREAMER_UNKNOWN);
+
+ astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+astreamer_zstd_decompressor_free(astreamer *streamer)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+ astreamer_free(streamer->bbs_next);
+ ZSTD_freeDCtx(mystreamer->dctx);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+#endif
diff --git a/src/fe_utils/meson.build b/src/fe_utils/meson.build
index 14d0482a2cc..043021d826d 100644
--- a/src/fe_utils/meson.build
+++ b/src/fe_utils/meson.build
@@ -2,6 +2,11 @@
fe_utils_sources = files(
'archive.c',
+ 'astreamer_file.c',
+ 'astreamer_gzip.c',
+ 'astreamer_lz4.c',
+ 'astreamer_tar.c',
+ 'astreamer_zstd.c',
'cancel.c',
'conditional.c',
'connect_utils.c',