<listitem>
<para>
Enables compression of write-ahead logs using the specified method.
- Supported values <literal>gzip</literal>, and
- <literal>none</literal>.
+ Supported values <literal>gzip</literal>, <literal>lz4</literal>
+ (if <productname>PostgreSQL</productname> was compiled with
+ <option>--with-lz4</option>), and <literal>none</literal>.
</para>
<para>
The suffix <filename>.gz</filename> will automatically be added to
- all filenames when using <literal>gzip</literal>
+ all filenames when using <literal>gzip</literal>, and the suffix
+ <filename>.lz4</filename> is added when using <literal>lz4</literal>.
</para>
</listitem>
</varlistentry>
GZIP = gzip
BZIP2 = bzip2
+LZ4 = lz4
DOWNLOAD = wget -O $@ --no-use-server-timestamps
#DOWNLOAD = curl -o $@
include $(top_builddir)/src/Makefile.global
# make these available to TAP test scripts
+export LZ4
export TAR
# Note that GZIP cannot be used directly as this environment variable is
# used by the command "gzip" to pass down options, so stick with a different
#include "receivelog.h"
#include "streamutil.h"
+#ifdef HAVE_LIBLZ4
+#include "lz4frame.h"
+#endif
+
/* Time to sleep between reconnection attempts */
#define RECONNECT_SLEEP_TIME 5
return true;
}
+ /* File looks like a completed LZ4-compressed WAL file */
+ if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
+ strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
+ {
+ *ispartial = false;
+ *wal_compression_method = COMPRESSION_LZ4;
+ return true;
+ }
+
/* File looks like a partial uncompressed WAL file */
if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
return true;
}
+ /* File looks like a partial LZ4-compressed WAL file */
+ if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
+ strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
+ {
+ *ispartial = true;
+ *wal_compression_method = COMPRESSION_LZ4;
+ return true;
+ }
+
/* File does not look like something we know */
return false;
}
/*
* Check that the segment has the right size, if it's supposed to be
* completed. For non-compressed segments just check the on-disk size
- * and see if it matches a completed segment. For gzip-compressed
+ * and see if it matches a completed segment. For gzip-compressed
* segments, look at the last 4 bytes of the compressed file, which is
* where the uncompressed size is located for files with a size lower
* than 4GB, and then compare it to the size of a completed segment.
* The 4 last bytes correspond to the ISIZE member according to
* http://www.zlib.org/rfc-gzip.html.
+ *
+ * For LZ4-compressed segments, uncompress the file in a throw-away
+ * buffer keeping track of the uncompressed size, then compare it to
+ * the size of a completed segment. Per its protocol, LZ4 does not
+ * store the uncompressed size of an object by default. contentSize
+ * is one possible way to do that, but we need to rely on a method
+ * where WAL segments could have been compressed by a different source
+ * than pg_receivewal, like an archive_command with lz4.
*/
if (!ispartial && wal_compression_method == COMPRESSION_NONE)
{
continue;
}
}
+ else if (!ispartial && wal_compression_method == COMPRESSION_LZ4)
+ {
+#ifdef HAVE_LIBLZ4
+#define LZ4_CHUNK_SZ 64 * 1024 /* 64kB as maximum chunk size read */
+ int fd;
+ ssize_t r;
+ size_t uncompressed_size = 0;
+ char fullpath[MAXPGPATH * 2];
+ char *outbuf;
+ char *readbuf;
+ LZ4F_decompressionContext_t ctx = NULL;
+ LZ4F_decompressOptions_t dec_opt;
+ LZ4F_errorCode_t status;
+
+ memset(&dec_opt, 0, sizeof(dec_opt));
+ snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+ fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
+ if (fd < 0)
+ {
+ pg_log_error("could not open file \"%s\": %m", fullpath);
+ exit(1);
+ }
+
+ status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
+ if (LZ4F_isError(status))
+ {
+ pg_log_error("could not create LZ4 decompression context: %s",
+ LZ4F_getErrorName(status));
+ exit(1);
+ }
+
+ outbuf = pg_malloc0(LZ4_CHUNK_SZ);
+ readbuf = pg_malloc0(LZ4_CHUNK_SZ);
+ do
+ {
+ char *readp;
+ char *readend;
+
+ r = read(fd, readbuf, LZ4_CHUNK_SZ);
+ if (r < 0)
+ {
+ pg_log_error("could not read file \"%s\": %m", fullpath);
+ exit(1);
+ }
+
+ /* Done reading the file */
+ if (r == 0)
+ break;
+
+ /* Process one chunk */
+ readp = readbuf;
+ readend = readbuf + r;
+ while (readp < readend)
+ {
+ size_t out_size = LZ4_CHUNK_SZ;
+ size_t read_size = readend - readp;
+
+ memset(outbuf, 0, LZ4_CHUNK_SZ);
+ status = LZ4F_decompress(ctx, outbuf, &out_size,
+ readp, &read_size, &dec_opt);
+ if (LZ4F_isError(status))
+ {
+ pg_log_error("could not decompress file \"%s\": %s",
+ fullpath,
+ LZ4F_getErrorName(status));
+ exit(1);
+ }
+
+ readp += read_size;
+ uncompressed_size += out_size;
+ }
+
+ /*
+ * No need to continue reading the file when the
+ * uncompressed_size exceeds WalSegSz, even if there are still
+ * data left to read. However, if uncompressed_size is equal
+ * to WalSegSz, it should verify that there is no more data to
+ * read.
+ */
+ } while (uncompressed_size <= WalSegSz && r > 0);
+
+ close(fd);
+ pg_free(outbuf);
+ pg_free(readbuf);
+
+ status = LZ4F_freeDecompressionContext(ctx);
+ if (LZ4F_isError(status))
+ {
+ pg_log_error("could not free LZ4 decompression context: %s",
+ LZ4F_getErrorName(status));
+ exit(1);
+ }
+
+ if (uncompressed_size != WalSegSz)
+ {
+ pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %ld, skipping",
+ dirent->d_name, uncompressed_size);
+ continue;
+ }
+#else
+ pg_log_error("could not check file \"%s\"",
+ dirent->d_name);
+ pg_log_error("this build does not support compression with %s",
+ "LZ4");
+ exit(1);
+#endif
+ }
/* Looks like a valid segment. Remember that we saw it. */
if ((segno > high_segno) ||
case 6:
if (pg_strcasecmp(optarg, "gzip") == 0)
compression_method = COMPRESSION_GZIP;
+ else if (pg_strcasecmp(optarg, "lz4") == 0)
+ compression_method = COMPRESSION_LZ4;
else if (pg_strcasecmp(optarg, "none") == 0)
compression_method = COMPRESSION_NONE;
else
pg_log_error("this build does not support compression with %s",
"gzip");
exit(1);
+#endif
+ break;
+ case COMPRESSION_LZ4:
+#ifdef HAVE_LIBLZ4
+ if (compresslevel != 0)
+ {
+ pg_log_error("cannot use --compress with --compression-method=%s",
+ "lz4");
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+#else
+ pg_log_error("this build does not support compression with %s",
+ "LZ4");
+ exit(1);
#endif
break;
}
use warnings;
use PostgreSQL::Test::Utils;
use PostgreSQL::Test::Cluster;
-use Test::More tests => 37;
+use Test::More tests => 42;
program_help_ok('pg_receivewal');
program_version_ok('pg_receivewal');
"gzip verified the integrity of compressed WAL segments");
}
+# Check LZ4 compression if available
+SKIP:
+{
+ skip "postgres was not built with LZ4 support", 5
+ if (!check_pg_config("#define HAVE_LIBLZ4 1"));
+
+ # Generate more WAL including one completed, compressed segment.
+ $primary->psql('postgres', 'SELECT pg_switch_wal();');
+ $nextlsn =
+ $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+ chomp($nextlsn);
+ $primary->psql('postgres', 'INSERT INTO test_table VALUES (3);');
+
+ # Stream up to the given position.
+ $primary->command_ok(
+ [
+ 'pg_receivewal', '-D',
+ $stream_dir, '--verbose',
+ '--endpos', $nextlsn,
+ '--no-loop', '--compression-method',
+ 'lz4'
+ ],
+ 'streaming some WAL using --compression-method=lz4');
+
+ # Verify that the stored files are generated with their expected
+ # names.
+ my @lz4_wals = glob "$stream_dir/*.lz4";
+ is(scalar(@lz4_wals), 1,
+ "one WAL segment compressed with LZ4 was created");
+ my @lz4_partial_wals = glob "$stream_dir/*.lz4.partial";
+ is(scalar(@lz4_partial_wals),
+ 1, "one partial WAL segment compressed with LZ4 was created");
+
+ # Verify that the start streaming position is computed correctly by
+ # comparing it with the partial file generated previously. The name
+ # of the previous partial, now-completed WAL segment is updated, keeping
+ # its base number.
+ $partial_wals[0] =~ s/(\.gz)?\.partial$/.lz4/;
+ is($lz4_wals[0] eq $partial_wals[0],
+ 1, "one partial WAL segment is now completed");
+ # Update the list of partial wals with the current one.
+ @partial_wals = @lz4_partial_wals;
+
+ # Check the integrity of the completed segment, if LZ4 is an available
+ # command.
+ my $lz4 = $ENV{LZ4};
+ skip "program lz4 is not found in your system", 1
+ if ( !defined $lz4
+ || $lz4 eq ''
+ || system_log($lz4, '--version') != 0);
+
+ my $lz4_is_valid = system_log($lz4, '-t', @lz4_wals);
+ is($lz4_is_valid, 0,
+ "lz4 verified the integrity of compressed WAL segments");
+}
+
# Verify that the start streaming position is computed and that the value is
-# correct regardless of whether ZLIB is available.
+# correct regardless of whether any compression is available.
$primary->psql('postgres', 'SELECT pg_switch_wal();');
$nextlsn =
$primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
chomp($nextlsn);
-$primary->psql('postgres', 'INSERT INTO test_table VALUES (3);');
+$primary->psql('postgres', 'INSERT INTO test_table VALUES (4);');
$primary->command_ok(
[
'pg_receivewal', '-D', $stream_dir, '--verbose',
],
"streaming some WAL");
-$partial_wals[0] =~ s/(\.gz)?.partial//;
+$partial_wals[0] =~ s/(\.gz|\.lz4)?.partial//;
ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
# Permissions on WAL files should be default
# Switch to a new segment, to make sure that the segment retained by the
# slot is still streamed. This may not be necessary, but play it safe.
-$primary->psql('postgres', 'INSERT INTO test_table VALUES (4);');
+$primary->psql('postgres', 'INSERT INTO test_table VALUES (5);');
$primary->psql('postgres', 'SELECT pg_switch_wal();');
$nextlsn =
$primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
# Add a bit more data to accelerate the end of the next pg_receivewal
# commands.
-$primary->psql('postgres', 'INSERT INTO test_table VALUES (5);');
+$primary->psql('postgres', 'INSERT INTO test_table VALUES (6);');
# Check case where the slot does not exist.
$primary->command_fails_like(
# on the new timeline.
my $walfile_after_promotion = $standby->safe_psql('postgres',
"SELECT pg_walfile_name(pg_current_wal_insert_lsn());");
-$standby->psql('postgres', 'INSERT INTO test_table VALUES (6);');
+$standby->psql('postgres', 'INSERT INTO test_table VALUES (7);');
$standby->psql('postgres', 'SELECT pg_switch_wal();');
$nextlsn =
$standby->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
chomp($nextlsn);
# This speeds up the operation.
-$standby->psql('postgres', 'INSERT INTO test_table VALUES (7);');
+$standby->psql('postgres', 'INSERT INTO test_table VALUES (8);');
# Now try to resume from the slot after the promotion.
my $timeline_dir = $primary->basedir . '/timeline_wal';
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
/* Size of zlib buffer for .tar.gz */
#define ZLIB_OUT_SIZE 4096
+/* Size of LZ4 input chunk for .lz4 */
+#define LZ4_IN_SIZE 4096
+
/*-------------------------------------------------------------------------
* WalDirectoryMethod - write wal to a directory looking like pg_wal
*-------------------------------------------------------------------------
#ifdef HAVE_LIBZ
gzFile gzfp;
#endif
+#ifdef HAVE_LIBLZ4
+ LZ4F_compressionContext_t ctx;
+ size_t lz4bufsize;
+ void *lz4buf;
+#endif
} DirectoryMethodFile;
static const char *
snprintf(filename, MAXPGPATH, "%s%s%s",
pathname,
- dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : "",
+ dir_data->compression_method == COMPRESSION_GZIP ? ".gz" :
+ dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "",
temp_suffix ? temp_suffix : "");
return filename;
#ifdef HAVE_LIBZ
gzFile gzfp = NULL;
#endif
+#ifdef HAVE_LIBLZ4
+ LZ4F_compressionContext_t ctx = NULL;
+ size_t lz4bufsize = 0;
+ void *lz4buf = NULL;
+#endif
filename = dir_get_file_name(pathname, temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
}
}
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t ctx_out;
+ size_t header_size;
+
+ ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
+ if (LZ4F_isError(ctx_out))
+ {
+ close(fd);
+ return NULL;
+ }
+
+ lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
+ lz4buf = pg_malloc0(lz4bufsize);
+
+ /* add the header */
+ header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, NULL);
+ if (LZ4F_isError(header_size))
+ {
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ return NULL;
+ }
+
+ errno = 0;
+ if (write(fd, lz4buf, header_size) != header_size)
+ {
+ int save_errno = errno;
+
+ (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+
+ /*
+ * If write didn't set errno, assume problem is no disk space.
+ */
+ errno = save_errno ? save_errno : ENOSPC;
+ return NULL;
+ }
+ }
+#endif
/* Do pre-padding on non-compressed files */
if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE)
if (dir_data->compression_method == COMPRESSION_GZIP)
gzclose(gzfp);
else
+#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+ (void) LZ4F_freeCompressionContext(ctx);
+ pg_free(lz4buf);
+ close(fd);
+ }
+ else
#endif
close(fd);
return NULL;
if (dir_data->compression_method == COMPRESSION_GZIP)
f->gzfp = gzfp;
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ f->ctx = ctx;
+ f->lz4buf = lz4buf;
+ f->lz4bufsize = lz4bufsize;
+ }
+#endif
+
f->fd = fd;
f->currpos = 0;
f->pathname = pg_strdup(pathname);
if (dir_data->compression_method == COMPRESSION_GZIP)
r = (ssize_t) gzwrite(df->gzfp, buf, count);
else
+#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t chunk;
+ size_t remaining;
+ const void *inbuf = buf;
+
+ remaining = count;
+ while (remaining > 0)
+ {
+ size_t compressed;
+
+ if (remaining > LZ4_IN_SIZE)
+ chunk = LZ4_IN_SIZE;
+ else
+ chunk = remaining;
+
+ remaining -= chunk;
+ compressed = LZ4F_compressUpdate(df->ctx,
+ df->lz4buf, df->lz4bufsize,
+ inbuf, chunk,
+ NULL);
+
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+
+ inbuf = ((char *) inbuf) + chunk;
+ }
+
+ /* Our caller keeps track of the uncompressed size. */
+ r = (ssize_t) count;
+ }
+ else
#endif
r = write(df->fd, buf, count);
if (r > 0)
if (dir_data->compression_method == COMPRESSION_GZIP)
r = gzclose(df->gzfp);
else
+#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ size_t compressed;
+
+ compressed = LZ4F_compressEnd(df->ctx,
+ df->lz4buf, df->lz4bufsize,
+ NULL);
+
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+
+ r = close(df->fd);
+ }
+ else
#endif
r = close(df->fd);
}
}
+#ifdef HAVE_LIBLZ4
+ pg_free(df->lz4buf);
+ /* supports free on NULL */
+ LZ4F_freeCompressionContext(df->ctx);
+#endif
+
pg_free(df->pathname);
pg_free(df->fullpath);
if (df->temp_suffix)
return -1;
}
#endif
+#ifdef HAVE_LIBLZ4
+ if (dir_data->compression_method == COMPRESSION_LZ4)
+ {
+ DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ size_t compressed;
+
+ /* Flush any internal buffers */
+ compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
+ if (LZ4F_isError(compressed))
+ return -1;
+
+ if (write(df->fd, df->lz4buf, compressed) != compressed)
+ return -1;
+ }
+#endif
return fsync(((DirectoryMethodFile *) f)->fd);
}
typedef enum
{
COMPRESSION_GZIP,
+ COMPRESSION_LZ4,
COMPRESSION_NONE
} WalCompressionMethod;