Process sync requests incrementally in AbsorbSyncRequests
authorAlexander Korotkov <akorotkov@postgresql.org>
Sun, 27 Jul 2025 12:07:47 +0000 (15:07 +0300)
committerAlexander Korotkov <akorotkov@postgresql.org>
Sun, 27 Jul 2025 12:07:47 +0000 (15:07 +0300)
If the number of sync requests is big enough, the palloc() call in
AbsorbSyncRequests() will attempt to allocate more than 1 GB of memory,
resulting in failure.  This can lead to an infinite loop in the checkpointer
process, as it repeatedly fails to absorb the pending requests.

This commit introduces the following changes to cope with this problem:
 1. Turn pending checkpointer requests array in shared memory into a bounded
    ring buffer.
 2. Limit maximum ring buffer size to 10M items.
 3. Make AbsorbSyncRequests() process requests incrementally in 10K batches.

Even #2 alone makes the whole queue size fit within the maximum palloc() size
of 1 GB, while #3 lets the checkpointer absorb pending requests incrementally,
avoiding long periods of continuous lock holding.

This commit is for master only.  Simpler fix, which just limits a request
queue size to 10M, will be backpatched.

Reported-by: Ekaterina Sokolova <e.sokolova@postgrespro.ru>
Discussion: https://postgr.es/m/db4534f83a22a29ab5ee2566ad86ca92%40postgrespro.ru
Author: Maxim Orlov <orlovmg@gmail.com>
Co-authored-by: Xuneng Zhou <xunengzhou@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Alexander Korotkov <aekorotkov@gmail.com>
src/backend/postmaster/checkpointer.c

index 2809e298a44fbee1a0035b75a8c1724843853859..8490148a47d52a625459569a74f51dcb8d4040ba 100644 (file)
@@ -130,6 +130,13 @@ typedef struct
 
    int         num_requests;   /* current # of requests */
    int         max_requests;   /* allocated array size */
+
+   int         head;           /* Index of the first request in the ring
+                                * buffer */
+   int         tail;           /* Index of the last request in the ring
+                                * buffer */
+
+   /* The ring buffer of pending checkpointer requests */
    CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
@@ -138,6 +145,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB      1000
 
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
@@ -973,7 +986,8 @@ CheckpointerShmemInit(void)
         */
        MemSet(CheckpointerShmem, 0, size);
        SpinLockInit(&CheckpointerShmem->ckpt_lck);
-       CheckpointerShmem->max_requests = NBuffers;
+       CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+       CheckpointerShmem->head = CheckpointerShmem->tail = 0;
        ConditionVariableInit(&CheckpointerShmem->start_cv);
        ConditionVariableInit(&CheckpointerShmem->done_cv);
    }
@@ -1201,6 +1215,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
    CheckpointerRequest *request;
    bool        too_full;
+   int         insert_pos;
 
    if (!IsUnderPostmaster)
        return false;           /* probably shouldn't even get here */
@@ -1224,10 +1239,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
    }
 
    /* OK, insert request */
-   request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+   insert_pos = CheckpointerShmem->tail;
+   request = &CheckpointerShmem->requests[insert_pos];
    request->ftag = *ftag;
    request->type = type;
 
+   CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+   CheckpointerShmem->num_requests++;
+
    /* If queue is more than half full, nudge the checkpointer to empty it */
    too_full = (CheckpointerShmem->num_requests >=
                CheckpointerShmem->max_requests / 2);
@@ -1269,12 +1288,16 @@ CompactCheckpointerRequestQueue(void)
    struct CheckpointerSlotMapping
    {
        CheckpointerRequest request;
-       int         slot;
+       int         ring_idx;
    };
 
-   int         n,
-               preserve_count;
+   int         n;
    int         num_skipped = 0;
+   int         head;
+   int         max_requests;
+   int         num_requests;
+   int         read_idx,
+               write_idx;
    HASHCTL     ctl;
    HTAB       *htab;
    bool       *skip_slot;
@@ -1286,8 +1309,13 @@ CompactCheckpointerRequestQueue(void)
    if (CritSectionCount > 0)
        return false;
 
+   max_requests = CheckpointerShmem->max_requests;
+   num_requests = CheckpointerShmem->num_requests;
+
    /* Initialize skip_slot array */
-   skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+   skip_slot = palloc0(sizeof(bool) * max_requests);
+
+   head = CheckpointerShmem->head;
 
    /* Initialize temporary hash table */
    ctl.keysize = sizeof(CheckpointerRequest);
@@ -1311,7 +1339,8 @@ CompactCheckpointerRequestQueue(void)
     * away preceding entries that would end up being canceled anyhow), but
     * it's not clear that the extra complexity would buy us anything.
     */
-   for (n = 0; n < CheckpointerShmem->num_requests; n++)
+   read_idx = head;
+   for (n = 0; n < num_requests; n++)
    {
        CheckpointerRequest *request;
        struct CheckpointerSlotMapping *slotmap;
@@ -1324,16 +1353,19 @@ CompactCheckpointerRequestQueue(void)
         * CheckpointerShmemInit.  Note also that RelFileLocator had better
         * contain no pad bytes.
         */
-       request = &CheckpointerShmem->requests[n];
+       request = &CheckpointerShmem->requests[read_idx];
        slotmap = hash_search(htab, request, HASH_ENTER, &found);
        if (found)
        {
            /* Duplicate, so mark the previous occurrence as skippable */
-           skip_slot[slotmap->slot] = true;
+           skip_slot[slotmap->ring_idx] = true;
            num_skipped++;
        }
        /* Remember slot containing latest occurrence of this request value */
-       slotmap->slot = n;
+       slotmap->ring_idx = read_idx;
+
+       /* Move to the next request in the ring buffer */
+       read_idx = (read_idx + 1) % max_requests;
    }
 
    /* Done with the hash table. */
@@ -1347,17 +1379,34 @@ CompactCheckpointerRequestQueue(void)
    }
 
    /* We found some duplicates; remove them. */
-   preserve_count = 0;
-   for (n = 0; n < CheckpointerShmem->num_requests; n++)
+   read_idx = write_idx = head;
+   for (n = 0; n < num_requests; n++)
    {
-       if (skip_slot[n])
-           continue;
-       CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+       /* If this slot is NOT skipped, keep it */
+       if (!skip_slot[read_idx])
+       {
+           /* If the read and write positions are different, copy the request */
+           if (write_idx != read_idx)
+               CheckpointerShmem->requests[write_idx] =
+                   CheckpointerShmem->requests[read_idx];
+
+           /* Advance the write position */
+           write_idx = (write_idx + 1) % max_requests;
+       }
+
+       read_idx = (read_idx + 1) % max_requests;
    }
+
+   /*
+    * Update ring buffer state: head remains the same, tail moves, count
+    * decreases
+    */
+   CheckpointerShmem->tail = write_idx;
+   CheckpointerShmem->num_requests -= num_skipped;
+
    ereport(DEBUG1,
            (errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-                            CheckpointerShmem->num_requests, preserve_count)));
-   CheckpointerShmem->num_requests = preserve_count;
+                            num_requests, CheckpointerShmem->num_requests)));
 
    /* Cleanup. */
    pfree(skip_slot);
@@ -1378,40 +1427,64 @@ AbsorbSyncRequests(void)
 {
    CheckpointerRequest *requests = NULL;
    CheckpointerRequest *request;
-   int         n;
+   int         n,
+               i;
+   bool        loop;
 
    if (!AmCheckpointerProcess())
        return;
 
-   LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-   /*
-    * We try to avoid holding the lock for a long time by copying the request
-    * array, and processing the requests after releasing the lock.
-    *
-    * Once we have cleared the requests from shared memory, we have to PANIC
-    * if we then fail to absorb them (eg, because our hashtable runs out of
-    * memory).  This is because the system cannot run safely if we are unable
-    * to fsync what we have been told to fsync.  Fortunately, the hashtable
-    * is so small that the problem is quite unlikely to arise in practice.
-    */
-   n = CheckpointerShmem->num_requests;
-   if (n > 0)
+   do
    {
-       requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-       memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-   }
+       LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+
+       /*---
+        * We try to avoid holding the lock for a long time by:
+        * 1. Copying the request array and processing the requests after
+        *    releasing the lock;
+        * 2. Processing not the whole queue, but only batches of
+        *    CKPT_REQ_BATCH_SIZE at once.
+        *
+        * Once we have cleared the requests from shared memory, we must
+        * PANIC if we then fail to absorb them (e.g., because our hashtable
+        * runs out of memory).  This is because the system cannot run safely
+        * if we are unable to fsync what we have been told to fsync.
+        * Fortunately, the hashtable is so small that the problem is quite
+        * unlikely to arise in practice.
+        *
+        * Note: The maximum possible size of a ring buffer is
+        * MAX_CHECKPOINT_REQUESTS entries, which fit into a maximum palloc
+        * allocation size of 1Gb.  Our maximum batch size,
+        * CKPT_REQ_BATCH_SIZE, is even smaller.
+        */
+       n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+       if (n > 0)
+       {
+           if (!requests)
+               requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
 
-   START_CRIT_SECTION();
+           for (i = 0; i < n; i++)
+           {
+               requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+               CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+           }
 
-   CheckpointerShmem->num_requests = 0;
+           CheckpointerShmem->num_requests -= n;
 
-   LWLockRelease(CheckpointerCommLock);
+       }
+
+       START_CRIT_SECTION();
+
+       /* Are there any requests in the queue? If so, keep going. */
+       loop = CheckpointerShmem->num_requests != 0;
+
+       LWLockRelease(CheckpointerCommLock);
 
-   for (request = requests; n > 0; request++, n--)
-       RememberSyncRequest(&request->ftag, request->type);
+       for (request = requests; n > 0; request++, n--)
+           RememberSyncRequest(&request->ftag, request->type);
 
-   END_CRIT_SECTION();
+       END_CRIT_SECTION();
+   } while (loop);
 
    if (requests)
        pfree(requests);