#include "access/xlog.h"
#include "access/xlog_internal.h"
+#include "lib/binaryheap.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
+#include "storage/spin.h"
#include "utils/guc.h"
#include "utils/ps_status.h"
*/
#define NUM_ORPHAN_CLEANUP_RETRIES 3
+/*
+ * Maximum number of .ready files to gather per directory scan.
+ */
+#define NUM_FILES_PER_DIRECTORY_SCAN 64
+
/* Shared memory area for archiver process */
typedef struct PgArchData
{
int pgprocno; /* pgprocno of archiver process */
+
+ /*
+ * Forces a directory scan in pgarch_readyXlog(). Protected by
+ * arch_lck.
+ */
+ bool force_dir_scan;
+
+ slock_t arch_lck;
} PgArchData;
static time_t last_sigterm_time = 0;
static PgArchData *PgArch = NULL;
+/*
+ * Stuff for tracking multiple files to archive from each scan of
+ * archive_status. Minimizing the number of directory scans when there are
+ * many files to archive can significantly improve archival rate.
+ *
+ * arch_heap is a max-heap that is used during the directory scan to track
+ * the highest-priority files to archive. After the directory scan
+ * completes, the file names are stored in ascending order of priority in
+ * arch_files. pgarch_readyXlog() returns files from arch_files until it
+ * is empty, at which point another directory scan must be performed.
+ */
+static binaryheap *arch_heap = NULL;
+static char arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS];
+static char *arch_files[NUM_FILES_PER_DIRECTORY_SCAN];
+static int arch_files_size = 0;
+
/*
* Flags set by interrupt handlers for later service in the main loop.
*/
static void pgarch_archiveDone(char *xlog);
static void pgarch_die(int code, Datum arg);
static void HandlePgArchInterrupts(void);
+static int ready_file_comparator(Datum a, Datum b, void *arg);
/* Report shared memory space needed by PgArchShmemInit */
Size
/* First time through, so initialize */
MemSet(PgArch, 0, PgArchShmemSize());
PgArch->pgprocno = INVALID_PGPROCNO;
+ SpinLockInit(&PgArch->arch_lck);
}
}
*/
PgArch->pgprocno = MyProc->pgprocno;
+ /* Initialize our max-heap for prioritizing files to archive. */
+ arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
+ ready_file_comparator, NULL);
+
pgarch_MainLoop();
proc_exit(0);
{
char xlog[MAX_XFN_CHARS + 1];
+ /* force directory scan in the first call to pgarch_readyXlog() */
+ arch_files_size = 0;
+
/*
* loop through all xlogs with archive_status of .ready and archive
* them...mostly we expect this to be a single file, though it is possible
static bool
pgarch_readyXlog(char *xlog)
{
- /*
- * open xlog status directory and read through list of xlogs that have the
- * .ready suffix, looking for earliest file. It is possible to optimise
- * this code, though only a single file is expected on the vast majority
- * of calls, so....
- */
char XLogArchiveStatusDir[MAXPGPATH];
DIR *rldir;
struct dirent *rlde;
- bool found = false;
- bool historyFound = false;
+ bool force_dir_scan;
+ /*
+ * If a directory scan was requested, clear the stored file names and
+ * proceed.
+ */
+ SpinLockAcquire(&PgArch->arch_lck);
+ force_dir_scan = PgArch->force_dir_scan;
+ PgArch->force_dir_scan = false;
+ SpinLockRelease(&PgArch->arch_lck);
+
+ if (force_dir_scan)
+ arch_files_size = 0;
+
+ /*
+ * If we still have stored file names from the previous directory scan,
+ * try to return one of those. We check to make sure the status file
+ * is still present, as the archive_command for a previous file may
+ * have already marked it done.
+ */
+ while (arch_files_size > 0)
+ {
+ struct stat st;
+ char status_file[MAXPGPATH];
+ char *arch_file;
+
+ arch_files_size--;
+ arch_file = arch_files[arch_files_size];
+ StatusFilePath(status_file, arch_file, ".ready");
+
+ if (stat(status_file, &st) == 0)
+ {
+ strcpy(xlog, arch_file);
+ return true;
+ }
+ else if (errno != ENOENT)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m", status_file)));
+ }
+
+ /*
+ * Open the archive status directory and read through the list of files
+ * with the .ready suffix, looking for the earliest files.
+ */
snprintf(XLogArchiveStatusDir, MAXPGPATH, XLOGDIR "/archive_status");
rldir = AllocateDir(XLogArchiveStatusDir);
{
int basenamelen = (int) strlen(rlde->d_name) - 6;
char basename[MAX_XFN_CHARS + 1];
- bool ishistory;
+ char *arch_file;
/* Ignore entries with unexpected number of characters */
if (basenamelen < MIN_XFN_CHARS ||
memcpy(basename, rlde->d_name, basenamelen);
basename[basenamelen] = '\0';
- /* Is this a history file? */
- ishistory = IsTLHistoryFileName(basename);
-
/*
- * Consume the file to archive. History files have the highest
- * priority. If this is the first file or the first history file
- * ever, copy it. In the presence of a history file already chosen as
- * target, ignore all other files except history files which have been
- * generated for an older timeline than what is already chosen as
- * target to archive.
+ * Store the file in our max-heap if it has a high enough priority.
*/
- if (!found || (ishistory && !historyFound))
+ if (arch_heap->bh_size < NUM_FILES_PER_DIRECTORY_SCAN)
{
- strcpy(xlog, basename);
- found = true;
- historyFound = ishistory;
+ /* If the heap isn't full yet, quickly add it. */
+ arch_file = arch_filenames[arch_heap->bh_size];
+ strcpy(arch_file, basename);
+ binaryheap_add_unordered(arch_heap, CStringGetDatum(arch_file));
+
+ /* If we just filled the heap, make it a valid one. */
+ if (arch_heap->bh_size == NUM_FILES_PER_DIRECTORY_SCAN)
+ binaryheap_build(arch_heap);
}
- else if (ishistory || !historyFound)
+ else if (ready_file_comparator(binaryheap_first(arch_heap),
+ CStringGetDatum(basename), NULL) > 0)
{
- if (strcmp(basename, xlog) < 0)
- strcpy(xlog, basename);
+ /*
+ * Remove the lowest priority file and add the current one to
+ * the heap.
+ */
+ arch_file = DatumGetCString(binaryheap_remove_first(arch_heap));
+ strcpy(arch_file, basename);
+ binaryheap_add(arch_heap, CStringGetDatum(arch_file));
}
}
FreeDir(rldir);
- return found;
+ /* If no files were found, simply return. */
+ if (arch_heap->bh_size == 0)
+ return false;
+
+ /*
+ * If we didn't fill the heap, we didn't make it a valid one. Do that
+ * now.
+ */
+ if (arch_heap->bh_size < NUM_FILES_PER_DIRECTORY_SCAN)
+ binaryheap_build(arch_heap);
+
+ /*
+ * Fill arch_files array with the files to archive in ascending order
+ * of priority.
+ */
+ arch_files_size = arch_heap->bh_size;
+ for (int i = 0; i < arch_files_size; i++)
+ arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_heap));
+
+ /* Return the highest priority file. */
+ arch_files_size--;
+ strcpy(xlog, arch_files[arch_files_size]);
+
+ return true;
+}
+
+/*
+ * ready_file_comparator
+ *
+ * Compares the archival priority of the given files to archive. If "a"
+ * has a higher priority than "b", a negative value will be returned. If
+ * "b" has a higher priority than "a", a positive value will be returned.
+ * If "a" and "b" have equivalent values, 0 will be returned.
+ */
+static int
+ready_file_comparator(Datum a, Datum b, void *arg)
+{
+ char *a_str = DatumGetCString(a);
+ char *b_str = DatumGetCString(b);
+ bool a_history = IsTLHistoryFileName(a_str);
+ bool b_history = IsTLHistoryFileName(b_str);
+
+ /* Timeline history files always have the highest priority. */
+ if (a_history != b_history)
+ return a_history ? -1 : 1;
+
+ /* Priority is given to older files. */
+ return strcmp(a_str, b_str);
+}
+
+/*
+ * PgArchForceDirScan
+ *
+ * When called, the next call to pgarch_readyXlog() will perform a
+ * directory scan. This is useful for ensuring that important files such
+ * as timeline history files are archived as quickly as possible.
+ */
+void
+PgArchForceDirScan(void)
+{
+ SpinLockAcquire(&PgArch->arch_lck);
+ PgArch->force_dir_scan = true;
+ SpinLockRelease(&PgArch->arch_lck);
}
/*