Convert pg_restore's ready_list to a priority queue.
author Nathan Bossart <nathan@postgresql.org>
Tue, 19 Sep 2023 21:31:29 +0000 (14:31 -0700)
committer Nathan Bossart <nathan@postgresql.org>
Tue, 19 Sep 2023 21:31:29 +0000 (14:31 -0700)
Presently, parallel restores spend a lot of time sorting this list
so that we pick the largest items first.  With many tables, this
sorting can become a significant bottleneck.  There are a couple of
reports from the field about this, and it is easy to reproduce.

This commit improves the performance of parallel pg_restore with
many tables by converting its ready_list to a priority queue, i.e.,
a binary heap.  We will first try to run the highest priority item,
but if it cannot be chosen due to the lock heuristic, we'll do a
sequential scan through the heap nodes until we find one that is
runnable.  This means that we might end up picking an item with a
much lower priority.  However, we expect that we will typically be
able to pick one of the first few items, which should usually have
a relatively high priority.

Suggested-by: Tom Lane
Tested-by: Pierre Ducroquet
Reviewed-by: Tom Lane
Discussion: https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us

src/bin/pg_dump/pg_backup_archiver.c

index 4d83381d84082a87baf563113233e6a2d40a6120..cc06f1c81773f6bd68853edf6ce2ac67093457e9 100644 (file)
@@ -34,6 +34,7 @@
 #include "compress_io.h"
 #include "dumputils.h"
 #include "fe_utils/string_utils.h"
+#include "lib/binaryheap.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq-fs.h"
 #include "parallel.h"
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
-/*
- * State for tracking TocEntrys that are ready to process during a parallel
- * restore.  (This used to be a list, and we still call it that, though now
- * it's really an array so that we can apply qsort to it.)
- *
- * tes[] is sized large enough that we can't overrun it.
- * The valid entries are indexed first_te .. last_te inclusive.
- * We periodically sort the array to bring larger-by-dataLength entries to
- * the front; "sorted" is true if the valid entries are known sorted.
- */
-typedef struct _parallelReadyList
-{
-       TocEntry  **tes;                        /* Ready-to-dump TocEntrys */
-       int                     first_te;               /* index of first valid entry in tes[] */
-       int                     last_te;                /* index of last valid entry in tes[] */
-       bool            sorted;                 /* are valid entries currently sorted? */
-} ParallelReadyList;
-
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
                                                           const pg_compress_specification compression_spec,
@@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
 static void pending_list_header_init(TocEntry *l);
 static void pending_list_append(TocEntry *l, TocEntry *te);
 static void pending_list_remove(TocEntry *te);
-static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
-static void ready_list_free(ParallelReadyList *ready_list);
-static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
-static void ready_list_remove(ParallelReadyList *ready_list, int i);
-static void ready_list_sort(ParallelReadyList *ready_list);
-static int     TocEntrySizeCompare(const void *p1, const void *p2);
-static void move_to_ready_list(TocEntry *pending_list,
-                                                          ParallelReadyList *ready_list,
+static int     TocEntrySizeCompareQsort(const void *p1, const void *p2);
+static int     TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
+static void move_to_ready_heap(TocEntry *pending_list,
+                                                          binaryheap *ready_heap,
                                                           RestorePass pass);
-static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
+static TocEntry *pop_next_work_item(binaryheap *ready_heap,
                                                                        ParallelState *pstate);
 static void mark_dump_job_done(ArchiveHandle *AH,
                                                           TocEntry *te,
@@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
 static void repoint_table_dependencies(ArchiveHandle *AH);
 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-                                                               ParallelReadyList *ready_list);
+                                                               binaryheap *ready_heap);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 
@@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
                }
 
                if (ntes > 1)
-                       qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare);
+                       qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
 
                for (int i = 0; i < ntes; i++)
                        DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
@@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
 
                        (void) restore_toc_entry(AH, next_work_item, false);
 
-                       /* Reduce dependencies, but don't move anything to ready_list */
+                       /* Reduce dependencies, but don't move anything to ready_heap */
                        reduce_dependencies(AH, next_work_item, NULL);
                }
                else
@@ -4027,24 +4006,26 @@ static void
 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                                                         TocEntry *pending_list)
 {
-       ParallelReadyList ready_list;
+       binaryheap *ready_heap;
        TocEntry   *next_work_item;
 
        pg_log_debug("entering restore_toc_entries_parallel");
 
-       /* Set up ready_list with enough room for all known TocEntrys */
-       ready_list_init(&ready_list, AH->tocCount);
+       /* Set up ready_heap with enough room for all known TocEntrys */
+       ready_heap = binaryheap_allocate(AH->tocCount,
+                                                                        TocEntrySizeCompareBinaryheap,
+                                                                        NULL);
 
        /*
         * The pending_list contains all items that we need to restore.  Move all
-        * items that are available to process immediately into the ready_list.
+        * items that are available to process immediately into the ready_heap.
         * After this setup, the pending list is everything that needs to be done
-        * but is blocked by one or more dependencies, while the ready list
+        * but is blocked by one or more dependencies, while the ready heap
         * contains items that have no remaining dependencies and are OK to
         * process in the current restore pass.
         */
        AH->restorePass = RESTORE_PASS_MAIN;
-       move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+       move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
 
        /*
         * main parent loop
@@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
        for (;;)
        {
                /* Look for an item ready to be dispatched to a worker */
-               next_work_item = pop_next_work_item(&ready_list, pstate);
+               next_work_item = pop_next_work_item(ready_heap, pstate);
                if (next_work_item != NULL)
                {
                        /* If not to be restored, don't waste time launching a worker */
@@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                                                        next_work_item->dumpId,
                                                        next_work_item->desc, next_work_item->tag);
                                /* Update its dependencies as though we'd completed it */
-                               reduce_dependencies(AH, next_work_item, &ready_list);
+                               reduce_dependencies(AH, next_work_item, ready_heap);
                                /* Loop around to see if anything else can be dispatched */
                                continue;
                        }
@@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 
                        /* Dispatch to some worker */
                        DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
-                                                                  mark_restore_job_done, &ready_list);
+                                                                  mark_restore_job_done, ready_heap);
                }
                else if (IsEveryWorkerIdle(pstate))
                {
@@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                        /* Advance to next restore pass */
                        AH->restorePass++;
                        /* That probably allows some stuff to be made ready */
-                       move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+                       move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
                        /* Loop around to see if anything's now ready */
                        continue;
                }
@@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
                                           next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
        }
 
-       /* There should now be nothing in ready_list. */
-       Assert(ready_list.first_te > ready_list.last_te);
+       /* There should now be nothing in ready_heap. */
+       Assert(binaryheap_empty(ready_heap));
 
-       ready_list_free(&ready_list);
+       binaryheap_free(ready_heap);
 
        pg_log_info("finished main parallel loop");
 }
@@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te)
 }
 
 
-/*
- * Initialize the ready_list with enough room for up to tocCount entries.
- */
-static void
-ready_list_init(ParallelReadyList *ready_list, int tocCount)
-{
-       ready_list->tes = (TocEntry **)
-               pg_malloc(tocCount * sizeof(TocEntry *));
-       ready_list->first_te = 0;
-       ready_list->last_te = -1;
-       ready_list->sorted = false;
-}
-
-/*
- * Free storage for a ready_list.
- */
-static void
-ready_list_free(ParallelReadyList *ready_list)
-{
-       pg_free(ready_list->tes);
-}
-
-/* Add te to the ready_list */
-static void
-ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
-{
-       ready_list->tes[++ready_list->last_te] = te;
-       /* List is (probably) not sorted anymore. */
-       ready_list->sorted = false;
-}
-
-/* Remove the i'th entry in the ready_list */
-static void
-ready_list_remove(ParallelReadyList *ready_list, int i)
-{
-       int                     f = ready_list->first_te;
-
-       Assert(i >= f && i <= ready_list->last_te);
-
-       /*
-        * In the typical case where the item to be removed is the first ready
-        * entry, we need only increment first_te to remove it.  Otherwise, move
-        * the entries before it to compact the list.  (This preserves sortedness,
-        * if any.)  We could alternatively move the entries after i, but there
-        * are typically many more of those.
-        */
-       if (i > f)
-       {
-               TocEntry  **first_te_ptr = &ready_list->tes[f];
-
-               memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
-       }
-       ready_list->first_te++;
-}
-
-/* Sort the ready_list into the desired order */
-static void
-ready_list_sort(ParallelReadyList *ready_list)
-{
-       if (!ready_list->sorted)
-       {
-               int                     n = ready_list->last_te - ready_list->first_te + 1;
-
-               if (n > 1)
-                       qsort(ready_list->tes + ready_list->first_te, n,
-                                 sizeof(TocEntry *),
-                                 TocEntrySizeCompare);
-               ready_list->sorted = true;
-       }
-}
-
 /* qsort comparator for sorting TocEntries by dataLength */
 static int
-TocEntrySizeCompare(const void *p1, const void *p2)
+TocEntrySizeCompareQsort(const void *p1, const void *p2)
 {
        const TocEntry *te1 = *(const TocEntry *const *) p1;
        const TocEntry *te2 = *(const TocEntry *const *) p2;
@@ -4318,17 +4228,25 @@ TocEntrySizeCompare(const void *p1, const void *p2)
        return 0;
 }
 
+/* binaryheap comparator for sorting TocEntries by dataLength */
+static int
+TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
+{
+       /* return opposite of qsort comparator for max-heap */
+       return -TocEntrySizeCompareQsort(&p1, &p2);
+}
+
 
 /*
- * Move all immediately-ready items from pending_list to ready_list.
+ * Move all immediately-ready items from pending_list to ready_heap.
  *
  * Items are considered ready if they have no remaining dependencies and
  * they belong in the current restore pass.  (See also reduce_dependencies,
  * which applies the same logic one-at-a-time.)
  */
 static void
-move_to_ready_list(TocEntry *pending_list,
-                                  ParallelReadyList *ready_list,
+move_to_ready_heap(TocEntry *pending_list,
+                                  binaryheap *ready_heap,
                                   RestorePass pass)
 {
        TocEntry   *te;
@@ -4344,38 +4262,38 @@ move_to_ready_list(TocEntry *pending_list,
                {
                        /* Remove it from pending_list ... */
                        pending_list_remove(te);
-                       /* ... and add to ready_list */
-                       ready_list_insert(ready_list, te);
+                       /* ... and add to ready_heap */
+                       binaryheap_add(ready_heap, te);
                }
        }
 }
 
 /*
  * Find the next work item (if any) that is capable of being run now,
- * and remove it from the ready_list.
+ * and remove it from the ready_heap.
  *
  * Returns the item, or NULL if nothing is runnable.
  *
  * To qualify, the item must have no remaining dependencies
  * and no requirements for locks that are incompatible with
- * items currently running.  Items in the ready_list are known to have
+ * items currently running.  Items in the ready_heap are known to have
  * no remaining dependencies, but we have to check for lock conflicts.
  */
 static TocEntry *
-pop_next_work_item(ParallelReadyList *ready_list,
+pop_next_work_item(binaryheap *ready_heap,
                                   ParallelState *pstate)
 {
        /*
-        * Sort the ready_list so that we'll tackle larger jobs first.
-        */
-       ready_list_sort(ready_list);
-
-       /*
-        * Search the ready_list until we find a suitable item.
+        * Search the ready_heap until we find a suitable item.  Note that we do a
+        * sequential scan through the heap nodes, so even though we will first
+        * try to choose the highest-priority item, we might end up picking
+        * something with a much lower priority.  However, we expect that we will
+        * typically be able to pick one of the first few items, which should
+        * usually have a relatively high priority.
         */
-       for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
+       for (int i = 0; i < binaryheap_size(ready_heap); i++)
        {
-               TocEntry   *te = ready_list->tes[i];
+               TocEntry   *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
                bool            conflicts = false;
 
                /*
@@ -4401,7 +4319,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
                        continue;
 
                /* passed all tests, so this item can run */
-               ready_list_remove(ready_list, i);
+               binaryheap_remove_node(ready_heap, i);
                return te;
        }
 
@@ -4447,7 +4365,7 @@ mark_restore_job_done(ArchiveHandle *AH,
                                          int status,
                                          void *callback_data)
 {
-       ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
+       binaryheap *ready_heap = (binaryheap *) callback_data;
 
        pg_log_info("finished item %d %s %s",
                                te->dumpId, te->desc, te->tag);
@@ -4465,7 +4383,7 @@ mark_restore_job_done(ArchiveHandle *AH,
                pg_fatal("worker process failed: exit code %d",
                                 status);
 
-       reduce_dependencies(AH, te, ready_list);
+       reduce_dependencies(AH, te, ready_heap);
 }
 
 
@@ -4708,11 +4626,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
 /*
  * Remove the specified TOC entry from the depCounts of items that depend on
  * it, thereby possibly making them ready-to-run.  Any pending item that
- * becomes ready should be moved to the ready_list, if that's provided.
+ * becomes ready should be moved to the ready_heap, if that's provided.
  */
 static void
 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-                                       ParallelReadyList *ready_list)
+                                       binaryheap *ready_heap)
 {
        int                     i;
 
@@ -4730,18 +4648,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
                 * the current restore pass, and it is currently a member of the
                 * pending list (that check is needed to prevent double restore in
                 * some cases where a list-file forces out-of-order restoring).
-                * However, if ready_list == NULL then caller doesn't want any list
+                * However, if ready_heap == NULL then caller doesn't want any list
                 * memberships changed.
                 */
                if (otherte->depCount == 0 &&
                        _tocEntryRestorePass(otherte) == AH->restorePass &&
                        otherte->pending_prev != NULL &&
-                       ready_list != NULL)
+                       ready_heap != NULL)
                {
                        /* Remove it from pending list ... */
                        pending_list_remove(otherte);
-                       /* ... and add to ready_list */
-                       ready_list_insert(ready_list, otherte);
+                       /* ... and add to ready_heap */
+                       binaryheap_add(ready_heap, otherte);
                }
        }
 }