Move logic related to WAL replay of Heap/Heap2 into its own file
author     Michael Paquier <michael@paquier.xyz>
Thu, 12 Sep 2024 04:32:05 +0000 (13:32 +0900)
committer  Michael Paquier <michael@paquier.xyz>
Thu, 12 Sep 2024 04:32:05 +0000 (13:32 +0900)
This brings more clarity to heapam.c by cleanly separating the logic
related to WAL replay of Heap and Heap2 from the rest of the code,
similarly to other RMGRs like hash, btree, etc.
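
For reference, heap_redo(), heap2_redo() and heap_mask(), which this commit
moves into heapam_xlog.c, are the replay and masking callbacks registered
for the Heap and Heap2 resource managers.  Their entries in
src/include/access/rmgrlist.h look roughly like this (quoted from memory for
illustration only; rmgrlist.h is not touched by this commit):

    /* symbol id,        name,    redo,       desc,       identify,       startup, cleanup, mask,      decode */
    PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL,    NULL,    heap_mask, heap_decode)
    PG_RMGR(RM_HEAP_ID,  "Heap",  heap_redo,  heap_desc,  heap_identify,  NULL,    NULL,    heap_mask, heap_decode)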

The header reorganization in heapam.c is also a nice cleanup, cutting
the number of required headers roughly in half.

Author: Li Yong
Reviewed-by: Sutou Kouhei, Michael Paquier
Discussion: https://postgr.es/m/EFE55E65-D7BD-4C6A-B630-91F43FD0771B@ebay.com

src/backend/access/heap/Makefile
src/backend/access/heap/heapam.c
src/backend/access/heap/heapam_xlog.c [new file with mode: 0644]
src/backend/access/heap/meson.build
src/include/access/heapam.h

diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile
index af0bd1888e53ffd6d250c47c53c289f07fbb0b8c..394534172fa1a2313ac4b86f7600f2b277d93b05 100644
@@ -16,6 +16,7 @@ OBJS = \
        heapam.o \
        heapam_handler.o \
        heapam_visibility.o \
+       heapam_xlog.o \
        heaptoast.o \
        hio.o \
        pruneheap.o \
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 91b20147a004707dc4927c96c40713b6c54f6f59..f16710725767ce3af65e57684b8d7fe737392802 100644
  */
 #include "postgres.h"
 
-#include "access/bufmask.h"
 #include "access/heapam.h"
-#include "access/heapam_xlog.h"
 #include "access/heaptoast.h"
 #include "access/hio.h"
 #include "access/multixact.h"
-#include "access/parallel.h"
-#include "access/relscan.h"
 #include "access/subtrans.h"
 #include "access/syncscan.h"
-#include "access/sysattr.h"
-#include "access/tableam.h"
-#include "access/transam.h"
 #include "access/valid.h"
 #include "access/visibilitymap.h"
-#include "access/xact.h"
-#include "access/xlog.h"
 #include "access/xloginsert.h"
-#include "access/xlogutils.h"
-#include "catalog/catalog.h"
 #include "commands/vacuum.h"
-#include "miscadmin.h"
 #include "pgstat.h"
-#include "port/atomics.h"
 #include "port/pg_bitutils.h"
-#include "storage/bufmgr.h"
-#include "storage/freespace.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
-#include "storage/standby.h"
 #include "utils/datum.h"
 #include "utils/injection_point.h"
 #include "utils/inval.h"
-#include "utils/relcache.h"
-#include "utils/snapmgr.h"
 #include "utils/spccache.h"
 
 
@@ -6811,30 +6793,6 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple,
        return freeze_xmin || replace_xvac || replace_xmax || freeze_xmax;
 }
 
-/*
- * heap_execute_freeze_tuple
- *             Execute the prepared freezing of a tuple with caller's freeze plan.
- *
- * Caller is responsible for ensuring that no other backend can access the
- * storage underlying this tuple, either by holding an exclusive lock on the
- * buffer containing it (which is what lazy VACUUM does), or by having it be
- * in private storage (which is what CLUSTER and friends do).
- */
-static inline void
-heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
-{
-       HeapTupleHeaderSetXmax(tuple, frz->xmax);
-
-       if (frz->frzflags & XLH_FREEZE_XVAC)
-               HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);
-
-       if (frz->frzflags & XLH_INVALID_XVAC)
-               HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);
-
-       tuple->t_infomask = frz->t_infomask;
-       tuple->t_infomask2 = frz->t_infomask2;
-}
-
 /*
  * Perform xmin/xmax XID status sanity checks before actually executing freeze
  * plans.
@@ -8745,1303 +8703,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
        return key_tuple;
 }
 
-/*
- * Replay XLOG_HEAP2_PRUNE_* records.
- */
-static void
-heap_xlog_prune_freeze(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       char       *maindataptr = XLogRecGetData(record);
-       xl_heap_prune xlrec;
-       Buffer          buffer;
-       RelFileLocator rlocator;
-       BlockNumber blkno;
-       XLogRedoAction action;
-
-       XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
-       memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
-       maindataptr += SizeOfHeapPrune;
-
-       /*
-        * We will take an ordinary exclusive lock or a cleanup lock depending on
-        * whether the XLHP_CLEANUP_LOCK flag is set.  With an ordinary exclusive
-        * lock, we better not be doing anything that requires moving existing
-        * tuple data.
-        */
-       Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
-                  (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
-
-       /*
-        * We are about to remove and/or freeze tuples.  In Hot Standby mode,
-        * ensure that there are no queries running for which the removed tuples
-        * are still visible or which still consider the frozen xids as running.
-        * The conflict horizon XID comes after xl_heap_prune.
-        */
-       if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
-       {
-               TransactionId snapshot_conflict_horizon;
-
-               /* memcpy() because snapshot_conflict_horizon is stored unaligned */
-               memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
-               maindataptr += sizeof(TransactionId);
-
-               if (InHotStandby)
-                       ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
-                                                                                               (xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
-                                                                                               rlocator);
-       }
-
-       /*
-        * If we have a full-page image, restore it and we're done.
-        */
-       action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
-                                                                                  (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
-                                                                                  &buffer);
-       if (action == BLK_NEEDS_REDO)
-       {
-               Page            page = (Page) BufferGetPage(buffer);
-               OffsetNumber *redirected;
-               OffsetNumber *nowdead;
-               OffsetNumber *nowunused;
-               int                     nredirected;
-               int                     ndead;
-               int                     nunused;
-               int                     nplans;
-               Size            datalen;
-               xlhp_freeze_plan *plans;
-               OffsetNumber *frz_offsets;
-               char       *dataptr = XLogRecGetBlockData(record, 0, &datalen);
-
-               heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
-                                                                                          &nplans, &plans, &frz_offsets,
-                                                                                          &nredirected, &redirected,
-                                                                                          &ndead, &nowdead,
-                                                                                          &nunused, &nowunused);
-
-               /*
-                * Update all line pointers per the record, and repair fragmentation
-                * if needed.
-                */
-               if (nredirected > 0 || ndead > 0 || nunused > 0)
-                       heap_page_prune_execute(buffer,
-                                                                       (xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
-                                                                       redirected, nredirected,
-                                                                       nowdead, ndead,
-                                                                       nowunused, nunused);
-
-               /* Freeze tuples */
-               for (int p = 0; p < nplans; p++)
-               {
-                       HeapTupleFreeze frz;
-
-                       /*
-                        * Convert freeze plan representation from WAL record into
-                        * per-tuple format used by heap_execute_freeze_tuple
-                        */
-                       frz.xmax = plans[p].xmax;
-                       frz.t_infomask2 = plans[p].t_infomask2;
-                       frz.t_infomask = plans[p].t_infomask;
-                       frz.frzflags = plans[p].frzflags;
-                       frz.offset = InvalidOffsetNumber;       /* unused, but be tidy */
-
-                       for (int i = 0; i < plans[p].ntuples; i++)
-                       {
-                               OffsetNumber offset = *(frz_offsets++);
-                               ItemId          lp;
-                               HeapTupleHeader tuple;
-
-                               lp = PageGetItemId(page, offset);
-                               tuple = (HeapTupleHeader) PageGetItem(page, lp);
-                               heap_execute_freeze_tuple(tuple, &frz);
-                       }
-               }
-
-               /* There should be no more data */
-               Assert((char *) frz_offsets == dataptr + datalen);
-
-               /*
-                * Note: we don't worry about updating the page's prunability hints.
-                * At worst this will cause an extra prune cycle to occur soon.
-                */
-
-               PageSetLSN(page, lsn);
-               MarkBufferDirty(buffer);
-       }
-
-       /*
-        * If we released any space or line pointers, update the free space map.
-        *
-        * Do this regardless of a full-page image being applied, since the FSM
-        * data is not in the page anyway.
-        */
-       if (BufferIsValid(buffer))
-       {
-               if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
-                                                  XLHP_HAS_DEAD_ITEMS |
-                                                  XLHP_HAS_NOW_UNUSED_ITEMS))
-               {
-                       Size            freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
-
-                       UnlockReleaseBuffer(buffer);
-
-                       XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-               }
-               else
-                       UnlockReleaseBuffer(buffer);
-       }
-}
-
-/*
- * Replay XLOG_HEAP2_VISIBLE record.
- *
- * The critical integrity requirement here is that we must never end up with
- * a situation where the visibility map bit is set, and the page-level
- * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
- * page modification would fail to clear the visibility map bit.
- */
-static void
-heap_xlog_visible(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
-       Buffer          vmbuffer = InvalidBuffer;
-       Buffer          buffer;
-       Page            page;
-       RelFileLocator rlocator;
-       BlockNumber blkno;
-       XLogRedoAction action;
-
-       Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
-
-       XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
-
-       /*
-        * If there are any Hot Standby transactions running that have an xmin
-        * horizon old enough that this page isn't all-visible for them, they
-        * might incorrectly decide that an index-only scan can skip a heap fetch.
-        *
-        * NB: It might be better to throw some kind of "soft" conflict here that
-        * forces any index-only scan that is in flight to perform heap fetches,
-        * rather than killing the transaction outright.
-        */
-       if (InHotStandby)
-               ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
-                                                                                       xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
-                                                                                       rlocator);
-
-       /*
-        * Read the heap page, if it still exists. If the heap file has dropped or
-        * truncated later in recovery, we don't need to update the page, but we'd
-        * better still update the visibility map.
-        */
-       action = XLogReadBufferForRedo(record, 1, &buffer);
-       if (action == BLK_NEEDS_REDO)
-       {
-               /*
-                * We don't bump the LSN of the heap page when setting the visibility
-                * map bit (unless checksums or wal_hint_bits is enabled, in which
-                * case we must). This exposes us to torn page hazards, but since
-                * we're not inspecting the existing page contents in any way, we
-                * don't care.
-                */
-               page = BufferGetPage(buffer);
-
-               PageSetAllVisible(page);
-
-               if (XLogHintBitIsNeeded())
-                       PageSetLSN(page, lsn);
-
-               MarkBufferDirty(buffer);
-       }
-       else if (action == BLK_RESTORED)
-       {
-               /*
-                * If heap block was backed up, we already restored it and there's
-                * nothing more to do. (This can only happen with checksums or
-                * wal_log_hints enabled.)
-                */
-       }
-
-       if (BufferIsValid(buffer))
-       {
-               Size            space = PageGetFreeSpace(BufferGetPage(buffer));
-
-               UnlockReleaseBuffer(buffer);
-
-               /*
-                * Since FSM is not WAL-logged and only updated heuristically, it
-                * easily becomes stale in standbys.  If the standby is later promoted
-                * and runs VACUUM, it will skip updating individual free space
-                * figures for pages that became all-visible (or all-frozen, depending
-                * on the vacuum mode,) which is troublesome when FreeSpaceMapVacuum
-                * propagates too optimistic free space values to upper FSM layers;
-                * later inserters try to use such pages only to find out that they
-                * are unusable.  This can cause long stalls when there are many such
-                * pages.
-                *
-                * Forestall those problems by updating FSM's idea about a page that
-                * is becoming all-visible or all-frozen.
-                *
-                * Do this regardless of a full-page image being applied, since the
-                * FSM data is not in the page anyway.
-                */
-               if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
-                       XLogRecordPageWithFreeSpace(rlocator, blkno, space);
-       }
-
-       /*
-        * Even if we skipped the heap page update due to the LSN interlock, it's
-        * still safe to update the visibility map.  Any WAL record that clears
-        * the visibility map bit does so before checking the page LSN, so any
-        * bits that need to be cleared will still be cleared.
-        */
-       if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
-                                                                         &vmbuffer) == BLK_NEEDS_REDO)
-       {
-               Page            vmpage = BufferGetPage(vmbuffer);
-               Relation        reln;
-               uint8           vmbits;
-
-               /* initialize the page if it was read as zeros */
-               if (PageIsNew(vmpage))
-                       PageInit(vmpage, BLCKSZ, 0);
-
-               /* remove VISIBILITYMAP_XLOG_* */
-               vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
-
-               /*
-                * XLogReadBufferForRedoExtended locked the buffer. But
-                * visibilitymap_set will handle locking itself.
-                */
-               LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
-
-               reln = CreateFakeRelcacheEntry(rlocator);
-               visibilitymap_pin(reln, blkno, &vmbuffer);
-
-               visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
-                                                 xlrec->snapshotConflictHorizon, vmbits);
-
-               ReleaseBuffer(vmbuffer);
-               FreeFakeRelcacheEntry(reln);
-       }
-       else if (BufferIsValid(vmbuffer))
-               UnlockReleaseBuffer(vmbuffer);
-}
-
-/*
- * Given an "infobits" field from an XLog record, set the correct bits in the
- * given infomask and infomask2 for the tuple touched by the record.
- *
- * (This is the reverse of compute_infobits).
- */
-static void
-fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
-{
-       *infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
-                                  HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
-       *infomask2 &= ~HEAP_KEYS_UPDATED;
-
-       if (infobits & XLHL_XMAX_IS_MULTI)
-               *infomask |= HEAP_XMAX_IS_MULTI;
-       if (infobits & XLHL_XMAX_LOCK_ONLY)
-               *infomask |= HEAP_XMAX_LOCK_ONLY;
-       if (infobits & XLHL_XMAX_EXCL_LOCK)
-               *infomask |= HEAP_XMAX_EXCL_LOCK;
-       /* note HEAP_XMAX_SHR_LOCK isn't considered here */
-       if (infobits & XLHL_XMAX_KEYSHR_LOCK)
-               *infomask |= HEAP_XMAX_KEYSHR_LOCK;
-
-       if (infobits & XLHL_KEYS_UPDATED)
-               *infomask2 |= HEAP_KEYS_UPDATED;
-}
-
-static void
-heap_xlog_delete(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
-       Buffer          buffer;
-       Page            page;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
-       BlockNumber blkno;
-       RelFileLocator target_locator;
-       ItemPointerData target_tid;
-
-       XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
-       ItemPointerSetBlockNumber(&target_tid, blkno);
-       ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
-
-       /*
-        * The visibility map may need to be fixed even if the heap page is
-        * already up-to-date.
-        */
-       if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
-       {
-               Relation        reln = CreateFakeRelcacheEntry(target_locator);
-               Buffer          vmbuffer = InvalidBuffer;
-
-               visibilitymap_pin(reln, blkno, &vmbuffer);
-               visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-               ReleaseBuffer(vmbuffer);
-               FreeFakeRelcacheEntry(reln);
-       }
-
-       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-       {
-               page = BufferGetPage(buffer);
-
-               if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
-                       lp = PageGetItemId(page, xlrec->offnum);
-
-               if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
-                       elog(PANIC, "invalid lp");
-
-               htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-               htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-               htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-               HeapTupleHeaderClearHotUpdated(htup);
-               fix_infomask_from_infobits(xlrec->infobits_set,
-                                                                  &htup->t_infomask, &htup->t_infomask2);
-               if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
-                       HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-               else
-                       HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
-               HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-
-               /* Mark the page as a candidate for pruning */
-               PageSetPrunable(page, XLogRecGetXid(record));
-
-               if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
-                       PageClearAllVisible(page);
-
-               /* Make sure t_ctid is set correctly */
-               if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
-                       HeapTupleHeaderSetMovedPartitions(htup);
-               else
-                       htup->t_ctid = target_tid;
-               PageSetLSN(page, lsn);
-               MarkBufferDirty(buffer);
-       }
-       if (BufferIsValid(buffer))
-               UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_insert(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
-       Buffer          buffer;
-       Page            page;
-       union
-       {
-               HeapTupleHeaderData hdr;
-               char            data[MaxHeapTupleSize];
-       }                       tbuf;
-       HeapTupleHeader htup;
-       xl_heap_header xlhdr;
-       uint32          newlen;
-       Size            freespace = 0;
-       RelFileLocator target_locator;
-       BlockNumber blkno;
-       ItemPointerData target_tid;
-       XLogRedoAction action;
-
-       XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
-       ItemPointerSetBlockNumber(&target_tid, blkno);
-       ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
-
-       /*
-        * The visibility map may need to be fixed even if the heap page is
-        * already up-to-date.
-        */
-       if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-       {
-               Relation        reln = CreateFakeRelcacheEntry(target_locator);
-               Buffer          vmbuffer = InvalidBuffer;
-
-               visibilitymap_pin(reln, blkno, &vmbuffer);
-               visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-               ReleaseBuffer(vmbuffer);
-               FreeFakeRelcacheEntry(reln);
-       }
-
-       /*
-        * If we inserted the first and only tuple on the page, re-initialize the
-        * page from scratch.
-        */
-       if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
-       {
-               buffer = XLogInitBufferForRedo(record, 0);
-               page = BufferGetPage(buffer);
-               PageInit(page, BufferGetPageSize(buffer), 0);
-               action = BLK_NEEDS_REDO;
-       }
-       else
-               action = XLogReadBufferForRedo(record, 0, &buffer);
-       if (action == BLK_NEEDS_REDO)
-       {
-               Size            datalen;
-               char       *data;
-
-               page = BufferGetPage(buffer);
-
-               if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
-                       elog(PANIC, "invalid max offset number");
-
-               data = XLogRecGetBlockData(record, 0, &datalen);
-
-               newlen = datalen - SizeOfHeapHeader;
-               Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
-               memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
-               data += SizeOfHeapHeader;
-
-               htup = &tbuf.hdr;
-               MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-               /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-               memcpy((char *) htup + SizeofHeapTupleHeader,
-                          data,
-                          newlen);
-               newlen += SizeofHeapTupleHeader;
-               htup->t_infomask2 = xlhdr.t_infomask2;
-               htup->t_infomask = xlhdr.t_infomask;
-               htup->t_hoff = xlhdr.t_hoff;
-               HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-               HeapTupleHeaderSetCmin(htup, FirstCommandId);
-               htup->t_ctid = target_tid;
-
-               if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
-                                               true, true) == InvalidOffsetNumber)
-                       elog(PANIC, "failed to add tuple");
-
-               freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-               PageSetLSN(page, lsn);
-
-               if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-                       PageClearAllVisible(page);
-
-               /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
-               if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
-                       PageSetAllVisible(page);
-
-               MarkBufferDirty(buffer);
-       }
-       if (BufferIsValid(buffer))
-               UnlockReleaseBuffer(buffer);
-
-       /*
-        * If the page is running low on free space, update the FSM as well.
-        * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-        * better than that without knowing the fill-factor for the table.
-        *
-        * XXX: Don't do this if the page was restored from full page image. We
-        * don't bother to update the FSM in that case, it doesn't need to be
-        * totally accurate anyway.
-        */
-       if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
-               XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
-}
-
-/*
- * Handles MULTI_INSERT record type.
- */
-static void
-heap_xlog_multi_insert(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_multi_insert *xlrec;
-       RelFileLocator rlocator;
-       BlockNumber blkno;
-       Buffer          buffer;
-       Page            page;
-       union
-       {
-               HeapTupleHeaderData hdr;
-               char            data[MaxHeapTupleSize];
-       }                       tbuf;
-       HeapTupleHeader htup;
-       uint32          newlen;
-       Size            freespace = 0;
-       int                     i;
-       bool            isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
-       XLogRedoAction action;
-
-       /*
-        * Insertion doesn't overwrite MVCC data, so no conflict processing is
-        * required.
-        */
-       xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
-
-       XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
-
-       /* check that the mutually exclusive flags are not both set */
-       Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
-                        (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
-
-       /*
-        * The visibility map may need to be fixed even if the heap page is
-        * already up-to-date.
-        */
-       if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-       {
-               Relation        reln = CreateFakeRelcacheEntry(rlocator);
-               Buffer          vmbuffer = InvalidBuffer;
-
-               visibilitymap_pin(reln, blkno, &vmbuffer);
-               visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
-               ReleaseBuffer(vmbuffer);
-               FreeFakeRelcacheEntry(reln);
-       }
-
-       if (isinit)
-       {
-               buffer = XLogInitBufferForRedo(record, 0);
-               page = BufferGetPage(buffer);
-               PageInit(page, BufferGetPageSize(buffer), 0);
-               action = BLK_NEEDS_REDO;
-       }
-       else
-               action = XLogReadBufferForRedo(record, 0, &buffer);
-       if (action == BLK_NEEDS_REDO)
-       {
-               char       *tupdata;
-               char       *endptr;
-               Size            len;
-
-               /* Tuples are stored as block data */
-               tupdata = XLogRecGetBlockData(record, 0, &len);
-               endptr = tupdata + len;
-
-               page = (Page) BufferGetPage(buffer);
-
-               for (i = 0; i < xlrec->ntuples; i++)
-               {
-                       OffsetNumber offnum;
-                       xl_multi_insert_tuple *xlhdr;
-
-                       /*
-                        * If we're reinitializing the page, the tuples are stored in
-                        * order from FirstOffsetNumber. Otherwise there's an array of
-                        * offsets in the WAL record, and the tuples come after that.
-                        */
-                       if (isinit)
-                               offnum = FirstOffsetNumber + i;
-                       else
-                               offnum = xlrec->offsets[i];
-                       if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-                               elog(PANIC, "invalid max offset number");
-
-                       xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
-                       tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
-
-                       newlen = xlhdr->datalen;
-                       Assert(newlen <= MaxHeapTupleSize);
-                       htup = &tbuf.hdr;
-                       MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-                       /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-                       memcpy((char *) htup + SizeofHeapTupleHeader,
-                                  (char *) tupdata,
-                                  newlen);
-                       tupdata += newlen;
-
-                       newlen += SizeofHeapTupleHeader;
-                       htup->t_infomask2 = xlhdr->t_infomask2;
-                       htup->t_infomask = xlhdr->t_infomask;
-                       htup->t_hoff = xlhdr->t_hoff;
-                       HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-                       HeapTupleHeaderSetCmin(htup, FirstCommandId);
-                       ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
-                       ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
-
-                       offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-                       if (offnum == InvalidOffsetNumber)
-                               elog(PANIC, "failed to add tuple");
-               }
-               if (tupdata != endptr)
-                       elog(PANIC, "total tuple length mismatch");
-
-               freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-               PageSetLSN(page, lsn);
-
-               if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
-                       PageClearAllVisible(page);
-
-               /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
-               if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
-                       PageSetAllVisible(page);
-
-               MarkBufferDirty(buffer);
-       }
-       if (BufferIsValid(buffer))
-               UnlockReleaseBuffer(buffer);
-
-       /*
-        * If the page is running low on free space, update the FSM as well.
-        * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-        * better than that without knowing the fill-factor for the table.
-        *
-        * XXX: Don't do this if the page was restored from full page image. We
-        * don't bother to update the FSM in that case, it doesn't need to be
-        * totally accurate anyway.
-        */
-       if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
-               XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-}
-
-/*
- * Handles UPDATE and HOT_UPDATE
- */
-static void
-heap_xlog_update(XLogReaderState *record, bool hot_update)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
-       RelFileLocator rlocator;
-       BlockNumber oldblk;
-       BlockNumber newblk;
-       ItemPointerData newtid;
-       Buffer          obuffer,
-                               nbuffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleData oldtup;
-       HeapTupleHeader htup;
-       uint16          prefixlen = 0,
-                               suffixlen = 0;
-       char       *newp;
-       union
-       {
-               HeapTupleHeaderData hdr;
-               char            data[MaxHeapTupleSize];
-       }                       tbuf;
-       xl_heap_header xlhdr;
-       uint32          newlen;
-       Size            freespace = 0;
-       XLogRedoAction oldaction;
-       XLogRedoAction newaction;
-
-       /* initialize to keep the compiler quiet */
-       oldtup.t_data = NULL;
-       oldtup.t_len = 0;
-
-       XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
-       if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
-       {
-               /* HOT updates are never done across pages */
-               Assert(!hot_update);
-       }
-       else
-               oldblk = newblk;
-
-       ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
-
-       /*
-        * The visibility map may need to be fixed even if the heap page is
-        * already up-to-date.
-        */
-       if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
-       {
-               Relation        reln = CreateFakeRelcacheEntry(rlocator);
-               Buffer          vmbuffer = InvalidBuffer;
-
-               visibilitymap_pin(reln, oldblk, &vmbuffer);
-               visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
-               ReleaseBuffer(vmbuffer);
-               FreeFakeRelcacheEntry(reln);
-       }
-
-       /*
-        * In normal operation, it is important to lock the two pages in
-        * page-number order, to avoid possible deadlocks against other update
-        * operations going the other way.  However, during WAL replay there can
-        * be no other update happening, so we don't need to worry about that. But
-        * we *do* need to worry that we don't expose an inconsistent state to Hot
-        * Standby queries --- so the original page can't be unlocked before we've
-        * added the new tuple to the new page.
-        */
-
-       /* Deal with old tuple version */
-       oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
-                                                                         &obuffer);
-       if (oldaction == BLK_NEEDS_REDO)
-       {
-               page = BufferGetPage(obuffer);
-               offnum = xlrec->old_offnum;
-               if (PageGetMaxOffsetNumber(page) >= offnum)
-                       lp = PageGetItemId(page, offnum);
-
-               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-                       elog(PANIC, "invalid lp");
-
-               htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-               oldtup.t_data = htup;
-               oldtup.t_len = ItemIdGetLength(lp);
-
-               htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-               htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-               if (hot_update)
-                       HeapTupleHeaderSetHotUpdated(htup);
-               else
-                       HeapTupleHeaderClearHotUpdated(htup);
-               fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
-                                                                  &htup->t_infomask2);
-               HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
-               HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-               /* Set forward chain link in t_ctid */
-               htup->t_ctid = newtid;
-
-               /* Mark the page as a candidate for pruning */
-               PageSetPrunable(page, XLogRecGetXid(record));
-
-               if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
-                       PageClearAllVisible(page);
-
-               PageSetLSN(page, lsn);
-               MarkBufferDirty(obuffer);
-       }
-
-       /*
-        * Read the page the new tuple goes into, if different from old.
-        */
-       if (oldblk == newblk)
-       {
-               nbuffer = obuffer;
-               newaction = oldaction;
-       }
-       else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
-       {
-               nbuffer = XLogInitBufferForRedo(record, 0);
-               page = (Page) BufferGetPage(nbuffer);
-               PageInit(page, BufferGetPageSize(nbuffer), 0);
-               newaction = BLK_NEEDS_REDO;
-       }
-       else
-               newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
-
-       /*
-        * The visibility map may need to be fixed even if the heap page is
-        * already up-to-date.
-        */
-       if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
-       {
-               Relation        reln = CreateFakeRelcacheEntry(rlocator);
-               Buffer          vmbuffer = InvalidBuffer;
-
-               visibilitymap_pin(reln, newblk, &vmbuffer);
-               visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
-               ReleaseBuffer(vmbuffer);
-               FreeFakeRelcacheEntry(reln);
-       }
-
-       /* Deal with new tuple */
-       if (newaction == BLK_NEEDS_REDO)
-       {
-               char       *recdata;
-               char       *recdata_end;
-               Size            datalen;
-               Size            tuplen;
-
-               recdata = XLogRecGetBlockData(record, 0, &datalen);
-               recdata_end = recdata + datalen;
-
-               page = BufferGetPage(nbuffer);
-
-               offnum = xlrec->new_offnum;
-               if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-                       elog(PANIC, "invalid max offset number");
-
-               if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
-               {
-                       Assert(newblk == oldblk);
-                       memcpy(&prefixlen, recdata, sizeof(uint16));
-                       recdata += sizeof(uint16);
-               }
-               if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
-               {
-                       Assert(newblk == oldblk);
-                       memcpy(&suffixlen, recdata, sizeof(uint16));
-                       recdata += sizeof(uint16);
-               }
-
-               memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
-               recdata += SizeOfHeapHeader;
-
-               tuplen = recdata_end - recdata;
-               Assert(tuplen <= MaxHeapTupleSize);
-
-               htup = &tbuf.hdr;
-               MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-
-               /*
-                * Reconstruct the new tuple using the prefix and/or suffix from the
-                * old tuple, and the data stored in the WAL record.
-                */
-               newp = (char *) htup + SizeofHeapTupleHeader;
-               if (prefixlen > 0)
-               {
-                       int                     len;
-
-                       /* copy bitmap [+ padding] [+ oid] from WAL record */
-                       len = xlhdr.t_hoff - SizeofHeapTupleHeader;
-                       memcpy(newp, recdata, len);
-                       recdata += len;
-                       newp += len;
-
-                       /* copy prefix from old tuple */
-                       memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
-                       newp += prefixlen;
-
-                       /* copy new tuple data from WAL record */
-                       len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
-                       memcpy(newp, recdata, len);
-                       recdata += len;
-                       newp += len;
-               }
-               else
-               {
-                       /*
-                        * copy bitmap [+ padding] [+ oid] + data from record, all in one
-                        * go
-                        */
-                       memcpy(newp, recdata, tuplen);
-                       recdata += tuplen;
-                       newp += tuplen;
-               }
-               Assert(recdata == recdata_end);
-
-               /* copy suffix from old tuple */
-               if (suffixlen > 0)
-                       memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
-
-               newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
-               htup->t_infomask2 = xlhdr.t_infomask2;
-               htup->t_infomask = xlhdr.t_infomask;
-               htup->t_hoff = xlhdr.t_hoff;
-
-               HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
-               HeapTupleHeaderSetCmin(htup, FirstCommandId);
-               HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
-               /* Make sure there is no forward chain link in t_ctid */
-               htup->t_ctid = newtid;
-
-               offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-               if (offnum == InvalidOffsetNumber)
-                       elog(PANIC, "failed to add tuple");
-
-               if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
-                       PageClearAllVisible(page);
-
-               freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
-               PageSetLSN(page, lsn);
-               MarkBufferDirty(nbuffer);
-       }
-
-       if (BufferIsValid(nbuffer) && nbuffer != obuffer)
-               UnlockReleaseBuffer(nbuffer);
-       if (BufferIsValid(obuffer))
-               UnlockReleaseBuffer(obuffer);
-
-       /*
-        * If the new page is running low on free space, update the FSM as well.
-        * Arbitrarily, our definition of "low" is less than 20%. We can't do much
-        * better than that without knowing the fill-factor for the table.
-        *
-        * However, don't update the FSM on HOT updates, because after crash
-        * recovery, either the old or the new tuple will certainly be dead and
-        * prunable. After pruning, the page will have roughly as much free space
-        * as it did before the update, assuming the new tuple is about the same
-        * size as the old one.
-        *
-        * XXX: Don't do this if the page was restored from full page image. We
-        * don't bother to update the FSM in that case, it doesn't need to be
-        * totally accurate anyway.
-        */
-       if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
-               XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
-}
-
-static void
-heap_xlog_confirm(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
-       Buffer          buffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
-
-       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-       {
-               page = BufferGetPage(buffer);
-
-               offnum = xlrec->offnum;
-               if (PageGetMaxOffsetNumber(page) >= offnum)
-                       lp = PageGetItemId(page, offnum);
-
-               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-                       elog(PANIC, "invalid lp");
-
-               htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-               /*
-                * Confirm tuple as actually inserted
-                */
-               ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
-
-               PageSetLSN(page, lsn);
-               MarkBufferDirty(buffer);
-       }
-       if (BufferIsValid(buffer))
-               UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_lock(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
-       Buffer          buffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
-
-       /*
-        * The visibility map may need to be fixed even if the heap page is
-        * already up-to-date.
-        */
-       if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
-       {
-               RelFileLocator rlocator;
-               Buffer          vmbuffer = InvalidBuffer;
-               BlockNumber block;
-               Relation        reln;
-
-               XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
-               reln = CreateFakeRelcacheEntry(rlocator);
-
-               visibilitymap_pin(reln, block, &vmbuffer);
-               visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
-
-               ReleaseBuffer(vmbuffer);
-               FreeFakeRelcacheEntry(reln);
-       }
-
-       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-       {
-               page = (Page) BufferGetPage(buffer);
-
-               offnum = xlrec->offnum;
-               if (PageGetMaxOffsetNumber(page) >= offnum)
-                       lp = PageGetItemId(page, offnum);
-
-               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-                       elog(PANIC, "invalid lp");
-
-               htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-               htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-               htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-               fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
-                                                                  &htup->t_infomask2);
-
-               /*
-                * Clear relevant update flags, but only if the modified infomask says
-                * there's no update.
-                */
-               if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
-               {
-                       HeapTupleHeaderClearHotUpdated(htup);
-                       /* Make sure there is no forward chain link in t_ctid */
-                       ItemPointerSet(&htup->t_ctid,
-                                                  BufferGetBlockNumber(buffer),
-                                                  offnum);
-               }
-               HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-               HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-               PageSetLSN(page, lsn);
-               MarkBufferDirty(buffer);
-       }
-       if (BufferIsValid(buffer))
-               UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_lock_updated(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_lock_updated *xlrec;
-       Buffer          buffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
-
-       xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
-
-       /*
-        * The visibility map may need to be fixed even if the heap page is
-        * already up-to-date.
-        */
-       if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
-       {
-               RelFileLocator rlocator;
-               Buffer          vmbuffer = InvalidBuffer;
-               BlockNumber block;
-               Relation        reln;
-
-               XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
-               reln = CreateFakeRelcacheEntry(rlocator);
-
-               visibilitymap_pin(reln, block, &vmbuffer);
-               visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
-
-               ReleaseBuffer(vmbuffer);
-               FreeFakeRelcacheEntry(reln);
-       }
-
-       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-       {
-               page = BufferGetPage(buffer);
-
-               offnum = xlrec->offnum;
-               if (PageGetMaxOffsetNumber(page) >= offnum)
-                       lp = PageGetItemId(page, offnum);
-
-               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-                       elog(PANIC, "invalid lp");
-
-               htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-               htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-               htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-               fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
-                                                                  &htup->t_infomask2);
-               HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-
-               PageSetLSN(page, lsn);
-               MarkBufferDirty(buffer);
-       }
-       if (BufferIsValid(buffer))
-               UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_inplace(XLogReaderState *record)
-{
-       XLogRecPtr      lsn = record->EndRecPtr;
-       xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
-       Buffer          buffer;
-       Page            page;
-       OffsetNumber offnum;
-       ItemId          lp = NULL;
-       HeapTupleHeader htup;
-       uint32          oldlen;
-       Size            newlen;
-
-       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
-       {
-               char       *newtup = XLogRecGetBlockData(record, 0, &newlen);
-
-               page = BufferGetPage(buffer);
-
-               offnum = xlrec->offnum;
-               if (PageGetMaxOffsetNumber(page) >= offnum)
-                       lp = PageGetItemId(page, offnum);
-
-               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-                       elog(PANIC, "invalid lp");
-
-               htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-               oldlen = ItemIdGetLength(lp) - htup->t_hoff;
-               if (oldlen != newlen)
-                       elog(PANIC, "wrong tuple length");
-
-               memcpy((char *) htup + htup->t_hoff, newtup, newlen);
-
-               PageSetLSN(page, lsn);
-               MarkBufferDirty(buffer);
-       }
-       if (BufferIsValid(buffer))
-               UnlockReleaseBuffer(buffer);
-}
-
-void
-heap_redo(XLogReaderState *record)
-{
-       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
-       /*
-        * These operations don't overwrite MVCC data so no conflict processing is
-        * required. The ones in heap2 rmgr do.
-        */
-
-       switch (info & XLOG_HEAP_OPMASK)
-       {
-               case XLOG_HEAP_INSERT:
-                       heap_xlog_insert(record);
-                       break;
-               case XLOG_HEAP_DELETE:
-                       heap_xlog_delete(record);
-                       break;
-               case XLOG_HEAP_UPDATE:
-                       heap_xlog_update(record, false);
-                       break;
-               case XLOG_HEAP_TRUNCATE:
-
-                       /*
-                        * TRUNCATE is a no-op because the actions are already logged as
-                        * SMGR WAL records.  TRUNCATE WAL record only exists for logical
-                        * decoding.
-                        */
-                       break;
-               case XLOG_HEAP_HOT_UPDATE:
-                       heap_xlog_update(record, true);
-                       break;
-               case XLOG_HEAP_CONFIRM:
-                       heap_xlog_confirm(record);
-                       break;
-               case XLOG_HEAP_LOCK:
-                       heap_xlog_lock(record);
-                       break;
-               case XLOG_HEAP_INPLACE:
-                       heap_xlog_inplace(record);
-                       break;
-               default:
-                       elog(PANIC, "heap_redo: unknown op code %u", info);
-       }
-}
-
-void
-heap2_redo(XLogReaderState *record)
-{
-       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
-       switch (info & XLOG_HEAP_OPMASK)
-       {
-               case XLOG_HEAP2_PRUNE_ON_ACCESS:
-               case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
-               case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
-                       heap_xlog_prune_freeze(record);
-                       break;
-               case XLOG_HEAP2_VISIBLE:
-                       heap_xlog_visible(record);
-                       break;
-               case XLOG_HEAP2_MULTI_INSERT:
-                       heap_xlog_multi_insert(record);
-                       break;
-               case XLOG_HEAP2_LOCK_UPDATED:
-                       heap_xlog_lock_updated(record);
-                       break;
-               case XLOG_HEAP2_NEW_CID:
-
-                       /*
-                        * Nothing to do on a real replay, only used during logical
-                        * decoding.
-                        */
-                       break;
-               case XLOG_HEAP2_REWRITE:
-                       heap_xlog_logical_rewrite(record);
-                       break;
-               default:
-                       elog(PANIC, "heap2_redo: unknown op code %u", info);
-       }
-}
-
-/*
- * Mask a heap page before performing consistency checks on it.
- */
-void
-heap_mask(char *pagedata, BlockNumber blkno)
-{
-       Page            page = (Page) pagedata;
-       OffsetNumber off;
-
-       mask_page_lsn_and_checksum(page);
-
-       mask_page_hint_bits(page);
-       mask_unused_space(page);
-
-       for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
-       {
-               ItemId          iid = PageGetItemId(page, off);
-               char       *page_item;
-
-               page_item = (char *) (page + ItemIdGetOffset(iid));
-
-               if (ItemIdIsNormal(iid))
-               {
-                       HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
-
-                       /*
-                        * If xmin of a tuple is not yet frozen, we should ignore
-                        * differences in hint bits, since they can be set without
-                        * emitting WAL.
-                        */
-                       if (!HeapTupleHeaderXminFrozen(page_htup))
-                               page_htup->t_infomask &= ~HEAP_XACT_MASK;
-                       else
-                       {
-                               /* Still we need to mask xmax hint bits. */
-                               page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
-                               page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
-                       }
-
-                       /*
-                        * During replay, we set Command Id to FirstCommandId. Hence, mask
-                        * it. See heap_xlog_insert() for details.
-                        */
-                       page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;
-
-                       /*
-                        * For a speculative tuple, heap_insert() does not set ctid in the
-                        * caller-passed heap tuple itself, leaving the ctid field to
-                        * contain a speculative token value - a per-backend monotonically
-                        * increasing identifier. Besides, it does not WAL-log ctid under
-                        * any circumstances.
-                        *
-                        * During redo, heap_xlog_insert() sets t_ctid to current block
-                        * number and self offset number. It doesn't care about any
-                        * speculative insertions on the primary. Hence, we set t_ctid to
-                        * current block number and self offset number to ignore any
-                        * inconsistency.
-                        */
-                       if (HeapTupleHeaderIsSpeculative(page_htup))
-                               ItemPointerSet(&page_htup->t_ctid, blkno, off);
-
-                       /*
-                        * NB: Not ignoring ctid changes due to the tuple having moved
-                        * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
-                        * important information that needs to be in-sync between primary
-                        * and standby, and thus is WAL logged.
-                        */
-               }
-
-               /*
-                * Ignore any padding bytes after the tuple, when the length of the
-                * item is not MAXALIGNed.
-                */
-               if (ItemIdHasStorage(iid))
-               {
-                       int                     len = ItemIdGetLength(iid);
-                       int                     padlen = MAXALIGN(len) - len;
-
-                       if (padlen > 0)
-                               memset(page_item + len, MASK_MARKER, padlen);
-               }
-       }
-}
-
 /*
  * HeapCheckForSerializableConflictOut
  *             We are reading a tuple.  If it's not visible, there may be a
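
A note on the hunk above: heap_execute_freeze_tuple() is deleted from
heapam.c even though heap_xlog_prune_freeze(), now moving to heapam_xlog.c,
still calls it.  Its definition presumably moves into
src/include/access/heapam.h (listed among the modified files; that hunk is
not shown here), for example as a static inline with the same body.  A
sketch of what that would look like, not the actual heapam.h hunk:

    /*
     * heap_execute_freeze_tuple
     *      Execute the prepared freezing of a tuple with caller's freeze plan.
     *
     * Caller is responsible for ensuring that no other backend can access the
     * storage underlying this tuple, either by holding an exclusive lock on
     * the buffer containing it, or by having it be in private storage.
     */
    static inline void
    heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
    {
        HeapTupleHeaderSetXmax(tuple, frz->xmax);

        if (frz->frzflags & XLH_FREEZE_XVAC)
            HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);

        if (frz->frzflags & XLH_INVALID_XVAC)
            HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);

        tuple->t_infomask = frz->t_infomask;
        tuple->t_infomask2 = frz->t_infomask2;
    }
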
diff --git a/src/backend/access/heap/heapam_xlog.c b/src/backend/access/heap/heapam_xlog.c
new file mode 100644 (file)
index 0000000..6dae723
--- /dev/null
@@ -0,0 +1,1339 @@
+/*-------------------------------------------------------------------------
+ *
+ * heapam_xlog.c
+ *       WAL replay logic for heap access method.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       src/backend/access/heap/heapam_xlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/heapam.h"
+#include "access/visibilitymap.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "storage/freespace.h"
+#include "storage/standby.h"
+
+
+/*
+ * Replay XLOG_HEAP2_PRUNE_* records.
+ */
+static void
+heap_xlog_prune_freeze(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       char       *maindataptr = XLogRecGetData(record);
+       xl_heap_prune xlrec;
+       Buffer          buffer;
+       RelFileLocator rlocator;
+       BlockNumber blkno;
+       XLogRedoAction action;
+
+       XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+       memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
+       maindataptr += SizeOfHeapPrune;
+
+       /*
+        * We will take an ordinary exclusive lock or a cleanup lock depending on
+        * whether the XLHP_CLEANUP_LOCK flag is set.  With an ordinary exclusive
+        * lock, we better not be doing anything that requires moving existing
+        * tuple data.
+        */
+       Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
+                  (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
+
+       /*
+        * We are about to remove and/or freeze tuples.  In Hot Standby mode,
+        * ensure that there are no queries running for which the removed tuples
+        * are still visible or which still consider the frozen xids as running.
+        * The conflict horizon XID comes after xl_heap_prune.
+        */
+       if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
+       {
+               TransactionId snapshot_conflict_horizon;
+
+               /* memcpy() because snapshot_conflict_horizon is stored unaligned */
+               memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
+               maindataptr += sizeof(TransactionId);
+
+               if (InHotStandby)
+                       ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
+                                                                                               (xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
+                                                                                               rlocator);
+       }
+
+       /*
+        * If we have a full-page image, restore it and we're done.
+        */
+       action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
+                                                                                  (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
+                                                                                  &buffer);
+       if (action == BLK_NEEDS_REDO)
+       {
+               Page            page = (Page) BufferGetPage(buffer);
+               OffsetNumber *redirected;
+               OffsetNumber *nowdead;
+               OffsetNumber *nowunused;
+               int                     nredirected;
+               int                     ndead;
+               int                     nunused;
+               int                     nplans;
+               Size            datalen;
+               xlhp_freeze_plan *plans;
+               OffsetNumber *frz_offsets;
+               char       *dataptr = XLogRecGetBlockData(record, 0, &datalen);
+
+               heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
+                                                                                          &nplans, &plans, &frz_offsets,
+                                                                                          &nredirected, &redirected,
+                                                                                          &ndead, &nowdead,
+                                                                                          &nunused, &nowunused);
+
+               /*
+                * Update all line pointers per the record, and repair fragmentation
+                * if needed.
+                */
+               if (nredirected > 0 || ndead > 0 || nunused > 0)
+                       heap_page_prune_execute(buffer,
+                                                                       (xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
+                                                                       redirected, nredirected,
+                                                                       nowdead, ndead,
+                                                                       nowunused, nunused);
+
+               /* Freeze tuples */
+               for (int p = 0; p < nplans; p++)
+               {
+                       HeapTupleFreeze frz;
+
+                       /*
+                        * Convert freeze plan representation from WAL record into
+                        * per-tuple format used by heap_execute_freeze_tuple
+                        */
+                       frz.xmax = plans[p].xmax;
+                       frz.t_infomask2 = plans[p].t_infomask2;
+                       frz.t_infomask = plans[p].t_infomask;
+                       frz.frzflags = plans[p].frzflags;
+                       frz.offset = InvalidOffsetNumber;       /* unused, but be tidy */
+
+                       for (int i = 0; i < plans[p].ntuples; i++)
+                       {
+                               OffsetNumber offset = *(frz_offsets++);
+                               ItemId          lp;
+                               HeapTupleHeader tuple;
+
+                               lp = PageGetItemId(page, offset);
+                               tuple = (HeapTupleHeader) PageGetItem(page, lp);
+                               heap_execute_freeze_tuple(tuple, &frz);
+                       }
+               }
+
+               /* There should be no more data */
+               Assert((char *) frz_offsets == dataptr + datalen);
+
+               /*
+                * Note: we don't worry about updating the page's prunability hints.
+                * At worst this will cause an extra prune cycle to occur soon.
+                */
+
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
+       }
+
+       /*
+        * If we released any space or line pointers, update the free space map.
+        *
+        * Do this regardless of a full-page image being applied, since the FSM
+        * data is not in the page anyway.
+        */
+       if (BufferIsValid(buffer))
+       {
+               if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
+                                                  XLHP_HAS_DEAD_ITEMS |
+                                                  XLHP_HAS_NOW_UNUSED_ITEMS))
+               {
+                       Size            freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
+
+                       UnlockReleaseBuffer(buffer);
+
+                       XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+               }
+               else
+                       UnlockReleaseBuffer(buffer);
+       }
+}
+
+/*
+ * Replay XLOG_HEAP2_VISIBLE records.
+ *
+ * The critical integrity requirement here is that we must never end up with
+ * a situation where the visibility map bit is set, and the page-level
+ * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
+ * page modification would fail to clear the visibility map bit.
+ */
+static void
+heap_xlog_visible(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+       Buffer          vmbuffer = InvalidBuffer;
+       Buffer          buffer;
+       Page            page;
+       RelFileLocator rlocator;
+       BlockNumber blkno;
+       XLogRedoAction action;
+
+       Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
+
+       XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
+
+       /*
+        * If there are any Hot Standby transactions running that have an xmin
+        * horizon old enough that this page isn't all-visible for them, they
+        * might incorrectly decide that an index-only scan can skip a heap fetch.
+        *
+        * NB: It might be better to throw some kind of "soft" conflict here that
+        * forces any index-only scan that is in flight to perform heap fetches,
+        * rather than killing the transaction outright.
+        */
+       if (InHotStandby)
+               ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+                                                                                       xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
+                                                                                       rlocator);
+
+       /*
+        * Read the heap page, if it still exists. If the heap file has been
+        * dropped or truncated later in recovery, we don't need to update the
+        * page, but we'd better still update the visibility map.
+        */
+       action = XLogReadBufferForRedo(record, 1, &buffer);
+       if (action == BLK_NEEDS_REDO)
+       {
+               /*
+                * We don't bump the LSN of the heap page when setting the visibility
+                * map bit (unless checksums or wal_log_hints is enabled, in which
+                * case we must). This exposes us to torn page hazards, but since
+                * we're not inspecting the existing page contents in any way, we
+                * don't care.
+                */
+               page = BufferGetPage(buffer);
+
+               PageSetAllVisible(page);
+
+               if (XLogHintBitIsNeeded())
+                       PageSetLSN(page, lsn);
+
+               MarkBufferDirty(buffer);
+       }
+       else if (action == BLK_RESTORED)
+       {
+               /*
+                * If the heap block was backed up, we already restored it and there's
+                * nothing more to do. (This can only happen with checksums or
+                * wal_log_hints enabled.)
+                */
+       }
+
+       if (BufferIsValid(buffer))
+       {
+               Size            space = PageGetFreeSpace(BufferGetPage(buffer));
+
+               UnlockReleaseBuffer(buffer);
+
+               /*
+                * Since FSM is not WAL-logged and only updated heuristically, it
+                * easily becomes stale in standbys.  If the standby is later promoted
+                * and runs VACUUM, it will skip updating individual free space
+                * figures for pages that became all-visible (or all-frozen, depending
+                * on the vacuum mode), which is troublesome when FreeSpaceMapVacuum
+                * propagates too optimistic free space values to upper FSM layers;
+                * later inserters try to use such pages only to find out that they
+                * are unusable.  This can cause long stalls when there are many such
+                * pages.
+                *
+                * Forestall those problems by updating FSM's idea about a page that
+                * is becoming all-visible or all-frozen.
+                *
+                * Do this regardless of a full-page image being applied, since the
+                * FSM data is not in the page anyway.
+                */
+               if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
+                       XLogRecordPageWithFreeSpace(rlocator, blkno, space);
+       }
+
+       /*
+        * Even if we skipped the heap page update due to the LSN interlock, it's
+        * still safe to update the visibility map.  Any WAL record that clears
+        * the visibility map bit does so before checking the page LSN, so any
+        * bits that need to be cleared will still be cleared.
+        */
+       if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
+                                                                         &vmbuffer) == BLK_NEEDS_REDO)
+       {
+               Page            vmpage = BufferGetPage(vmbuffer);
+               Relation        reln;
+               uint8           vmbits;
+
+               /* initialize the page if it was read as zeros */
+               if (PageIsNew(vmpage))
+                       PageInit(vmpage, BLCKSZ, 0);
+
+               /* remove VISIBILITYMAP_XLOG_* */
+               vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
+
+               /*
+                * XLogReadBufferForRedoExtended locked the buffer. But
+                * visibilitymap_set will handle locking itself.
+                */
+               LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
+
+               reln = CreateFakeRelcacheEntry(rlocator);
+               visibilitymap_pin(reln, blkno, &vmbuffer);
+
+               visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
+                                                 xlrec->snapshotConflictHorizon, vmbits);
+
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+       else if (BufferIsValid(vmbuffer))
+               UnlockReleaseBuffer(vmbuffer);
+}
+
+/*
+ * Given an "infobits" field from an XLog record, set the correct bits in the
+ * given infomask and infomask2 for the tuple touched by the record.
+ *
+ * (This is the reverse of compute_infobits).
+ */
+static void
+fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
+{
+       *infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
+                                  HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
+       *infomask2 &= ~HEAP_KEYS_UPDATED;
+
+       if (infobits & XLHL_XMAX_IS_MULTI)
+               *infomask |= HEAP_XMAX_IS_MULTI;
+       if (infobits & XLHL_XMAX_LOCK_ONLY)
+               *infomask |= HEAP_XMAX_LOCK_ONLY;
+       if (infobits & XLHL_XMAX_EXCL_LOCK)
+               *infomask |= HEAP_XMAX_EXCL_LOCK;
+       /* note HEAP_XMAX_SHR_LOCK isn't considered here */
+       if (infobits & XLHL_XMAX_KEYSHR_LOCK)
+               *infomask |= HEAP_XMAX_KEYSHR_LOCK;
+
+       if (infobits & XLHL_KEYS_UPDATED)
+               *infomask2 |= HEAP_KEYS_UPDATED;
+}
+
+/*
+ * Replay XLOG_HEAP_DELETE records.
+ */
+static void
+heap_xlog_delete(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
+       Buffer          buffer;
+       Page            page;
+       ItemId          lp = NULL;
+       HeapTupleHeader htup;
+       BlockNumber blkno;
+       RelFileLocator target_locator;
+       ItemPointerData target_tid;
+
+       XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
+       ItemPointerSetBlockNumber(&target_tid, blkno);
+       ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+       /*
+        * The visibility map may need to be fixed even if the heap page is
+        * already up-to-date.
+        */
+       if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
+       {
+               Relation        reln = CreateFakeRelcacheEntry(target_locator);
+               Buffer          vmbuffer = InvalidBuffer;
+
+               visibilitymap_pin(reln, blkno, &vmbuffer);
+               visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+
+       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+       {
+               page = BufferGetPage(buffer);
+
+               if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
+                       lp = PageGetItemId(page, xlrec->offnum);
+
+               if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
+                       elog(PANIC, "invalid lp");
+
+               htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+               htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+               htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+               HeapTupleHeaderClearHotUpdated(htup);
+               fix_infomask_from_infobits(xlrec->infobits_set,
+                                                                  &htup->t_infomask, &htup->t_infomask2);
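+               /*
+                * A "super-delete" removes a speculatively inserted tuple by
+                * invalidating its xmin instead of setting xmax.
+                */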
+               if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
+                       HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+               else
+                       HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
+               HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+
+               /* Mark the page as a candidate for pruning */
+               PageSetPrunable(page, XLogRecGetXid(record));
+
+               if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
+                       PageClearAllVisible(page);
+
+               /* Make sure t_ctid is set correctly */
+               if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
+                       HeapTupleHeaderSetMovedPartitions(htup);
+               else
+                       htup->t_ctid = target_tid;
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
+       }
+       if (BufferIsValid(buffer))
+               UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_HEAP_INSERT records.
+ */
+static void
+heap_xlog_insert(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
+       Buffer          buffer;
+       Page            page;
+       union
+       {
+               HeapTupleHeaderData hdr;
+               char            data[MaxHeapTupleSize];
+       }                       tbuf;
+       HeapTupleHeader htup;
+       xl_heap_header xlhdr;
+       uint32          newlen;
+       Size            freespace = 0;
+       RelFileLocator target_locator;
+       BlockNumber blkno;
+       ItemPointerData target_tid;
+       XLogRedoAction action;
+
+       XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
+       ItemPointerSetBlockNumber(&target_tid, blkno);
+       ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+       /*
+        * The visibility map may need to be fixed even if the heap page is
+        * already up-to-date.
+        */
+       if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+       {
+               Relation        reln = CreateFakeRelcacheEntry(target_locator);
+               Buffer          vmbuffer = InvalidBuffer;
+
+               visibilitymap_pin(reln, blkno, &vmbuffer);
+               visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+
+       /*
+        * If we inserted the first and only tuple on the page, re-initialize the
+        * page from scratch.
+        */
+       if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
+       {
+               buffer = XLogInitBufferForRedo(record, 0);
+               page = BufferGetPage(buffer);
+               PageInit(page, BufferGetPageSize(buffer), 0);
+               action = BLK_NEEDS_REDO;
+       }
+       else
+               action = XLogReadBufferForRedo(record, 0, &buffer);
+       if (action == BLK_NEEDS_REDO)
+       {
+               Size            datalen;
+               char       *data;
+
+               page = BufferGetPage(buffer);
+
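+               /*
+                * The target offset must fall within the existing line pointer
+                * array or extend it by exactly one slot; anything further would
+                * leave a gap, which we treat as corruption.
+                */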
+               if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
+                       elog(PANIC, "invalid max offset number");
+
+               data = XLogRecGetBlockData(record, 0, &datalen);
+
+               newlen = datalen - SizeOfHeapHeader;
+               Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
+               memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
+               data += SizeOfHeapHeader;
+
+               htup = &tbuf.hdr;
+               MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+               /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+               memcpy((char *) htup + SizeofHeapTupleHeader,
+                          data,
+                          newlen);
+               newlen += SizeofHeapTupleHeader;
+               htup->t_infomask2 = xlhdr.t_infomask2;
+               htup->t_infomask = xlhdr.t_infomask;
+               htup->t_hoff = xlhdr.t_hoff;
+               HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+               HeapTupleHeaderSetCmin(htup, FirstCommandId);
+               htup->t_ctid = target_tid;
+
+               if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
+                                               true, true) == InvalidOffsetNumber)
+                       elog(PANIC, "failed to add tuple");
+
+               freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+               PageSetLSN(page, lsn);
+
+               if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+                       PageClearAllVisible(page);
+
+               /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
+               if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
+                       PageSetAllVisible(page);
+
+               MarkBufferDirty(buffer);
+       }
+       if (BufferIsValid(buffer))
+               UnlockReleaseBuffer(buffer);
+
+       /*
+        * If the page is running low on free space, update the FSM as well.
+        * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+        * better than that without knowing the fill-factor for the table.
+        *
+        * XXX: Don't do this if the page was restored from full page image. We
+        * don't bother to update the FSM in that case, it doesn't need to be
+        * totally accurate anyway.
+        */
+       if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
+               XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
+}
+
+/*
+ * Replay XLOG_HEAP2_MULTI_INSERT records.
+ */
+static void
+heap_xlog_multi_insert(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_multi_insert *xlrec;
+       RelFileLocator rlocator;
+       BlockNumber blkno;
+       Buffer          buffer;
+       Page            page;
+       union
+       {
+               HeapTupleHeaderData hdr;
+               char            data[MaxHeapTupleSize];
+       }                       tbuf;
+       HeapTupleHeader htup;
+       uint32          newlen;
+       Size            freespace = 0;
+       int                     i;
+       bool            isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
+       XLogRedoAction action;
+
+       /*
+        * Insertion doesn't overwrite MVCC data, so no conflict processing is
+        * required.
+        */
+       xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
+
+       XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+
+       /* check that the mutually exclusive flags are not both set */
+       Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
+                        (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
+
+       /*
+        * The visibility map may need to be fixed even if the heap page is
+        * already up-to-date.
+        */
+       if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+       {
+               Relation        reln = CreateFakeRelcacheEntry(rlocator);
+               Buffer          vmbuffer = InvalidBuffer;
+
+               visibilitymap_pin(reln, blkno, &vmbuffer);
+               visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+
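+       /*
+        * If the record (re)initializes the page, recreate it from scratch
+        * instead of reading it from disk.
+        */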
+       if (isinit)
+       {
+               buffer = XLogInitBufferForRedo(record, 0);
+               page = BufferGetPage(buffer);
+               PageInit(page, BufferGetPageSize(buffer), 0);
+               action = BLK_NEEDS_REDO;
+       }
+       else
+               action = XLogReadBufferForRedo(record, 0, &buffer);
+       if (action == BLK_NEEDS_REDO)
+       {
+               char       *tupdata;
+               char       *endptr;
+               Size            len;
+
+               /* Tuples are stored as block data */
+               tupdata = XLogRecGetBlockData(record, 0, &len);
+               endptr = tupdata + len;
+
+               page = (Page) BufferGetPage(buffer);
+
+               for (i = 0; i < xlrec->ntuples; i++)
+               {
+                       OffsetNumber offnum;
+                       xl_multi_insert_tuple *xlhdr;
+
+                       /*
+                        * If we're reinitializing the page, the tuples are stored in
+                        * order from FirstOffsetNumber. Otherwise there's an array of
+                        * offsets in the WAL record, and the tuples come after that.
+                        */
+                       if (isinit)
+                               offnum = FirstOffsetNumber + i;
+                       else
+                               offnum = xlrec->offsets[i];
+                       if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+                               elog(PANIC, "invalid max offset number");
+
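+                       /*
+                        * Each tuple is preceded by an xl_multi_insert_tuple
+                        * header, stored 2-byte aligned in the block data, so
+                        * realign before reading it.
+                        */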
+                       xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
+                       tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+
+                       newlen = xlhdr->datalen;
+                       Assert(newlen <= MaxHeapTupleSize);
+                       htup = &tbuf.hdr;
+                       MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+                       /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+                       memcpy((char *) htup + SizeofHeapTupleHeader,
+                                  (char *) tupdata,
+                                  newlen);
+                       tupdata += newlen;
+
+                       newlen += SizeofHeapTupleHeader;
+                       htup->t_infomask2 = xlhdr->t_infomask2;
+                       htup->t_infomask = xlhdr->t_infomask;
+                       htup->t_hoff = xlhdr->t_hoff;
+                       HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+                       HeapTupleHeaderSetCmin(htup, FirstCommandId);
+                       ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+                       ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+                       offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+                       if (offnum == InvalidOffsetNumber)
+                               elog(PANIC, "failed to add tuple");
+               }
+               if (tupdata != endptr)
+                       elog(PANIC, "total tuple length mismatch");
+
+               freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+               PageSetLSN(page, lsn);
+
+               if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+                       PageClearAllVisible(page);
+
+               /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
+               if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
+                       PageSetAllVisible(page);
+
+               MarkBufferDirty(buffer);
+       }
+       if (BufferIsValid(buffer))
+               UnlockReleaseBuffer(buffer);
+
+       /*
+        * If the page is running low on free space, update the FSM as well.
+        * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+        * better than that without knowing the fill-factor for the table.
+        *
+        * XXX: Don't do this if the page was restored from full page image. We
+        * don't bother to update the FSM in that case, it doesn't need to be
+        * totally accurate anyway.
+        */
+       if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
+               XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+}
+
+/*
+ * Replay XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE records.
+ */
+static void
+heap_xlog_update(XLogReaderState *record, bool hot_update)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
+       RelFileLocator rlocator;
+       BlockNumber oldblk;
+       BlockNumber newblk;
+       ItemPointerData newtid;
+       Buffer          obuffer,
+                               nbuffer;
+       Page            page;
+       OffsetNumber offnum;
+       ItemId          lp = NULL;
+       HeapTupleData oldtup;
+       HeapTupleHeader htup;
+       uint16          prefixlen = 0,
+                               suffixlen = 0;
+       char       *newp;
+       union
+       {
+               HeapTupleHeaderData hdr;
+               char            data[MaxHeapTupleSize];
+       }                       tbuf;
+       xl_heap_header xlhdr;
+       uint32          newlen;
+       Size            freespace = 0;
+       XLogRedoAction oldaction;
+       XLogRedoAction newaction;
+
+       /* initialize to keep the compiler quiet */
+       oldtup.t_data = NULL;
+       oldtup.t_len = 0;
+
+       XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
+       if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
+       {
+               /* HOT updates are never done across pages */
+               Assert(!hot_update);
+       }
+       else
+               oldblk = newblk;
+
+       ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
+
+       /*
+        * The visibility map may need to be fixed even if the heap page is
+        * already up-to-date.
+        */
+       if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
+       {
+               Relation        reln = CreateFakeRelcacheEntry(rlocator);
+               Buffer          vmbuffer = InvalidBuffer;
+
+               visibilitymap_pin(reln, oldblk, &vmbuffer);
+               visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+
+       /*
+        * In normal operation, it is important to lock the two pages in
+        * page-number order, to avoid possible deadlocks against other update
+        * operations going the other way.  However, during WAL replay there can
+        * be no other update happening, so we don't need to worry about that. But
+        * we *do* need to worry that we don't expose an inconsistent state to Hot
+        * Standby queries --- so the original page can't be unlocked before we've
+        * added the new tuple to the new page.
+        */
+
+       /* Deal with old tuple version */
+       oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
+                                                                         &obuffer);
+       if (oldaction == BLK_NEEDS_REDO)
+       {
+               page = BufferGetPage(obuffer);
+               offnum = xlrec->old_offnum;
+               if (PageGetMaxOffsetNumber(page) >= offnum)
+                       lp = PageGetItemId(page, offnum);
+
+               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+                       elog(PANIC, "invalid lp");
+
+               htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+               oldtup.t_data = htup;
+               oldtup.t_len = ItemIdGetLength(lp);
+
+               htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+               htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+               if (hot_update)
+                       HeapTupleHeaderSetHotUpdated(htup);
+               else
+                       HeapTupleHeaderClearHotUpdated(htup);
+               fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
+                                                                  &htup->t_infomask2);
+               HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
+               HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+               /* Set forward chain link in t_ctid */
+               htup->t_ctid = newtid;
+
+               /* Mark the page as a candidate for pruning */
+               PageSetPrunable(page, XLogRecGetXid(record));
+
+               if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
+                       PageClearAllVisible(page);
+
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(obuffer);
+       }
+
+       /*
+        * Read the page the new tuple goes into, if different from old.
+        */
+       if (oldblk == newblk)
+       {
+               nbuffer = obuffer;
+               newaction = oldaction;
+       }
+       else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
+       {
+               nbuffer = XLogInitBufferForRedo(record, 0);
+               page = (Page) BufferGetPage(nbuffer);
+               PageInit(page, BufferGetPageSize(nbuffer), 0);
+               newaction = BLK_NEEDS_REDO;
+       }
+       else
+               newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
+
+       /*
+        * The visibility map may need to be fixed even if the heap page is
+        * already up-to-date.
+        */
+       if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
+       {
+               Relation        reln = CreateFakeRelcacheEntry(rlocator);
+               Buffer          vmbuffer = InvalidBuffer;
+
+               visibilitymap_pin(reln, newblk, &vmbuffer);
+               visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+
+       /* Deal with new tuple */
+       if (newaction == BLK_NEEDS_REDO)
+       {
+               char       *recdata;
+               char       *recdata_end;
+               Size            datalen;
+               Size            tuplen;
+
+               recdata = XLogRecGetBlockData(record, 0, &datalen);
+               recdata_end = recdata + datalen;
+
+               page = BufferGetPage(nbuffer);
+
+               offnum = xlrec->new_offnum;
+               if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+                       elog(PANIC, "invalid max offset number");
+
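+               /*
+                * If the record reuses a prefix and/or suffix of the old tuple,
+                * the corresponding lengths are stored ahead of the new tuple
+                * data.
+                */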
+               if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
+               {
+                       Assert(newblk == oldblk);
+                       memcpy(&prefixlen, recdata, sizeof(uint16));
+                       recdata += sizeof(uint16);
+               }
+               if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
+               {
+                       Assert(newblk == oldblk);
+                       memcpy(&suffixlen, recdata, sizeof(uint16));
+                       recdata += sizeof(uint16);
+               }
+
+               memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
+               recdata += SizeOfHeapHeader;
+
+               tuplen = recdata_end - recdata;
+               Assert(tuplen <= MaxHeapTupleSize);
+
+               htup = &tbuf.hdr;
+               MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+
+               /*
+                * Reconstruct the new tuple using the prefix and/or suffix from the
+                * old tuple, and the data stored in the WAL record.
+                */
+               newp = (char *) htup + SizeofHeapTupleHeader;
+               if (prefixlen > 0)
+               {
+                       int                     len;
+
+                       /* copy bitmap [+ padding] [+ oid] from WAL record */
+                       len = xlhdr.t_hoff - SizeofHeapTupleHeader;
+                       memcpy(newp, recdata, len);
+                       recdata += len;
+                       newp += len;
+
+                       /* copy prefix from old tuple */
+                       memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
+                       newp += prefixlen;
+
+                       /* copy new tuple data from WAL record */
+                       len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
+                       memcpy(newp, recdata, len);
+                       recdata += len;
+                       newp += len;
+               }
+               else
+               {
+                       /*
+                        * copy bitmap [+ padding] [+ oid] + data from record, all in one
+                        * go
+                        */
+                       memcpy(newp, recdata, tuplen);
+                       recdata += tuplen;
+                       newp += tuplen;
+               }
+               Assert(recdata == recdata_end);
+
+               /* copy suffix from old tuple */
+               if (suffixlen > 0)
+                       memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
+
+               newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
+               htup->t_infomask2 = xlhdr.t_infomask2;
+               htup->t_infomask = xlhdr.t_infomask;
+               htup->t_hoff = xlhdr.t_hoff;
+
+               HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+               HeapTupleHeaderSetCmin(htup, FirstCommandId);
+               HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
+               /* Make sure there is no forward chain link in t_ctid */
+               htup->t_ctid = newtid;
+
+               offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+               if (offnum == InvalidOffsetNumber)
+                       elog(PANIC, "failed to add tuple");
+
+               if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
+                       PageClearAllVisible(page);
+
+               freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(nbuffer);
+       }
+
+       if (BufferIsValid(nbuffer) && nbuffer != obuffer)
+               UnlockReleaseBuffer(nbuffer);
+       if (BufferIsValid(obuffer))
+               UnlockReleaseBuffer(obuffer);
+
+       /*
+        * If the new page is running low on free space, update the FSM as well.
+        * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+        * better than that without knowing the fill-factor for the table.
+        *
+        * However, don't update the FSM on HOT updates, because after crash
+        * recovery, either the old or the new tuple will certainly be dead and
+        * prunable. After pruning, the page will have roughly as much free space
+        * as it did before the update, assuming the new tuple is about the same
+        * size as the old one.
+        *
+        * XXX: Don't do this if the page was restored from full page image. We
+        * don't bother to update the FSM in that case, it doesn't need to be
+        * totally accurate anyway.
+        */
+       if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
+               XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
+}
+
+/*
+ * Replay XLOG_HEAP_CONFIRM records.
+ */
+static void
+heap_xlog_confirm(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
+       Buffer          buffer;
+       Page            page;
+       OffsetNumber offnum;
+       ItemId          lp = NULL;
+       HeapTupleHeader htup;
+
+       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+       {
+               page = BufferGetPage(buffer);
+
+               offnum = xlrec->offnum;
+               if (PageGetMaxOffsetNumber(page) >= offnum)
+                       lp = PageGetItemId(page, offnum);
+
+               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+                       elog(PANIC, "invalid lp");
+
+               htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+               /*
+                * Confirm the tuple as actually inserted: replace the speculative
+                * insertion token in t_ctid with the tuple's own TID.
+                */
+               ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
+
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
+       }
+       if (BufferIsValid(buffer))
+               UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_HEAP_LOCK records.
+ */
+static void
+heap_xlog_lock(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
+       Buffer          buffer;
+       Page            page;
+       OffsetNumber offnum;
+       ItemId          lp = NULL;
+       HeapTupleHeader htup;
+
+       /*
+        * The visibility map may need to be fixed even if the heap page is
+        * already up-to-date.
+        */
+       if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
+       {
+               RelFileLocator rlocator;
+               Buffer          vmbuffer = InvalidBuffer;
+               BlockNumber block;
+               Relation        reln;
+
+               XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
+               reln = CreateFakeRelcacheEntry(rlocator);
+
+               visibilitymap_pin(reln, block, &vmbuffer);
+               visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
+
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+
+       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+       {
+               page = (Page) BufferGetPage(buffer);
+
+               offnum = xlrec->offnum;
+               if (PageGetMaxOffsetNumber(page) >= offnum)
+                       lp = PageGetItemId(page, offnum);
+
+               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+                       elog(PANIC, "invalid lp");
+
+               htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+               htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+               htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+               fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+                                                                  &htup->t_infomask2);
+
+               /*
+                * Clear relevant update flags, but only if the modified infomask says
+                * there's no update.
+                */
+               if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
+               {
+                       HeapTupleHeaderClearHotUpdated(htup);
+                       /* Make sure there is no forward chain link in t_ctid */
+                       ItemPointerSet(&htup->t_ctid,
+                                                  BufferGetBlockNumber(buffer),
+                                                  offnum);
+               }
+               HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+               HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
+       }
+       if (BufferIsValid(buffer))
+               UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_HEAP2_LOCK_UPDATED records.
+ */
+static void
+heap_xlog_lock_updated(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_lock_updated *xlrec;
+       Buffer          buffer;
+       Page            page;
+       OffsetNumber offnum;
+       ItemId          lp = NULL;
+       HeapTupleHeader htup;
+
+       xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
+
+       /*
+        * The visibility map may need to be fixed even if the heap page is
+        * already up-to-date.
+        */
+       if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
+       {
+               RelFileLocator rlocator;
+               Buffer          vmbuffer = InvalidBuffer;
+               BlockNumber block;
+               Relation        reln;
+
+               XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
+               reln = CreateFakeRelcacheEntry(rlocator);
+
+               visibilitymap_pin(reln, block, &vmbuffer);
+               visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
+
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+
+       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+       {
+               page = BufferGetPage(buffer);
+
+               offnum = xlrec->offnum;
+               if (PageGetMaxOffsetNumber(page) >= offnum)
+                       lp = PageGetItemId(page, offnum);
+
+               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+                       elog(PANIC, "invalid lp");
+
+               htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+               htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+               htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+               fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+                                                                  &htup->t_infomask2);
+               HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
+       }
+       if (BufferIsValid(buffer))
+               UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_HEAP_INPLACE records.
+ */
+static void
+heap_xlog_inplace(XLogReaderState *record)
+{
+       XLogRecPtr      lsn = record->EndRecPtr;
+       xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
+       Buffer          buffer;
+       Page            page;
+       OffsetNumber offnum;
+       ItemId          lp = NULL;
+       HeapTupleHeader htup;
+       uint32          oldlen;
+       Size            newlen;
+
+       if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+       {
+               char       *newtup = XLogRecGetBlockData(record, 0, &newlen);
+
+               page = BufferGetPage(buffer);
+
+               offnum = xlrec->offnum;
+               if (PageGetMaxOffsetNumber(page) >= offnum)
+                       lp = PageGetItemId(page, offnum);
+
+               if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+                       elog(PANIC, "invalid lp");
+
+               htup = (HeapTupleHeader) PageGetItem(page, lp);
+
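+               /*
+                * In-place updates never change a tuple's length, so the
+                * replacement data must match the old data length exactly.
+                */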
+               oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+               if (oldlen != newlen)
+                       elog(PANIC, "wrong tuple length");
+
+               memcpy((char *) htup + htup->t_hoff, newtup, newlen);
+
+               PageSetLSN(page, lsn);
+               MarkBufferDirty(buffer);
+       }
+       if (BufferIsValid(buffer))
+               UnlockReleaseBuffer(buffer);
+}
+
+void
+heap_redo(XLogReaderState *record)
+{
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       /*
+        * These operations don't overwrite MVCC data, so no conflict processing is
+        * required. The ones in heap2 rmgr do.
+        */
+
+       switch (info & XLOG_HEAP_OPMASK)
+       {
+               case XLOG_HEAP_INSERT:
+                       heap_xlog_insert(record);
+                       break;
+               case XLOG_HEAP_DELETE:
+                       heap_xlog_delete(record);
+                       break;
+               case XLOG_HEAP_UPDATE:
+                       heap_xlog_update(record, false);
+                       break;
+               case XLOG_HEAP_TRUNCATE:
+
+                       /*
+                        * TRUNCATE is a no-op because the actions are already logged as
+                        * SMGR WAL records.  TRUNCATE WAL record only exists for logical
+                        * decoding.
+                        */
+                       break;
+               case XLOG_HEAP_HOT_UPDATE:
+                       heap_xlog_update(record, true);
+                       break;
+               case XLOG_HEAP_CONFIRM:
+                       heap_xlog_confirm(record);
+                       break;
+               case XLOG_HEAP_LOCK:
+                       heap_xlog_lock(record);
+                       break;
+               case XLOG_HEAP_INPLACE:
+                       heap_xlog_inplace(record);
+                       break;
+               default:
+                       elog(PANIC, "heap_redo: unknown op code %u", info);
+       }
+}
+
+void
+heap2_redo(XLogReaderState *record)
+{
+       uint8           info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+       switch (info & XLOG_HEAP_OPMASK)
+       {
+               case XLOG_HEAP2_PRUNE_ON_ACCESS:
+               case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
+               case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
+                       heap_xlog_prune_freeze(record);
+                       break;
+               case XLOG_HEAP2_VISIBLE:
+                       heap_xlog_visible(record);
+                       break;
+               case XLOG_HEAP2_MULTI_INSERT:
+                       heap_xlog_multi_insert(record);
+                       break;
+               case XLOG_HEAP2_LOCK_UPDATED:
+                       heap_xlog_lock_updated(record);
+                       break;
+               case XLOG_HEAP2_NEW_CID:
+
+                       /*
+                        * Nothing to do on a real replay, only used during logical
+                        * decoding.
+                        */
+                       break;
+               case XLOG_HEAP2_REWRITE:
+                       heap_xlog_logical_rewrite(record);
+                       break;
+               default:
+                       elog(PANIC, "heap2_redo: unknown op code %u", info);
+       }
+}
+
+/*
+ * Mask a heap page before performing consistency checks on it.
+ */
+void
+heap_mask(char *pagedata, BlockNumber blkno)
+{
+       Page            page = (Page) pagedata;
+       OffsetNumber off;
+
+       mask_page_lsn_and_checksum(page);
+
+       mask_page_hint_bits(page);
+       mask_unused_space(page);
+
+       for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
+       {
+               ItemId          iid = PageGetItemId(page, off);
+               char       *page_item;
+
+               page_item = (char *) (page + ItemIdGetOffset(iid));
+
+               if (ItemIdIsNormal(iid))
+               {
+                       HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
+
+                       /*
+                        * If xmin of a tuple is not yet frozen, we should ignore
+                        * differences in hint bits, since they can be set without
+                        * emitting WAL.
+                        */
+                       if (!HeapTupleHeaderXminFrozen(page_htup))
+                               page_htup->t_infomask &= ~HEAP_XACT_MASK;
+                       else
+                       {
+                               /* Still we need to mask xmax hint bits. */
+                               page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
+                               page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
+                       }
+
+                       /*
+                        * During replay, we set Command Id to FirstCommandId. Hence, mask
+                        * it. See heap_xlog_insert() for details.
+                        */
+                       page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;
+
+                       /*
+                        * For a speculative tuple, heap_insert() does not set ctid in the
+                        * caller-passed heap tuple itself, leaving the ctid field to
+                        * contain a speculative token value - a per-backend monotonically
+                        * increasing identifier. Besides, it does not WAL-log ctid under
+                        * any circumstances.
+                        *
+                        * During redo, heap_xlog_insert() sets t_ctid to current block
+                        * number and self offset number. It doesn't care about any
+                        * speculative insertions on the primary. Hence, we set t_ctid to
+                        * current block number and self offset number to ignore any
+                        * inconsistency.
+                        */
+                       if (HeapTupleHeaderIsSpeculative(page_htup))
+                               ItemPointerSet(&page_htup->t_ctid, blkno, off);
+
+                       /*
+                        * NB: Not ignoring ctid changes due to the tuple having moved
+                        * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
+                        * important information that needs to be in-sync between primary
+                        * and standby, and thus is WAL logged.
+                        */
+               }
+
+               /*
+                * Ignore any padding bytes after the tuple, when the length of the
+                * item is not MAXALIGNed.
+                */
+               if (ItemIdHasStorage(iid))
+               {
+                       int                     len = ItemIdGetLength(iid);
+                       int                     padlen = MAXALIGN(len) - len;
+
+                       if (padlen > 0)
+                               memset(page_item + len, MASK_MARKER, padlen);
+               }
+       }
+}
index e00d5b4f0de2d61347a9f4249d142f40ba55a452..19a990208ef844e334a1ddc0d83ea945338879d7 100644 (file)
@@ -4,6 +4,7 @@ backend_sources += files(
   'heapam.c',
   'heapam_handler.c',
   'heapam_visibility.c',
+  'heapam_xlog.c',
   'heaptoast.c',
   'hio.c',
   'pruneheap.c',
index 9e9aec88a620a1fda831349c683fcb315048550c..b92eb506ecb040725436dbba9cc2db6e91751664 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef HEAPAM_H
 #define HEAPAM_H
 
+#include "access/heapam_xlog.h"
 #include "access/relation.h"   /* for backward compatibility */
 #include "access/relscan.h"
 #include "access/sdir.h"
@@ -422,4 +423,28 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data,
 extern void HeapCheckForSerializableConflictOut(bool visible, Relation relation, HeapTuple tuple,
                                                                                                Buffer buffer, Snapshot snapshot);
 
+/*
+ * heap_execute_freeze_tuple
+ *             Execute the prepared freezing of a tuple with caller's freeze plan.
+ *
+ * Caller is responsible for ensuring that no other backend can access the
+ * storage underlying this tuple, either by holding an exclusive lock on the
+ * buffer containing it (which is what lazy VACUUM does), or by having it be
+ * in private storage (which is what CLUSTER and friends do).
+ */
+static inline void
+heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
+{
+       HeapTupleHeaderSetXmax(tuple, frz->xmax);
+
+       if (frz->frzflags & XLH_FREEZE_XVAC)
+               HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);
+
+       if (frz->frzflags & XLH_INVALID_XVAC)
+               HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);
+
+       tuple->t_infomask = frz->t_infomask;
+       tuple->t_infomask2 = frz->t_infomask2;
+}
+
 #endif                                                 /* HEAPAM_H */