diff options
author | Robert Haas | 2024-08-05 15:40:29 +0000 |
---|---|---|
committer | Robert Haas | 2024-08-05 15:41:57 +0000 |
commit | f80b09bac87d6b49f5dbb6131da5fbd9b9773c5c (patch) | |
tree | 353896ae8f4a860c269ec705c4f1becddf4f90f1 /src/fe_utils | |
parent | 53b2c921a0f9b56465ab65165c1909f9616ffa98 (diff) |
Move astreamer (except astreamer_inject) to fe_utils.
This allows the code to be used by other frontend applications.
Amul Sul, reviewed by Sravan Kumar, Andres Freund (whose input
I specifically solicited regarding the meson.build changes),
and me.
Discussion: http://postgr.es/m/CAAJ_b94StvLWrc_p4q-f7n3OPfr6GhL8_XuAg2aAaYZp1tF-nw@mail.gmail.com
Diffstat (limited to 'src/fe_utils')
-rw-r--r-- | src/fe_utils/Makefile | 5 | ||||
-rw-r--r-- | src/fe_utils/astreamer_file.c | 396 | ||||
-rw-r--r-- | src/fe_utils/astreamer_gzip.c | 364 | ||||
-rw-r--r-- | src/fe_utils/astreamer_lz4.c | 422 | ||||
-rw-r--r-- | src/fe_utils/astreamer_tar.c | 514 | ||||
-rw-r--r-- | src/fe_utils/astreamer_zstd.c | 368 | ||||
-rw-r--r-- | src/fe_utils/meson.build | 5 |
7 files changed, 2074 insertions, 0 deletions
diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile index 946c05258f0..2694be4b859 100644 --- a/src/fe_utils/Makefile +++ b/src/fe_utils/Makefile @@ -21,6 +21,11 @@ override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS) OBJS = \ archive.o \ + astreamer_file.o \ + astreamer_gzip.o \ + astreamer_lz4.o \ + astreamer_tar.o \ + astreamer_zstd.o \ cancel.o \ conditional.o \ connect_utils.o \ diff --git a/src/fe_utils/astreamer_file.c b/src/fe_utils/astreamer_file.c new file mode 100644 index 00000000000..13d1192c6e6 --- /dev/null +++ b/src/fe_utils/astreamer_file.c @@ -0,0 +1,396 @@ +/*------------------------------------------------------------------------- + * + * astreamer_file.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_file.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <unistd.h> + +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" +#include "fe_utils/astreamer.h" + +typedef struct astreamer_plain_writer +{ + astreamer base; + char *pathname; + FILE *file; + bool should_close_file; +} astreamer_plain_writer; + +typedef struct astreamer_extractor +{ + astreamer base; + char *basepath; + const char *(*link_map) (const char *); + void (*report_output_file) (const char *); + char filename[MAXPGPATH]; + FILE *file; +} astreamer_extractor; + +static void astreamer_plain_writer_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_plain_writer_finalize(astreamer *streamer); +static void astreamer_plain_writer_free(astreamer *streamer); + +static const astreamer_ops astreamer_plain_writer_ops = { + .content = astreamer_plain_writer_content, + .finalize = astreamer_plain_writer_finalize, + .free = astreamer_plain_writer_free +}; + +static void 
astreamer_extractor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_extractor_finalize(astreamer *streamer); +static void astreamer_extractor_free(astreamer *streamer); +static void extract_directory(const char *filename, mode_t mode); +static void extract_link(const char *filename, const char *linktarget); +static FILE *create_file_for_extract(const char *filename, mode_t mode); + +static const astreamer_ops astreamer_extractor_ops = { + .content = astreamer_extractor_content, + .finalize = astreamer_extractor_finalize, + .free = astreamer_extractor_free +}; + +/* + * Create an astreamer that just writes data to a file. + * + * The caller must specify a pathname and may specify a file. The pathname is + * used for error-reporting purposes either way. If file is NULL, the pathname + * also identifies the file to which the data should be written: it is opened + * for writing and closed when done. If file is not NULL, the data is written + * there. + */ +astreamer * +astreamer_plain_writer_new(char *pathname, FILE *file) +{ + astreamer_plain_writer *streamer; + + streamer = palloc0(sizeof(astreamer_plain_writer)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_plain_writer_ops; + + streamer->pathname = pstrdup(pathname); + streamer->file = file; + + if (file == NULL) + { + streamer->file = fopen(pathname, "wb"); + if (streamer->file == NULL) + pg_fatal("could not create file \"%s\": %m", pathname); + streamer->should_close_file = true; + } + + return &streamer->base; +} + +/* + * Write archive content to file. 
+ */ +static void +astreamer_plain_writer_content(astreamer *streamer, + astreamer_member *member, const char *data, + int len, astreamer_archive_context context) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + if (len == 0) + return; + + errno = 0; + if (fwrite(data, len, 1, mystreamer->file) != 1) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to file \"%s\": %m", + mystreamer->pathname); + } +} + +/* + * End-of-archive processing when writing to a plain file consists of closing + * the file if we opened it, but not if the caller provided it. + */ +static void +astreamer_plain_writer_finalize(astreamer *streamer) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + if (mystreamer->should_close_file && fclose(mystreamer->file) != 0) + pg_fatal("could not close file \"%s\": %m", + mystreamer->pathname); + + mystreamer->file = NULL; + mystreamer->should_close_file = false; +} + +/* + * Free memory associated with this astreamer. + */ +static void +astreamer_plain_writer_free(astreamer *streamer) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + Assert(!mystreamer->should_close_file); + Assert(mystreamer->base.bbs_next == NULL); + + pfree(mystreamer->pathname); + pfree(mystreamer); +} + +/* + * Create a astreamer that extracts an archive. + * + * All pathnames in the archive are interpreted relative to basepath. + * + * Unlike e.g. astreamer_plain_writer_new() we can't do anything useful here + * with untyped chunks; we need typed chunks which follow the rules described + * in astreamer.h. Assuming we have that, we don't need to worry about the + * original archive format; it's enough to just look at the member information + * provided and write to the corresponding file. 
+ * + * 'link_map' is a function that will be applied to the target of any + * symbolic link, and which should return a replacement pathname to be used + * in its place. If NULL, the symbolic link target is used without + * modification. + * + * 'report_output_file' is a function that will be called each time we open a + * new output file. The pathname to that file is passed as an argument. If + * NULL, the call is skipped. + */ +astreamer * +astreamer_extractor_new(const char *basepath, + const char *(*link_map) (const char *), + void (*report_output_file) (const char *)) +{ + astreamer_extractor *streamer; + + streamer = palloc0(sizeof(astreamer_extractor)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_extractor_ops; + streamer->basepath = pstrdup(basepath); + streamer->link_map = link_map; + streamer->report_output_file = report_output_file; + + return &streamer->base; +} + +/* + * Extract archive contents to the filesystem. + */ +static void +astreamer_extractor_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; + int fnamelen; + + Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER); + Assert(context != ASTREAMER_UNKNOWN); + + switch (context) + { + case ASTREAMER_MEMBER_HEADER: + Assert(mystreamer->file == NULL); + + /* Prepend basepath. */ + snprintf(mystreamer->filename, sizeof(mystreamer->filename), + "%s/%s", mystreamer->basepath, member->pathname); + + /* Remove any trailing slash. */ + fnamelen = strlen(mystreamer->filename); + if (mystreamer->filename[fnamelen - 1] == '/') + mystreamer->filename[fnamelen - 1] = '\0'; + + /* Dispatch based on file type. 
*/ + if (member->is_directory) + extract_directory(mystreamer->filename, member->mode); + else if (member->is_link) + { + const char *linktarget = member->linktarget; + + if (mystreamer->link_map) + linktarget = mystreamer->link_map(linktarget); + extract_link(mystreamer->filename, linktarget); + } + else + mystreamer->file = + create_file_for_extract(mystreamer->filename, + member->mode); + + /* Report output file change. */ + if (mystreamer->report_output_file) + mystreamer->report_output_file(mystreamer->filename); + break; + + case ASTREAMER_MEMBER_CONTENTS: + if (mystreamer->file == NULL) + break; + + errno = 0; + if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to file \"%s\": %m", + mystreamer->filename); + } + break; + + case ASTREAMER_MEMBER_TRAILER: + if (mystreamer->file == NULL) + break; + fclose(mystreamer->file); + mystreamer->file = NULL; + break; + + case ASTREAMER_ARCHIVE_TRAILER: + break; + + default: + /* Shouldn't happen. */ + pg_fatal("unexpected state while extracting archive"); + } +} + +/* + * Should we tolerate an already-existing directory? + * + * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will have been + * created by the wal receiver process. Also, when the WAL directory location + * was specified, pg_wal (or pg_xlog) has already been created as a symbolic + * link before starting the actual backup. So just ignore creation failures + * on related directories. + * + * If in-place tablespaces are used, pg_tblspc and subdirectories may already + * exist when we get here. So tolerate that case, too. 
+ */ +static bool +should_allow_existing_directory(const char *pathname) +{ + const char *filename = last_dir_separator(pathname) + 1; + + if (strcmp(filename, "pg_wal") == 0 || + strcmp(filename, "pg_xlog") == 0 || + strcmp(filename, "archive_status") == 0 || + strcmp(filename, "summaries") == 0 || + strcmp(filename, "pg_tblspc") == 0) + return true; + + if (strspn(filename, "0123456789") == strlen(filename)) + { + const char *pg_tblspc = strstr(pathname, "/pg_tblspc/"); + + return pg_tblspc != NULL && pg_tblspc + 11 == filename; + } + + return false; +} + +/* + * Create a directory. + */ +static void +extract_directory(const char *filename, mode_t mode) +{ + if (mkdir(filename, pg_dir_create_mode) != 0 && + (errno != EEXIST || !should_allow_existing_directory(filename))) + pg_fatal("could not create directory \"%s\": %m", + filename); + +#ifndef WIN32 + if (chmod(filename, mode)) + pg_fatal("could not set permissions on directory \"%s\": %m", + filename); +#endif +} + +/* + * Create a symbolic link. + * + * It's most likely a link in pg_tblspc directory, to the location of a + * tablespace. Apply any tablespace mapping given on the command line + * (--tablespace-mapping). (We blindly apply the mapping without checking that + * the link really is inside pg_tblspc. We don't expect there to be other + * symlinks in a data directory, but if there are, you can call it an + * undocumented feature that you can map them too.) + */ +static void +extract_link(const char *filename, const char *linktarget) +{ + if (symlink(linktarget, filename) != 0) + pg_fatal("could not create symbolic link from \"%s\" to \"%s\": %m", + filename, linktarget); +} + +/* + * Create a regular file. + * + * Return the resulting handle so we can write the content to the file. 
+ */ +static FILE * +create_file_for_extract(const char *filename, mode_t mode) +{ + FILE *file; + + file = fopen(filename, "wb"); + if (file == NULL) + pg_fatal("could not create file \"%s\": %m", filename); + +#ifndef WIN32 + if (chmod(filename, mode)) + pg_fatal("could not set permissions on file \"%s\": %m", + filename); +#endif + + return file; +} + +/* + * End-of-stream processing for extracting an archive. + * + * There's nothing to do here but sanity checking. + */ +static void +astreamer_extractor_finalize(astreamer *streamer) +{ + astreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY + = (astreamer_extractor *) streamer; + + Assert(mystreamer->file == NULL); +} + +/* + * Free memory. + */ +static void +astreamer_extractor_free(astreamer *streamer) +{ + astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; + + pfree(mystreamer->basepath); + pfree(mystreamer); +} diff --git a/src/fe_utils/astreamer_gzip.c b/src/fe_utils/astreamer_gzip.c new file mode 100644 index 00000000000..dd28defac7b --- /dev/null +++ b/src/fe_utils/astreamer_gzip.c @@ -0,0 +1,364 @@ +/*------------------------------------------------------------------------- + * + * astreamer_gzip.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_gzip.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <unistd.h> + +#ifdef HAVE_LIBZ +#include <zlib.h> +#endif + +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" +#include "fe_utils/astreamer.h" + +#ifdef HAVE_LIBZ +typedef struct astreamer_gzip_writer +{ + astreamer base; + char *pathname; + gzFile gzfile; +} astreamer_gzip_writer; + +typedef struct astreamer_gzip_decompressor +{ + astreamer base; + z_stream zstream; + size_t bytes_written; +} astreamer_gzip_decompressor; + +static void astreamer_gzip_writer_content(astreamer *streamer, + 
astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_gzip_writer_finalize(astreamer *streamer); +static void astreamer_gzip_writer_free(astreamer *streamer); +static const char *get_gz_error(gzFile gzf); + +static const astreamer_ops astreamer_gzip_writer_ops = { + .content = astreamer_gzip_writer_content, + .finalize = astreamer_gzip_writer_finalize, + .free = astreamer_gzip_writer_free +}; + +static void astreamer_gzip_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_gzip_decompressor_finalize(astreamer *streamer); +static void astreamer_gzip_decompressor_free(astreamer *streamer); +static void *gzip_palloc(void *opaque, unsigned items, unsigned size); +static void gzip_pfree(void *opaque, void *address); + +static const astreamer_ops astreamer_gzip_decompressor_ops = { + .content = astreamer_gzip_decompressor_content, + .finalize = astreamer_gzip_decompressor_finalize, + .free = astreamer_gzip_decompressor_free +}; +#endif + +/* + * Create an astreamer that just compresses data using gzip, and then writes + * it to a file. + * + * As in the case of astreamer_plain_writer_new, pathname is always used + * for error reporting purposes; if file is NULL, it also names the file that is opened and + * closed so that the data may be written there. 
+ */ +astreamer * +astreamer_gzip_writer_new(char *pathname, FILE *file, + pg_compress_specification *compress) +{ +#ifdef HAVE_LIBZ + astreamer_gzip_writer *streamer; + + streamer = palloc0(sizeof(astreamer_gzip_writer)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_gzip_writer_ops; + + streamer->pathname = pstrdup(pathname); + + if (file == NULL) + { + streamer->gzfile = gzopen(pathname, "wb"); + if (streamer->gzfile == NULL) + pg_fatal("could not create compressed file \"%s\": %m", + pathname); + } + else + { + int fd = dup(fileno(file)); + + if (fd < 0) + pg_fatal("could not duplicate stdout: %m"); + + streamer->gzfile = gzdopen(fd, "wb"); + if (streamer->gzfile == NULL) + pg_fatal("could not open output file: %m"); + } + + if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK) + pg_fatal("could not set compression level %d: %s", + compress->level, get_gz_error(streamer->gzfile)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "gzip"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef HAVE_LIBZ +/* + * Write archive content to gzip file. + */ +static void +astreamer_gzip_writer_content(astreamer *streamer, + astreamer_member *member, const char *data, + int len, astreamer_archive_context context) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + if (len == 0) + return; + + errno = 0; + if (gzwrite(mystreamer->gzfile, data, len) != len) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to compressed file \"%s\": %s", + mystreamer->pathname, get_gz_error(mystreamer->gzfile)); + } +} + +/* + * End-of-archive processing when writing to a gzip file consists of just + * calling gzclose. 
+ * + * It makes no difference whether we opened the file or the caller did it, + * because libz provides no way of avoiding a close on the underlying file + * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to + * work around this issue, so that the behavior from the caller's viewpoint + * is the same as for astreamer_plain_writer. + */ +static void +astreamer_gzip_writer_finalize(astreamer *streamer) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + errno = 0; /* in case gzclose() doesn't set it */ + if (gzclose(mystreamer->gzfile) != 0) + pg_fatal("could not close compressed file \"%s\": %m", + mystreamer->pathname); + + mystreamer->gzfile = NULL; +} + +/* + * Free memory associated with this astreamer. + */ +static void +astreamer_gzip_writer_free(astreamer *streamer) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + Assert(mystreamer->base.bbs_next == NULL); + Assert(mystreamer->gzfile == NULL); + + pfree(mystreamer->pathname); + pfree(mystreamer); +} + +/* + * Helper function for libz error reporting. + */ +static const char * +get_gz_error(gzFile gzf) +{ + int errnum; + const char *errmsg; + + errmsg = gzerror(gzf, &errnum); + if (errnum == Z_ERRNO) + return strerror(errno); + else + return errmsg; +} +#endif + +/* + * Create a new base backup streamer that performs decompression of gzip + * compressed blocks. 
+ */ +astreamer * +astreamer_gzip_decompressor_new(astreamer *next) +{ +#ifdef HAVE_LIBZ + astreamer_gzip_decompressor *streamer; + z_stream *zs; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_gzip_decompressor)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_gzip_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + + /* Initialize internal stream state for decompression */ + zs = &streamer->zstream; + zs->zalloc = gzip_palloc; + zs->zfree = gzip_pfree; + zs->next_out = (uint8 *) streamer->base.bbs_buffer.data; + zs->avail_out = streamer->base.bbs_buffer.maxlen; + + /* + * Data compression was initialized using deflateInit2 to request a gzip + * header. Similarly, we are using inflateInit2 to initialize data + * decompression. + * + * Per the documentation for inflateInit2, the second argument is + * "windowBits" and its value must be greater than or equal to the value + * provided while compressing the data, so we are using the maximum + * possible value for safety. + */ + if (inflateInit2(zs, 15 + 16) != Z_OK) + pg_fatal("could not initialize compression library"); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "gzip"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef HAVE_LIBZ +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. 
+ */ +static void +astreamer_gzip_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_gzip_decompressor *mystreamer; + z_stream *zs; + + mystreamer = (astreamer_gzip_decompressor *) streamer; + + zs = &mystreamer->zstream; + zs->next_in = (const uint8 *) data; + zs->avail_in = len; + + /* Process the current chunk */ + while (zs->avail_in > 0) + { + int res; + + Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen); + + zs->next_out = (uint8 *) + mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + zs->avail_out = + mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + + /* + * This call decompresses data starting at zs->next_in and updates + * zs->next_in and zs->avail_in. It generates output data starting + * at zs->next_out and updates zs->next_out and zs->avail_out + * accordingly. + */ + res = inflate(zs, Z_NO_FLUSH); + + if (res == Z_STREAM_ERROR) + pg_log_error("could not decompress data: %s", zs->msg); + + mystreamer->bytes_written = + mystreamer->base.bbs_buffer.maxlen - zs->avail_out; + + /* If output buffer is full then pass data to next streamer */ + if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, context); + mystreamer->bytes_written = 0; + } + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_gzip_decompressor_finalize(astreamer *streamer) +{ + astreamer_gzip_decompressor *mystreamer; + + mystreamer = (astreamer_gzip_decompressor *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. 
+ */ + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_gzip_decompressor_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} + +/* + * Wrapper function to adjust the signature of palloc to match what libz + * expects. + */ +static void * +gzip_palloc(void *opaque, unsigned items, unsigned size) +{ + return palloc(items * size); +} + +/* + * Wrapper function to adjust the signature of pfree to match what libz + * expects. + */ +static void +gzip_pfree(void *opaque, void *address) +{ + pfree(address); +} +#endif diff --git a/src/fe_utils/astreamer_lz4.c b/src/fe_utils/astreamer_lz4.c new file mode 100644 index 00000000000..d8b2a367e47 --- /dev/null +++ b/src/fe_utils/astreamer_lz4.c @@ -0,0 +1,422 @@ +/*------------------------------------------------------------------------- + * + * astreamer_lz4.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_lz4.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <unistd.h> + +#ifdef USE_LZ4 +#include <lz4frame.h> +#endif + +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" +#include "fe_utils/astreamer.h" + +#ifdef USE_LZ4 +typedef struct astreamer_lz4_frame +{ + astreamer base; + + LZ4F_compressionContext_t cctx; + LZ4F_decompressionContext_t dctx; + LZ4F_preferences_t prefs; + + size_t bytes_written; + bool header_written; +} astreamer_lz4_frame; + +static void astreamer_lz4_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void 
astreamer_lz4_compressor_finalize(astreamer *streamer); +static void astreamer_lz4_compressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_lz4_compressor_ops = { + .content = astreamer_lz4_compressor_content, + .finalize = astreamer_lz4_compressor_finalize, + .free = astreamer_lz4_compressor_free +}; + +static void astreamer_lz4_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_lz4_decompressor_finalize(astreamer *streamer); +static void astreamer_lz4_decompressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_lz4_decompressor_ops = { + .content = astreamer_lz4_decompressor_content, + .finalize = astreamer_lz4_decompressor_finalize, + .free = astreamer_lz4_decompressor_free +}; +#endif + +/* + * Create a new base backup streamer that performs lz4 compression of tar + * blocks. + */ +astreamer * +astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress) +{ +#ifdef USE_LZ4 + astreamer_lz4_frame *streamer; + LZ4F_errorCode_t ctxError; + LZ4F_preferences_t *prefs; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_lz4_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_lz4_compressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + streamer->header_written = false; + + /* Initialize stream compression preferences */ + prefs = &streamer->prefs; + memset(prefs, 0, sizeof(LZ4F_preferences_t)); + prefs->frameInfo.blockSizeID = LZ4F_max256KB; + prefs->compressionLevel = compress->level; + + ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION); + if (LZ4F_isError(ctxError)) + pg_log_error("could not create lz4 compression context: %s", + LZ4F_getErrorName(ctxError)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "LZ4"); + return NULL; /* keep compiler 
quiet */ +#endif +} + +#ifdef USE_LZ4 +/* + * Compress the input data to output buffer. + * + * Find out the compression bound based on input data length for each + * invocation to make sure that output buffer has enough capacity to + * accommodate the compressed data. If the output buffer + * capacity falls short of the compression bound, forward the content + * of the output buffer to the next streamer and empty the buffer. + */ +static void +astreamer_lz4_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_in, + *next_out; + size_t out_bound, + compressed_size, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + next_in = (uint8 *) data; + + /* Write header before processing the first input chunk. */ + if (!mystreamer->header_written) + { + compressed_size = LZ4F_compressBegin(mystreamer->cctx, + (uint8 *) mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + &mystreamer->prefs); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not write lz4 header: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; + mystreamer->header_written = true; + } + + /* + * Update the offset and capacity of output buffer based on number of + * bytes written to output buffer. + */ + next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + + /* + * Find out the compression bound and make sure that output buffer has the + * required capacity for the success of LZ4F_compressUpdate. If needed + * forward the content to next streamer and empty the buffer. 
+ */ + out_bound = LZ4F_compressBound(len, &mystreamer->prefs); + if (avail_out < out_bound) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + context); + + /* Enlarge buffer if it falls short of out bound. */ + if (mystreamer->base.bbs_buffer.maxlen < out_bound) + enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + + /* + * This call compresses the data starting at next_in and generates the + * output starting at next_out. It expects the caller to provide the size + * of input buffer and capacity of output buffer by providing parameters + * len and avail_out. + * + * It returns the number of bytes compressed to output buffer. + */ + compressed_size = LZ4F_compressUpdate(mystreamer->cctx, + next_out, avail_out, + next_in, len, NULL); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not compress data: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; +} + +/* + * End-of-stream processing. + */ +static void +astreamer_lz4_compressor_finalize(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_out; + size_t footer_bound, + compressed_size, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + + /* Find out the footer bound and update the output buffer. */ + footer_bound = LZ4F_compressBound(0, &mystreamer->prefs); + if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) < + footer_bound) + { + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); + + /* Enlarge buffer if it falls short of footer bound. 
*/ + if (mystreamer->base.bbs_buffer.maxlen < footer_bound) + enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + else + { + next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + } + + /* + * Finalize the frame and flush whatever data remaining in compression + * context. + */ + compressed_size = LZ4F_compressEnd(mystreamer->cctx, + next_out, avail_out, NULL); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not end lz4 compression: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; + + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_lz4_compressor_free(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + astreamer_free(streamer->bbs_next); + LZ4F_freeCompressionContext(mystreamer->cctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif + +/* + * Create a new base backup streamer that performs decompression of lz4 + * compressed blocks. 
+ */ +astreamer * +astreamer_lz4_decompressor_new(astreamer *next) +{ +#ifdef USE_LZ4 + astreamer_lz4_frame *streamer; + LZ4F_errorCode_t ctxError; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_lz4_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_lz4_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + + /* Initialize internal stream state for decompression */ + ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION); + if (LZ4F_isError(ctxError)) + pg_fatal("could not initialize compression library: %s", + LZ4F_getErrorName(ctxError)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "LZ4"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_LZ4 +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. + */ +static void +astreamer_lz4_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_in, + *next_out; + size_t avail_in, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + next_in = (uint8 *) data; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + avail_in = len; + avail_out = mystreamer->base.bbs_buffer.maxlen; + + while (avail_in > 0) + { + size_t ret, + read_size, + out_size; + + read_size = avail_in; + out_size = avail_out; + + /* + * This call decompresses the data starting at next_in and generates + * the output data starting at next_out. It expects the caller to + * provide size of the input buffer and total capacity of the output + * buffer by providing the read_size and out_size parameters + * respectively. 
+ * + * Per the documentation of LZ4, parameters read_size and out_size + * behaves as dual parameters. On return, the number of bytes consumed + * from the input buffer will be written back to read_size and the + * number of bytes decompressed to output buffer will be written back + * to out_size respectively. + */ + ret = LZ4F_decompress(mystreamer->dctx, + next_out, &out_size, + next_in, &read_size, NULL); + + if (LZ4F_isError(ret)) + pg_log_error("could not decompress data: %s", + LZ4F_getErrorName(ret)); + + /* Update input buffer based on number of bytes consumed */ + avail_in -= read_size; + next_in += read_size; + + mystreamer->bytes_written += out_size; + + /* + * If output buffer is full then forward the content to next streamer + * and update the output buffer. + */ + if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + context); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + else + { + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + next_out += mystreamer->bytes_written; + } + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_lz4_decompressor_finalize(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. + */ + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. 
+ */ +static void +astreamer_lz4_decompressor_free(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + astreamer_free(streamer->bbs_next); + LZ4F_freeDecompressionContext(mystreamer->dctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif diff --git a/src/fe_utils/astreamer_tar.c b/src/fe_utils/astreamer_tar.c new file mode 100644 index 00000000000..f5d3562d280 --- /dev/null +++ b/src/fe_utils/astreamer_tar.c @@ -0,0 +1,514 @@ +/*------------------------------------------------------------------------- + * + * astreamer_tar.c + * + * This module implements three types of tar processing. A tar parser + * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits + * it into labelled chunks (any other value of astreamer_archive_context). + * A tar archiver does the reverse: it takes a bunch of labelled chunks + * and produces a tarfile, optionally replacing member headers and trailers + * so that upstream astreamer objects can perform surgery on the tarfile + * contents without knowing the details of the tar format. A tar terminator + * just adds two blocks of NUL bytes to the end of the file, since older + * server versions produce files with this terminator omitted. 
+ * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_tar.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <time.h> + +#include "common/logging.h" +#include "fe_utils/astreamer.h" +#include "pgtar.h" + +typedef struct astreamer_tar_parser +{ + astreamer base; + astreamer_archive_context next_context; + astreamer_member member; + size_t file_bytes_sent; + size_t pad_bytes_expected; +} astreamer_tar_parser; + +typedef struct astreamer_tar_archiver +{ + astreamer base; + bool rearchive_member; +} astreamer_tar_archiver; + +static void astreamer_tar_parser_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_parser_finalize(astreamer *streamer); +static void astreamer_tar_parser_free(astreamer *streamer); +static bool astreamer_tar_header(astreamer_tar_parser *mystreamer); + +static const astreamer_ops astreamer_tar_parser_ops = { + .content = astreamer_tar_parser_content, + .finalize = astreamer_tar_parser_finalize, + .free = astreamer_tar_parser_free +}; + +static void astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_archiver_finalize(astreamer *streamer); +static void astreamer_tar_archiver_free(astreamer *streamer); + +static const astreamer_ops astreamer_tar_archiver_ops = { + .content = astreamer_tar_archiver_content, + .finalize = astreamer_tar_archiver_finalize, + .free = astreamer_tar_archiver_free +}; + +static void astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_terminator_finalize(astreamer *streamer); +static void astreamer_tar_terminator_free(astreamer 
*streamer); + +static const astreamer_ops astreamer_tar_terminator_ops = { + .content = astreamer_tar_terminator_content, + .finalize = astreamer_tar_terminator_finalize, + .free = astreamer_tar_terminator_free +}; + +/* + * Create a astreamer that can parse a stream of content as tar data. + * + * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer + * specified by 'next' will receive a series of typed chunks, as per the + * conventions described in astreamer.h. + */ +astreamer * +astreamer_tar_parser_new(astreamer *next) +{ + astreamer_tar_parser *streamer; + + streamer = palloc0(sizeof(astreamer_tar_parser)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_parser_ops; + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + streamer->next_context = ASTREAMER_MEMBER_HEADER; + + return &streamer->base; +} + +/* + * Parse unknown content as tar data. + */ +static void +astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; + size_t nbytes; + + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == ASTREAMER_UNKNOWN); + + while (len > 0) + { + switch (mystreamer->next_context) + { + case ASTREAMER_MEMBER_HEADER: + + /* + * If we're expecting an archive member header, accumulate a + * full block of data before doing anything further. + */ + if (!astreamer_buffer_until(streamer, &data, &len, + TAR_BLOCK_SIZE)) + return; + + /* + * Now we can process the header and get ready to process the + * file contents; however, we might find out that what we + * thought was the next file header is actually the start of + * the archive trailer. Switch modes accordingly. + */ + if (astreamer_tar_header(mystreamer)) + { + if (mystreamer->member.size == 0) + { + /* No content; trailer is zero-length. 
*/ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + } + else + { + /* Expect contents. */ + mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS; + } + mystreamer->base.bbs_buffer.len = 0; + mystreamer->file_bytes_sent = 0; + } + else + mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER; + break; + + case ASTREAMER_MEMBER_CONTENTS: + + /* + * Send as much content as we have, but not more than the + * remaining file length. + */ + Assert(mystreamer->file_bytes_sent < mystreamer->member.size); + nbytes = mystreamer->member.size - mystreamer->file_bytes_sent; + nbytes = Min(nbytes, len); + Assert(nbytes > 0); + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, nbytes, + ASTREAMER_MEMBER_CONTENTS); + mystreamer->file_bytes_sent += nbytes; + data += nbytes; + len -= nbytes; + + /* + * If we've not yet sent the whole file, then there's more + * content to come; otherwise, it's time to expect the file + * trailer. + */ + Assert(mystreamer->file_bytes_sent <= mystreamer->member.size); + if (mystreamer->file_bytes_sent == mystreamer->member.size) + { + if (mystreamer->pad_bytes_expected == 0) + { + /* Trailer is zero-length. */ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + } + else + { + /* Trailer is not zero-length. */ + mystreamer->next_context = ASTREAMER_MEMBER_TRAILER; + } + mystreamer->base.bbs_buffer.len = 0; + } + break; + + case ASTREAMER_MEMBER_TRAILER: + + /* + * If we're expecting an archive member trailer, accumulate + * the expected number of padding bytes before sending + * anything onward. + */ + if (!astreamer_buffer_until(streamer, &data, &len, + mystreamer->pad_bytes_expected)) + return; + + /* OK, now we can send it. 
*/ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, mystreamer->pad_bytes_expected, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next file header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + mystreamer->base.bbs_buffer.len = 0; + break; + + case ASTREAMER_ARCHIVE_TRAILER: + + /* + * We've seen an end-of-archive indicator, so anything more is + * buffered and sent as part of the archive trailer. But we + * don't expect more than 2 blocks. + */ + astreamer_buffer_bytes(streamer, &data, &len, len); + if (len > 2 * TAR_BLOCK_SIZE) + pg_fatal("tar file trailer exceeds 2 blocks"); + return; + + default: + /* Shouldn't happen. */ + pg_fatal("unexpected state while parsing tar archive"); + } + } +} + +/* + * Parse a file header within a tar stream. + * + * The return value is true if we found a file header and passed it on to the + * next astreamer; it is false if we have reached the archive trailer. + */ +static bool +astreamer_tar_header(astreamer_tar_parser *mystreamer) +{ + bool has_nonzero_byte = false; + int i; + astreamer_member *member = &mystreamer->member; + char *buffer = mystreamer->base.bbs_buffer.data; + + Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE); + + /* Check whether we've got a block of all zero bytes. */ + for (i = 0; i < TAR_BLOCK_SIZE; ++i) + { + if (buffer[i] != '\0') + { + has_nonzero_byte = true; + break; + } + } + + /* + * If the entire block was zeros, this is the end of the archive, not the + * start of the next file. + */ + if (!has_nonzero_byte) + return false; + + /* + * Parse key fields out of the header. 
+ */ + strlcpy(member->pathname, &buffer[TAR_OFFSET_NAME], MAXPGPATH); + if (member->pathname[0] == '\0') + pg_fatal("tar member has empty name"); + member->size = read_tar_number(&buffer[TAR_OFFSET_SIZE], 12); + member->mode = read_tar_number(&buffer[TAR_OFFSET_MODE], 8); + member->uid = read_tar_number(&buffer[TAR_OFFSET_UID], 8); + member->gid = read_tar_number(&buffer[TAR_OFFSET_GID], 8); + member->is_directory = + (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_DIRECTORY); + member->is_link = + (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_SYMLINK); + if (member->is_link) + strlcpy(member->linktarget, &buffer[TAR_OFFSET_LINKNAME], 100); + + /* Compute number of padding bytes. */ + mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size); + + /* Forward the entire header to the next astreamer. */ + astreamer_content(mystreamer->base.bbs_next, member, + buffer, TAR_BLOCK_SIZE, + ASTREAMER_MEMBER_HEADER); + + return true; +} + +/* + * End-of-stream processing for a tar parser. + */ +static void +astreamer_tar_parser_finalize(astreamer *streamer) +{ + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; + + if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER && + (mystreamer->next_context != ASTREAMER_MEMBER_HEADER || + mystreamer->base.bbs_buffer.len > 0)) + pg_fatal("COPY stream ended before last file was finished"); + + /* Send the archive trailer, even if empty. */ + astreamer_content(streamer->bbs_next, NULL, + streamer->bbs_buffer.data, streamer->bbs_buffer.len, + ASTREAMER_ARCHIVE_TRAILER); + + /* Now finalize successor. */ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar parser. + */ +static void +astreamer_tar_parser_free(astreamer *streamer) +{ + pfree(streamer->bbs_buffer.data); + astreamer_free(streamer->bbs_next); +} + +/* + * Create a astreamer that can generate a tar archive. 
+ * + * This is intended to be usable either for generating a brand-new tar archive + * or for modifying one on the fly. The input should be a series of typed + * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for + * astreamer_tar_parser_content. + */ +astreamer * +astreamer_tar_archiver_new(astreamer *next) +{ + astreamer_tar_archiver *streamer; + + streamer = palloc0(sizeof(astreamer_tar_archiver)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_archiver_ops; + streamer->base.bbs_next = next; + + return &streamer->base; +} + +/* + * Fix up the stream of input chunks to create a valid tar file. + * + * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a + * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is + * passed through without change. Any other size is a fatal error (and + * indicates a bug). + * + * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the + * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from + * scratch. Specifically, we construct a block of zero bytes sufficient to + * pad out to a block boundary, as required by the tar format. Other + * ASTREAMER_MEMBER_TRAILER chunks are passed through without change. + * + * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change. + * + * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two + * blocks of zero bytes. Not all tar programs require this, but apparently + * some do. The server does not supply this trailer. If no archive trailer is + * present, one will be added by astreamer_tar_parser_finalize. 
+ */ +static void +astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer; + char buffer[2 * TAR_BLOCK_SIZE]; + + Assert(context != ASTREAMER_UNKNOWN); + + if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) + { + Assert(len == 0); + + /* Replace zero-length tar header with a newly constructed one. */ + tarCreateHeader(buffer, member->pathname, NULL, + member->size, member->mode, member->uid, member->gid, + time(NULL)); + data = buffer; + len = TAR_BLOCK_SIZE; + + /* Also make a note to replace padding, in case size changed. */ + mystreamer->rearchive_member = true; + } + else if (context == ASTREAMER_MEMBER_TRAILER && + mystreamer->rearchive_member) + { + int pad_bytes = tarPaddingBytesRequired(member->size); + + /* Also replace padding, if we regenerated the header. */ + memset(buffer, 0, pad_bytes); + data = buffer; + len = pad_bytes; + + /* Don't do this again unless we replace another header. */ + mystreamer->rearchive_member = false; + } + else if (context == ASTREAMER_ARCHIVE_TRAILER) + { + /* Trailer should always be two blocks of zero bytes. */ + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + data = buffer; + len = 2 * TAR_BLOCK_SIZE; + } + + astreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * End-of-stream processing for a tar archiver. + */ +static void +astreamer_tar_archiver_finalize(astreamer *streamer) +{ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar archiver. + */ +static void +astreamer_tar_archiver_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} + +/* + * Create a astreamer that blindly adds two blocks of NUL bytes to the + * end of an incomplete tarfile that the server might send us. 
+ */ +astreamer * +astreamer_tar_terminator_new(astreamer *next) +{ + astreamer *streamer; + + streamer = palloc0(sizeof(astreamer)); + *((const astreamer_ops **) &streamer->bbs_ops) = + &astreamer_tar_terminator_ops; + streamer->bbs_next = next; + + return streamer; +} + +/* + * Pass all the content through without change. + */ +static void +astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == ASTREAMER_UNKNOWN); + + /* Just forward it. */ + astreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * At the end, blindly add the two blocks of NUL bytes which the server fails + * to supply. + */ +static void +astreamer_tar_terminator_finalize(astreamer *streamer) +{ + char buffer[2 * TAR_BLOCK_SIZE]; + + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + astreamer_content(streamer->bbs_next, NULL, buffer, + 2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN); + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar terminator. 
+ */ +static void +astreamer_tar_terminator_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} diff --git a/src/fe_utils/astreamer_zstd.c b/src/fe_utils/astreamer_zstd.c new file mode 100644 index 00000000000..45f6cb67363 --- /dev/null +++ b/src/fe_utils/astreamer_zstd.c @@ -0,0 +1,368 @@ +/*------------------------------------------------------------------------- + * + * astreamer_zstd.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_zstd.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <unistd.h> + +#ifdef USE_ZSTD +#include <zstd.h> +#endif + +#include "common/logging.h" +#include "fe_utils/astreamer.h" + +#ifdef USE_ZSTD + +typedef struct astreamer_zstd_frame +{ + astreamer base; + + ZSTD_CCtx *cctx; + ZSTD_DCtx *dctx; + ZSTD_outBuffer zstd_outBuf; +} astreamer_zstd_frame; + +static void astreamer_zstd_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_zstd_compressor_finalize(astreamer *streamer); +static void astreamer_zstd_compressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_zstd_compressor_ops = { + .content = astreamer_zstd_compressor_content, + .finalize = astreamer_zstd_compressor_finalize, + .free = astreamer_zstd_compressor_free +}; + +static void astreamer_zstd_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_zstd_decompressor_finalize(astreamer *streamer); +static void astreamer_zstd_decompressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_zstd_decompressor_ops = { + .content = astreamer_zstd_decompressor_content, + .finalize = astreamer_zstd_decompressor_finalize, + .free = 
astreamer_zstd_decompressor_free +}; +#endif + +/* + * Create a new base backup streamer that performs zstd compression of tar + * blocks. + */ +astreamer * +astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress) +{ +#ifdef USE_ZSTD + astreamer_zstd_frame *streamer; + size_t ret; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_zstd_frame)); + + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_zstd_compressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); + + streamer->cctx = ZSTD_createCCtx(); + if (!streamer->cctx) + pg_fatal("could not create zstd compression context"); + + /* Set compression level */ + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, + compress->level); + if (ZSTD_isError(ret)) + pg_fatal("could not set zstd compression level to %d: %s", + compress->level, ZSTD_getErrorName(ret)); + + /* Set # of workers, if specified */ + if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0) + { + /* + * On older versions of libzstd, this option does not exist, and + * trying to set it will fail. Similarly for newer versions if they + * are compiled without threading support. + */ + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers, + compress->workers); + if (ZSTD_isError(ret)) + pg_fatal("could not set compression worker count to %d: %s", + compress->workers, ZSTD_getErrorName(ret)); + } + + if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0) + { + ret = ZSTD_CCtx_setParameter(streamer->cctx, + ZSTD_c_enableLongDistanceMatching, + compress->long_distance); + if (ZSTD_isError(ret)) + { + pg_log_error("could not enable long-distance mode: %s", + ZSTD_getErrorName(ret)); + exit(1); + } + } + + /* Initialize the ZSTD output buffer. 
*/ + streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; + streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; + streamer->zstd_outBuf.pos = 0; + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "ZSTD"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_ZSTD +/* + * Compress the input data to output buffer. + * + * Find out the compression bound based on input data length for each + * invocation to make sure that output buffer has enough capacity to + * accommodate the compressed data. In case if the output buffer + * capacity falls short of compression bound then forward the content + * of output buffer to next streamer and empty the buffer. + */ +static void +astreamer_zstd_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + ZSTD_inBuffer inBuf = {data, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t yet_to_flush; + size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); + + /* + * If the output buffer is not left with enough space, send the + * compressed bytes to the next streamer, and empty the buffer. + */ + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + yet_to_flush = + ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf, + &inBuf, ZSTD_e_continue); + + if (ZSTD_isError(yet_to_flush)) + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + } +} + +/* + * End-of-stream processing. 
+ */ +static void +astreamer_zstd_compressor_finalize(astreamer *streamer) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + size_t yet_to_flush; + + do + { + ZSTD_inBuffer in = {NULL, 0, 0}; + size_t max_needed = ZSTD_compressBound(0); + + /* + * If the output buffer is not left with enough space, send the + * compressed bytes to the next streamer, and empty the buffer. + */ + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) + { + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + ASTREAMER_UNKNOWN); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + yet_to_flush = ZSTD_compressStream2(mystreamer->cctx, + &mystreamer->zstd_outBuf, + &in, ZSTD_e_end); + + if (ZSTD_isError(yet_to_flush)) + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + + } while (yet_to_flush > 0); + + /* Make sure to pass any remaining bytes to the next streamer. */ + if (mystreamer->zstd_outBuf.pos > 0) + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_zstd_compressor_free(astreamer *streamer) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + + astreamer_free(streamer->bbs_next); + ZSTD_freeCCtx(mystreamer->cctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif + +/* + * Create a new base backup streamer that performs decompression of zstd + * compressed blocks. 
+ */ +astreamer * +astreamer_zstd_decompressor_new(astreamer *next) +{ +#ifdef USE_ZSTD + astreamer_zstd_frame *streamer; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_zstd_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_zstd_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); + + streamer->dctx = ZSTD_createDCtx(); + if (!streamer->dctx) + pg_fatal("could not create zstd decompression context"); + + /* Initialize the ZSTD output buffer. */ + streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; + streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; + streamer->zstd_outBuf.pos = 0; + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "ZSTD"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_ZSTD +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. + */ +static void +astreamer_zstd_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + ZSTD_inBuffer inBuf = {data, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t ret; + + /* + * If output buffer is full then forward the content to next streamer + * and update the output buffer. + */ + if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); + + /* Reset the ZSTD output buffer. 
*/ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + ret = ZSTD_decompressStream(mystreamer->dctx, + &mystreamer->zstd_outBuf, &inBuf); + + if (ZSTD_isError(ret)) + pg_log_error("could not decompress data: %s", + ZSTD_getErrorName(ret)); + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_zstd_decompressor_finalize(astreamer *streamer) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. + */ + if (mystreamer->zstd_outBuf.pos > 0) + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_zstd_decompressor_free(astreamer *streamer) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + + astreamer_free(streamer->bbs_next); + ZSTD_freeDCtx(mystreamer->dctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif diff --git a/src/fe_utils/meson.build b/src/fe_utils/meson.build index 14d0482a2cc..043021d826d 100644 --- a/src/fe_utils/meson.build +++ b/src/fe_utils/meson.build @@ -2,6 +2,11 @@ fe_utils_sources = files( 'archive.c', + 'astreamer_file.c', + 'astreamer_gzip.c', + 'astreamer_lz4.c', + 'astreamer_tar.c', + 'astreamer_zstd.c', 'cancel.c', 'conditional.c', 'connect_utils.c', |