Assert that buffers are marked dirty before XLogRegisterBuffer().
author: Jeff Davis <jdavis@postgresql.org>
Tue, 24 Oct 2023 00:17:46 +0000 (17:17 -0700)
committer: Jeff Davis <jdavis@postgresql.org>
Tue, 24 Oct 2023 00:17:46 +0000 (17:17 -0700)
Enforce the rule from transam/README in XLogRegisterBuffer(), and
update callers to follow the rule.

Hash indexes sometimes register clean pages as a part of the locking
protocol, so provide a REGBUF_NO_CHANGE flag to support that use.

Discussion: https://postgr.es/m/c84114f8-c7f1-5b57-f85a-3adc31e1a904@iki.fi
Reviewed-by: Heikki Linnakangas
src/backend/access/gin/ginbtree.c
src/backend/access/gin/gindatapage.c
src/backend/access/gin/ginentrypage.c
src/backend/access/gin/ginfast.c
src/backend/access/hash/hash.c
src/backend/access/hash/hashovfl.c
src/backend/access/transam/xloginsert.c
src/backend/storage/buffer/bufmgr.c
src/include/access/xloginsert.h
src/include/storage/bufmgr.h

index 06e97a7fbb88f9e22918b96f5a48d09be33fdba5..fc694b40f142116544e80dab8d7564c8c1e4d508 100644 (file)
@@ -387,24 +387,22 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
                START_CRIT_SECTION();
 
                if (RelationNeedsWAL(btree->index) && !btree->isBuild)
-               {
                        XLogBeginInsert();
-                       XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
-                       if (BufferIsValid(childbuf))
-                               XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
-               }
 
-               /* Perform the page update, and register any extra WAL data */
+               /*
+                * Perform the page update, dirty and register stack->buffer, and
+                * register any extra WAL data.
+                */
                btree->execPlaceToPage(btree, stack->buffer, stack,
                                                           insertdata, updateblkno, ptp_workspace);
 
-               MarkBufferDirty(stack->buffer);
-
                /* An insert to an internal page finishes the split of the child. */
                if (BufferIsValid(childbuf))
                {
                        GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
                        MarkBufferDirty(childbuf);
+                       if (RelationNeedsWAL(btree->index) && !btree->isBuild)
+                               XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
                }
 
                if (RelationNeedsWAL(btree->index) && !btree->isBuild)
index 344e1c5e6bd9e581bcab282dec529e272c0bc86e..249427308522e4ef4876a10133437de8a844b8b5 100644 (file)
@@ -721,9 +721,12 @@ dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
        /* Apply changes to page */
        dataPlaceToPageLeafRecompress(buf, leaf);
 
+       MarkBufferDirty(buf);
+
        /* If needed, register WAL data built by computeLeafRecompressWALData */
        if (RelationNeedsWAL(btree->index) && !btree->isBuild)
        {
+               XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
                XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
        }
 }
@@ -1155,6 +1158,8 @@ dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
        pitem = (PostingItem *) insertdata;
        GinDataPageAddPostingItem(page, pitem, off);
 
+       MarkBufferDirty(buf);
+
        if (RelationNeedsWAL(btree->index) && !btree->isBuild)
        {
                /*
@@ -1167,6 +1172,7 @@ dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
                data.offset = off;
                data.newitem = *pitem;
 
+               XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
                XLogRegisterBufData(0, (char *) &data,
                                                        sizeof(ginxlogInsertDataInternal));
        }
index 5a8c0eb98d0ebaa6c4eddb68d24e0b9721591edc..379496735ff45e26de0e89cc7ec691ef34ed2670 100644 (file)
@@ -571,6 +571,8 @@ entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
                elog(ERROR, "failed to add item to index page in \"%s\"",
                         RelationGetRelationName(btree->index));
 
+       MarkBufferDirty(buf);
+
        if (RelationNeedsWAL(btree->index) && !btree->isBuild)
        {
                /*
@@ -583,6 +585,7 @@ entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
                data.isDelete = insertData->isDelete;
                data.offset = off;
 
+               XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
                XLogRegisterBufData(0, (char *) &data,
                                                        offsetof(ginxlogInsertEntry, tuple));
                XLogRegisterBufData(0, (char *) insertData->entry,
index eb6c55483181d1f3ab236b2780ce7a1a4b9a3c36..c8fe7c78a7ac61437ea6bf68316f035bd0064f49 100644 (file)
@@ -397,6 +397,9 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
                }
 
                Assert((ptr - collectordata) <= collector->sumsize);
+
+               MarkBufferDirty(buffer);
+
                if (needWal)
                {
                        XLogRegisterBuffer(1, buffer, REGBUF_STANDARD);
@@ -404,8 +407,6 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
                }
 
                metadata->tailFreeSize = PageGetExactFreeSpace(page);
-
-               MarkBufferDirty(buffer);
        }
 
        /*
index fc5d97f606e2f34b5d155c471b400cec3fdd5cab..7a025f33cfeab51d7971af7b6a4cd05d879dda7d 100644 (file)
@@ -824,11 +824,16 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
                                XLogRegisterData((char *) &xlrec, SizeOfHashDelete);
 
                                /*
-                                * bucket buffer needs to be registered to ensure that we can
-                                * acquire a cleanup lock on it during replay.
+                                * bucket buffer was not changed, but still needs to be
+                                * registered to ensure that we can acquire a cleanup lock on
+                                * it during replay.
                                 */
                                if (!xlrec.is_primary_bucket_page)
-                                       XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
+                               {
+                                       uint8           flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
+
+                                       XLogRegisterBuffer(0, bucket_buf, flags);
+                               }
 
                                XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
                                XLogRegisterBufData(1, (char *) deletable,
index 39bb2cb9f61e4aae70bb7a25f40790f8cd184e5c..9d1ff20b922a7e6014b19f0fa608acf034078476 100644 (file)
@@ -658,11 +658,15 @@ _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
                XLogRegisterData((char *) &xlrec, SizeOfHashSqueezePage);
 
                /*
-                * bucket buffer needs to be registered to ensure that we can acquire
-                * a cleanup lock on it during replay.
+                * bucket buffer was not changed, but still needs to be registered to
+                * ensure that we can acquire a cleanup lock on it during replay.
                 */
                if (!xlrec.is_prim_bucket_same_wrt)
-                       XLogRegisterBuffer(0, bucketbuf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
+               {
+                       uint8           flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
+
+                       XLogRegisterBuffer(0, bucketbuf, flags);
+               }
 
                XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
                if (xlrec.ntups > 0)
@@ -960,11 +964,16 @@ readpage:
                                                XLogRegisterData((char *) &xlrec, SizeOfHashMovePageContents);
 
                                                /*
-                                                * bucket buffer needs to be registered to ensure that
-                                                * we can acquire a cleanup lock on it during replay.
+                                                * bucket buffer was not changed, but still needs to
+                                                * be registered to ensure that we can acquire a
+                                                * cleanup lock on it during replay.
                                                 */
                                                if (!xlrec.is_prim_bucket_same_wrt)
-                                                       XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);
+                                               {
+                                                       int                     flags = REGBUF_STANDARD | REGBUF_NO_IMAGE | REGBUF_NO_CHANGE;
+
+                                                       XLogRegisterBuffer(0, bucket_buf, flags);
+                                               }
 
                                                XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
                                                XLogRegisterBufData(1, (char *) itup_offsets,
index 588626424e6452cfa5a9a0e6d0a46dec29b0f769..e4aaa551a04bbe3e58ded21f2cc15fd39c3a82df 100644 (file)
@@ -248,6 +248,20 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
        Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
        Assert(begininsert_called);
 
+       /*
+        * Ordinarily, buffer should be exclusive-locked and marked dirty before
+        * we get here, otherwise we could end up violating one of the rules in
+        * access/transam/README.
+        *
+        * Some callers intentionally register a clean page and never update that
+        * page's LSN; in that case they can pass the flag REGBUF_NO_CHANGE to
+        * bypass these checks.
+        */
+#ifdef USE_ASSERT_CHECKING
+       if (!(flags & REGBUF_NO_CHANGE))
+               Assert(BufferIsExclusiveLocked(buffer) && BufferIsDirty(buffer));
+#endif
+
        if (block_id >= max_registered_block_id)
        {
                if (block_id >= max_registered_buffers)
@@ -1313,8 +1327,8 @@ log_newpage_range(Relation rel, ForkNumber forknum,
                START_CRIT_SECTION();
                for (i = 0; i < nbufs; i++)
                {
-                       XLogRegisterBuffer(i, bufpack[i], flags);
                        MarkBufferDirty(bufpack[i]);
+                       XLogRegisterBuffer(i, bufpack[i], flags);
                }
 
                recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
index 8b96759b84f46cc0eaf939e935fb1af8a57e68dc..21a29f9081a85bad005b2f77d2e107637c43bd5e 100644 (file)
@@ -2098,6 +2098,65 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
        return first_block;
 }
 
+/*
+ * BufferIsExclusiveLocked
+ *
+ *      Checks if buffer is exclusive-locked.
+ *
+ * Buffer must be pinned.
+ */
+bool
+BufferIsExclusiveLocked(Buffer buffer)
+{
+       BufferDesc *bufHdr;
+
+       if (BufferIsLocal(buffer))
+       {
+               int                     bufid = -buffer - 1;
+
+               bufHdr = GetLocalBufferDescriptor(bufid);
+       }
+       else
+       {
+               bufHdr = GetBufferDescriptor(buffer - 1);
+       }
+
+       Assert(BufferIsPinned(buffer));
+       return LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
+                                                               LW_EXCLUSIVE);
+}
+
+/*
+ * BufferIsDirty
+ *
+ *             Checks if buffer is already dirty.
+ *
+ * Buffer must be pinned and exclusive-locked.  (Without an exclusive lock,
+ * the result may be stale before it's returned.)
+ */
+bool
+BufferIsDirty(Buffer buffer)
+{
+       BufferDesc *bufHdr;
+
+       if (BufferIsLocal(buffer))
+       {
+               int                     bufid = -buffer - 1;
+
+               bufHdr = GetLocalBufferDescriptor(bufid);
+       }
+       else
+       {
+               bufHdr = GetBufferDescriptor(buffer - 1);
+       }
+
+       Assert(BufferIsPinned(buffer));
+       Assert(LWLockHeldByMeInMode(BufferDescriptorGetContentLock(bufHdr),
+                                                               LW_EXCLUSIVE));
+
+       return pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY;
+}
+
 /*
  * MarkBufferDirty
  *
index 31785dc578fc407d5a0dac63edc494b08486ca61..cace867497686cf0ad3333ed10e8ca33f8255588 100644 (file)
@@ -37,6 +37,7 @@
                                                                         * will be skipped) */
 #define REGBUF_KEEP_DATA       0x10    /* include data even if a full-page image
                                                                         * is taken */
+#define REGBUF_NO_CHANGE       0x20    /* intentionally register clean buffer */
 
 /* prototypes for public functions in xloginsert.c: */
 extern void XLogBeginInsert(void);
index d89021f9187ac1c91f3025abb15c75ca01207b95..9241f8686110ec0eab132c1402c8e061b0ac4725 100644 (file)
@@ -179,6 +179,8 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
                                                                                bool permanent);
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
+extern bool BufferIsExclusiveLocked(Buffer buffer);
+extern bool BufferIsDirty(Buffer buffer);
 extern void MarkBufferDirty(Buffer buffer);
 extern void IncrBufferRefCount(Buffer buffer);
 extern void CheckBufferIsPinnedOnce(Buffer buffer);