*/
#include "postgres.h"
-#include "access/bufmask.h"
#include "access/heapam.h"
-#include "access/heapam_xlog.h"
#include "access/heaptoast.h"
#include "access/hio.h"
#include "access/multixact.h"
-#include "access/parallel.h"
-#include "access/relscan.h"
#include "access/subtrans.h"
#include "access/syncscan.h"
-#include "access/sysattr.h"
-#include "access/tableam.h"
-#include "access/transam.h"
#include "access/valid.h"
#include "access/visibilitymap.h"
-#include "access/xact.h"
-#include "access/xlog.h"
#include "access/xloginsert.h"
-#include "access/xlogutils.h"
-#include "catalog/catalog.h"
#include "commands/vacuum.h"
-#include "miscadmin.h"
#include "pgstat.h"
-#include "port/atomics.h"
#include "port/pg_bitutils.h"
-#include "storage/bufmgr.h"
-#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
-#include "storage/standby.h"
#include "utils/datum.h"
#include "utils/injection_point.h"
#include "utils/inval.h"
-#include "utils/relcache.h"
-#include "utils/snapmgr.h"
#include "utils/spccache.h"
return freeze_xmin || replace_xvac || replace_xmax || freeze_xmax;
}
-/*
- * heap_execute_freeze_tuple
- * Execute the prepared freezing of a tuple with caller's freeze plan.
- *
- * Caller is responsible for ensuring that no other backend can access the
- * storage underlying this tuple, either by holding an exclusive lock on the
- * buffer containing it (which is what lazy VACUUM does), or by having it be
- * in private storage (which is what CLUSTER and friends do).
- */
-static inline void
-heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz)
-{
- HeapTupleHeaderSetXmax(tuple, frz->xmax);
-
- if (frz->frzflags & XLH_FREEZE_XVAC)
- HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);
-
- if (frz->frzflags & XLH_INVALID_XVAC)
- HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);
-
- tuple->t_infomask = frz->t_infomask;
- tuple->t_infomask2 = frz->t_infomask2;
-}
-
/*
* Perform xmin/xmax XID status sanity checks before actually executing freeze
* plans.
return key_tuple;
}
-/*
- * Replay XLOG_HEAP2_PRUNE_* records.
- */
-static void
-heap_xlog_prune_freeze(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- char *maindataptr = XLogRecGetData(record);
- xl_heap_prune xlrec;
- Buffer buffer;
- RelFileLocator rlocator;
- BlockNumber blkno;
- XLogRedoAction action;
-
- XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
- memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
- maindataptr += SizeOfHeapPrune;
-
- /*
- * We will take an ordinary exclusive lock or a cleanup lock depending on
- * whether the XLHP_CLEANUP_LOCK flag is set. With an ordinary exclusive
- * lock, we better not be doing anything that requires moving existing
- * tuple data.
- */
- Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
- (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
-
- /*
- * We are about to remove and/or freeze tuples. In Hot Standby mode,
- * ensure that there are no queries running for which the removed tuples
- * are still visible or which still consider the frozen xids as running.
- * The conflict horizon XID comes after xl_heap_prune.
- */
- if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
- {
- TransactionId snapshot_conflict_horizon;
-
- /* memcpy() because snapshot_conflict_horizon is stored unaligned */
- memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
- maindataptr += sizeof(TransactionId);
-
- if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
- (xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
- rlocator);
- }
-
- /*
- * If we have a full-page image, restore it and we're done.
- */
- action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
- (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
- &buffer);
- if (action == BLK_NEEDS_REDO)
- {
- Page page = (Page) BufferGetPage(buffer);
- OffsetNumber *redirected;
- OffsetNumber *nowdead;
- OffsetNumber *nowunused;
- int nredirected;
- int ndead;
- int nunused;
- int nplans;
- Size datalen;
- xlhp_freeze_plan *plans;
- OffsetNumber *frz_offsets;
- char *dataptr = XLogRecGetBlockData(record, 0, &datalen);
-
- heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
- &nplans, &plans, &frz_offsets,
- &nredirected, &redirected,
- &ndead, &nowdead,
- &nunused, &nowunused);
-
- /*
- * Update all line pointers per the record, and repair fragmentation
- * if needed.
- */
- if (nredirected > 0 || ndead > 0 || nunused > 0)
- heap_page_prune_execute(buffer,
- (xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
- redirected, nredirected,
- nowdead, ndead,
- nowunused, nunused);
-
- /* Freeze tuples */
- for (int p = 0; p < nplans; p++)
- {
- HeapTupleFreeze frz;
-
- /*
- * Convert freeze plan representation from WAL record into
- * per-tuple format used by heap_execute_freeze_tuple
- */
- frz.xmax = plans[p].xmax;
- frz.t_infomask2 = plans[p].t_infomask2;
- frz.t_infomask = plans[p].t_infomask;
- frz.frzflags = plans[p].frzflags;
- frz.offset = InvalidOffsetNumber; /* unused, but be tidy */
-
- for (int i = 0; i < plans[p].ntuples; i++)
- {
- OffsetNumber offset = *(frz_offsets++);
- ItemId lp;
- HeapTupleHeader tuple;
-
- lp = PageGetItemId(page, offset);
- tuple = (HeapTupleHeader) PageGetItem(page, lp);
- heap_execute_freeze_tuple(tuple, &frz);
- }
- }
-
- /* There should be no more data */
- Assert((char *) frz_offsets == dataptr + datalen);
-
- /*
- * Note: we don't worry about updating the page's prunability hints.
- * At worst this will cause an extra prune cycle to occur soon.
- */
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
-
- /*
- * If we released any space or line pointers, update the free space map.
- *
- * Do this regardless of a full-page image being applied, since the FSM
- * data is not in the page anyway.
- */
- if (BufferIsValid(buffer))
- {
- if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
- XLHP_HAS_DEAD_ITEMS |
- XLHP_HAS_NOW_UNUSED_ITEMS))
- {
- Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
-
- UnlockReleaseBuffer(buffer);
-
- XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
- }
- else
- UnlockReleaseBuffer(buffer);
- }
-}
-
-/*
- * Replay XLOG_HEAP2_VISIBLE record.
- *
- * The critical integrity requirement here is that we must never end up with
- * a situation where the visibility map bit is set, and the page-level
- * PD_ALL_VISIBLE bit is clear. If that were to occur, then a subsequent
- * page modification would fail to clear the visibility map bit.
- */
-static void
-heap_xlog_visible(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
- Buffer vmbuffer = InvalidBuffer;
- Buffer buffer;
- Page page;
- RelFileLocator rlocator;
- BlockNumber blkno;
- XLogRedoAction action;
-
- Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
-
- XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
-
- /*
- * If there are any Hot Standby transactions running that have an xmin
- * horizon old enough that this page isn't all-visible for them, they
- * might incorrectly decide that an index-only scan can skip a heap fetch.
- *
- * NB: It might be better to throw some kind of "soft" conflict here that
- * forces any index-only scan that is in flight to perform heap fetches,
- * rather than killing the transaction outright.
- */
- if (InHotStandby)
- ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
- xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
- rlocator);
-
- /*
- * Read the heap page, if it still exists. If the heap file has dropped or
- * truncated later in recovery, we don't need to update the page, but we'd
- * better still update the visibility map.
- */
- action = XLogReadBufferForRedo(record, 1, &buffer);
- if (action == BLK_NEEDS_REDO)
- {
- /*
- * We don't bump the LSN of the heap page when setting the visibility
- * map bit (unless checksums or wal_hint_bits is enabled, in which
- * case we must). This exposes us to torn page hazards, but since
- * we're not inspecting the existing page contents in any way, we
- * don't care.
- */
- page = BufferGetPage(buffer);
-
- PageSetAllVisible(page);
-
- if (XLogHintBitIsNeeded())
- PageSetLSN(page, lsn);
-
- MarkBufferDirty(buffer);
- }
- else if (action == BLK_RESTORED)
- {
- /*
- * If heap block was backed up, we already restored it and there's
- * nothing more to do. (This can only happen with checksums or
- * wal_log_hints enabled.)
- */
- }
-
- if (BufferIsValid(buffer))
- {
- Size space = PageGetFreeSpace(BufferGetPage(buffer));
-
- UnlockReleaseBuffer(buffer);
-
- /*
- * Since FSM is not WAL-logged and only updated heuristically, it
- * easily becomes stale in standbys. If the standby is later promoted
- * and runs VACUUM, it will skip updating individual free space
- * figures for pages that became all-visible (or all-frozen, depending
- * on the vacuum mode,) which is troublesome when FreeSpaceMapVacuum
- * propagates too optimistic free space values to upper FSM layers;
- * later inserters try to use such pages only to find out that they
- * are unusable. This can cause long stalls when there are many such
- * pages.
- *
- * Forestall those problems by updating FSM's idea about a page that
- * is becoming all-visible or all-frozen.
- *
- * Do this regardless of a full-page image being applied, since the
- * FSM data is not in the page anyway.
- */
- if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
- XLogRecordPageWithFreeSpace(rlocator, blkno, space);
- }
-
- /*
- * Even if we skipped the heap page update due to the LSN interlock, it's
- * still safe to update the visibility map. Any WAL record that clears
- * the visibility map bit does so before checking the page LSN, so any
- * bits that need to be cleared will still be cleared.
- */
- if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
- &vmbuffer) == BLK_NEEDS_REDO)
- {
- Page vmpage = BufferGetPage(vmbuffer);
- Relation reln;
- uint8 vmbits;
-
- /* initialize the page if it was read as zeros */
- if (PageIsNew(vmpage))
- PageInit(vmpage, BLCKSZ, 0);
-
- /* remove VISIBILITYMAP_XLOG_* */
- vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
-
- /*
- * XLogReadBufferForRedoExtended locked the buffer. But
- * visibilitymap_set will handle locking itself.
- */
- LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
-
- reln = CreateFakeRelcacheEntry(rlocator);
- visibilitymap_pin(reln, blkno, &vmbuffer);
-
- visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
- xlrec->snapshotConflictHorizon, vmbits);
-
- ReleaseBuffer(vmbuffer);
- FreeFakeRelcacheEntry(reln);
- }
- else if (BufferIsValid(vmbuffer))
- UnlockReleaseBuffer(vmbuffer);
-}
-
-/*
- * Given an "infobits" field from an XLog record, set the correct bits in the
- * given infomask and infomask2 for the tuple touched by the record.
- *
- * (This is the reverse of compute_infobits).
- */
-static void
-fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
-{
- *infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
- HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
- *infomask2 &= ~HEAP_KEYS_UPDATED;
-
- if (infobits & XLHL_XMAX_IS_MULTI)
- *infomask |= HEAP_XMAX_IS_MULTI;
- if (infobits & XLHL_XMAX_LOCK_ONLY)
- *infomask |= HEAP_XMAX_LOCK_ONLY;
- if (infobits & XLHL_XMAX_EXCL_LOCK)
- *infomask |= HEAP_XMAX_EXCL_LOCK;
- /* note HEAP_XMAX_SHR_LOCK isn't considered here */
- if (infobits & XLHL_XMAX_KEYSHR_LOCK)
- *infomask |= HEAP_XMAX_KEYSHR_LOCK;
-
- if (infobits & XLHL_KEYS_UPDATED)
- *infomask2 |= HEAP_KEYS_UPDATED;
-}
-
-static void
-heap_xlog_delete(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
- ItemId lp = NULL;
- HeapTupleHeader htup;
- BlockNumber blkno;
- RelFileLocator target_locator;
- ItemPointerData target_tid;
-
- XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
- ItemPointerSetBlockNumber(&target_tid, blkno);
- ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
-
- /*
- * The visibility map may need to be fixed even if the heap page is
- * already up-to-date.
- */
- if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
- {
- Relation reln = CreateFakeRelcacheEntry(target_locator);
- Buffer vmbuffer = InvalidBuffer;
-
- visibilitymap_pin(reln, blkno, &vmbuffer);
- visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
- ReleaseBuffer(vmbuffer);
- FreeFakeRelcacheEntry(reln);
- }
-
- if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
- {
- page = BufferGetPage(buffer);
-
- if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
- lp = PageGetItemId(page, xlrec->offnum);
-
- if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
- elog(PANIC, "invalid lp");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
- htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
- HeapTupleHeaderClearHotUpdated(htup);
- fix_infomask_from_infobits(xlrec->infobits_set,
- &htup->t_infomask, &htup->t_infomask2);
- if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
- HeapTupleHeaderSetXmax(htup, xlrec->xmax);
- else
- HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
- HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-
- /* Mark the page as a candidate for pruning */
- PageSetPrunable(page, XLogRecGetXid(record));
-
- if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
- PageClearAllVisible(page);
-
- /* Make sure t_ctid is set correctly */
- if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
- HeapTupleHeaderSetMovedPartitions(htup);
- else
- htup->t_ctid = target_tid;
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_insert(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
- union
- {
- HeapTupleHeaderData hdr;
- char data[MaxHeapTupleSize];
- } tbuf;
- HeapTupleHeader htup;
- xl_heap_header xlhdr;
- uint32 newlen;
- Size freespace = 0;
- RelFileLocator target_locator;
- BlockNumber blkno;
- ItemPointerData target_tid;
- XLogRedoAction action;
-
- XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
- ItemPointerSetBlockNumber(&target_tid, blkno);
- ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
-
- /*
- * The visibility map may need to be fixed even if the heap page is
- * already up-to-date.
- */
- if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
- {
- Relation reln = CreateFakeRelcacheEntry(target_locator);
- Buffer vmbuffer = InvalidBuffer;
-
- visibilitymap_pin(reln, blkno, &vmbuffer);
- visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
- ReleaseBuffer(vmbuffer);
- FreeFakeRelcacheEntry(reln);
- }
-
- /*
- * If we inserted the first and only tuple on the page, re-initialize the
- * page from scratch.
- */
- if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
- {
- buffer = XLogInitBufferForRedo(record, 0);
- page = BufferGetPage(buffer);
- PageInit(page, BufferGetPageSize(buffer), 0);
- action = BLK_NEEDS_REDO;
- }
- else
- action = XLogReadBufferForRedo(record, 0, &buffer);
- if (action == BLK_NEEDS_REDO)
- {
- Size datalen;
- char *data;
-
- page = BufferGetPage(buffer);
-
- if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
- elog(PANIC, "invalid max offset number");
-
- data = XLogRecGetBlockData(record, 0, &datalen);
-
- newlen = datalen - SizeOfHeapHeader;
- Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
- memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
- data += SizeOfHeapHeader;
-
- htup = &tbuf.hdr;
- MemSet((char *) htup, 0, SizeofHeapTupleHeader);
- /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
- memcpy((char *) htup + SizeofHeapTupleHeader,
- data,
- newlen);
- newlen += SizeofHeapTupleHeader;
- htup->t_infomask2 = xlhdr.t_infomask2;
- htup->t_infomask = xlhdr.t_infomask;
- htup->t_hoff = xlhdr.t_hoff;
- HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
- HeapTupleHeaderSetCmin(htup, FirstCommandId);
- htup->t_ctid = target_tid;
-
- if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
- true, true) == InvalidOffsetNumber)
- elog(PANIC, "failed to add tuple");
-
- freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
- PageSetLSN(page, lsn);
-
- if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
- PageClearAllVisible(page);
-
- /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
- if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
- PageSetAllVisible(page);
-
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-
- /*
- * If the page is running low on free space, update the FSM as well.
- * Arbitrarily, our definition of "low" is less than 20%. We can't do much
- * better than that without knowing the fill-factor for the table.
- *
- * XXX: Don't do this if the page was restored from full page image. We
- * don't bother to update the FSM in that case, it doesn't need to be
- * totally accurate anyway.
- */
- if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
- XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
-}
-
-/*
- * Handles MULTI_INSERT record type.
- */
-static void
-heap_xlog_multi_insert(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_multi_insert *xlrec;
- RelFileLocator rlocator;
- BlockNumber blkno;
- Buffer buffer;
- Page page;
- union
- {
- HeapTupleHeaderData hdr;
- char data[MaxHeapTupleSize];
- } tbuf;
- HeapTupleHeader htup;
- uint32 newlen;
- Size freespace = 0;
- int i;
- bool isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
- XLogRedoAction action;
-
- /*
- * Insertion doesn't overwrite MVCC data, so no conflict processing is
- * required.
- */
- xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
-
- XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
-
- /* check that the mutually exclusive flags are not both set */
- Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
- (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
-
- /*
- * The visibility map may need to be fixed even if the heap page is
- * already up-to-date.
- */
- if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
- {
- Relation reln = CreateFakeRelcacheEntry(rlocator);
- Buffer vmbuffer = InvalidBuffer;
-
- visibilitymap_pin(reln, blkno, &vmbuffer);
- visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
- ReleaseBuffer(vmbuffer);
- FreeFakeRelcacheEntry(reln);
- }
-
- if (isinit)
- {
- buffer = XLogInitBufferForRedo(record, 0);
- page = BufferGetPage(buffer);
- PageInit(page, BufferGetPageSize(buffer), 0);
- action = BLK_NEEDS_REDO;
- }
- else
- action = XLogReadBufferForRedo(record, 0, &buffer);
- if (action == BLK_NEEDS_REDO)
- {
- char *tupdata;
- char *endptr;
- Size len;
-
- /* Tuples are stored as block data */
- tupdata = XLogRecGetBlockData(record, 0, &len);
- endptr = tupdata + len;
-
- page = (Page) BufferGetPage(buffer);
-
- for (i = 0; i < xlrec->ntuples; i++)
- {
- OffsetNumber offnum;
- xl_multi_insert_tuple *xlhdr;
-
- /*
- * If we're reinitializing the page, the tuples are stored in
- * order from FirstOffsetNumber. Otherwise there's an array of
- * offsets in the WAL record, and the tuples come after that.
- */
- if (isinit)
- offnum = FirstOffsetNumber + i;
- else
- offnum = xlrec->offsets[i];
- if (PageGetMaxOffsetNumber(page) + 1 < offnum)
- elog(PANIC, "invalid max offset number");
-
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
- tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
-
- newlen = xlhdr->datalen;
- Assert(newlen <= MaxHeapTupleSize);
- htup = &tbuf.hdr;
- MemSet((char *) htup, 0, SizeofHeapTupleHeader);
- /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
- memcpy((char *) htup + SizeofHeapTupleHeader,
- (char *) tupdata,
- newlen);
- tupdata += newlen;
-
- newlen += SizeofHeapTupleHeader;
- htup->t_infomask2 = xlhdr->t_infomask2;
- htup->t_infomask = xlhdr->t_infomask;
- htup->t_hoff = xlhdr->t_hoff;
- HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
- HeapTupleHeaderSetCmin(htup, FirstCommandId);
- ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
- ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
-
- offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
- if (offnum == InvalidOffsetNumber)
- elog(PANIC, "failed to add tuple");
- }
- if (tupdata != endptr)
- elog(PANIC, "total tuple length mismatch");
-
- freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
- PageSetLSN(page, lsn);
-
- if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
- PageClearAllVisible(page);
-
- /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
- if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
- PageSetAllVisible(page);
-
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-
- /*
- * If the page is running low on free space, update the FSM as well.
- * Arbitrarily, our definition of "low" is less than 20%. We can't do much
- * better than that without knowing the fill-factor for the table.
- *
- * XXX: Don't do this if the page was restored from full page image. We
- * don't bother to update the FSM in that case, it doesn't need to be
- * totally accurate anyway.
- */
- if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
- XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
-}
-
-/*
- * Handles UPDATE and HOT_UPDATE
- */
-static void
-heap_xlog_update(XLogReaderState *record, bool hot_update)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
- RelFileLocator rlocator;
- BlockNumber oldblk;
- BlockNumber newblk;
- ItemPointerData newtid;
- Buffer obuffer,
- nbuffer;
- Page page;
- OffsetNumber offnum;
- ItemId lp = NULL;
- HeapTupleData oldtup;
- HeapTupleHeader htup;
- uint16 prefixlen = 0,
- suffixlen = 0;
- char *newp;
- union
- {
- HeapTupleHeaderData hdr;
- char data[MaxHeapTupleSize];
- } tbuf;
- xl_heap_header xlhdr;
- uint32 newlen;
- Size freespace = 0;
- XLogRedoAction oldaction;
- XLogRedoAction newaction;
-
- /* initialize to keep the compiler quiet */
- oldtup.t_data = NULL;
- oldtup.t_len = 0;
-
- XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
- if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
- {
- /* HOT updates are never done across pages */
- Assert(!hot_update);
- }
- else
- oldblk = newblk;
-
- ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
-
- /*
- * The visibility map may need to be fixed even if the heap page is
- * already up-to-date.
- */
- if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
- {
- Relation reln = CreateFakeRelcacheEntry(rlocator);
- Buffer vmbuffer = InvalidBuffer;
-
- visibilitymap_pin(reln, oldblk, &vmbuffer);
- visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
- ReleaseBuffer(vmbuffer);
- FreeFakeRelcacheEntry(reln);
- }
-
- /*
- * In normal operation, it is important to lock the two pages in
- * page-number order, to avoid possible deadlocks against other update
- * operations going the other way. However, during WAL replay there can
- * be no other update happening, so we don't need to worry about that. But
- * we *do* need to worry that we don't expose an inconsistent state to Hot
- * Standby queries --- so the original page can't be unlocked before we've
- * added the new tuple to the new page.
- */
-
- /* Deal with old tuple version */
- oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
- &obuffer);
- if (oldaction == BLK_NEEDS_REDO)
- {
- page = BufferGetPage(obuffer);
- offnum = xlrec->old_offnum;
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
-
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
- elog(PANIC, "invalid lp");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- oldtup.t_data = htup;
- oldtup.t_len = ItemIdGetLength(lp);
-
- htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
- htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
- if (hot_update)
- HeapTupleHeaderSetHotUpdated(htup);
- else
- HeapTupleHeaderClearHotUpdated(htup);
- fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
- &htup->t_infomask2);
- HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
- HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
- /* Set forward chain link in t_ctid */
- htup->t_ctid = newtid;
-
- /* Mark the page as a candidate for pruning */
- PageSetPrunable(page, XLogRecGetXid(record));
-
- if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
- PageClearAllVisible(page);
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(obuffer);
- }
-
- /*
- * Read the page the new tuple goes into, if different from old.
- */
- if (oldblk == newblk)
- {
- nbuffer = obuffer;
- newaction = oldaction;
- }
- else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
- {
- nbuffer = XLogInitBufferForRedo(record, 0);
- page = (Page) BufferGetPage(nbuffer);
- PageInit(page, BufferGetPageSize(nbuffer), 0);
- newaction = BLK_NEEDS_REDO;
- }
- else
- newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
-
- /*
- * The visibility map may need to be fixed even if the heap page is
- * already up-to-date.
- */
- if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
- {
- Relation reln = CreateFakeRelcacheEntry(rlocator);
- Buffer vmbuffer = InvalidBuffer;
-
- visibilitymap_pin(reln, newblk, &vmbuffer);
- visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
- ReleaseBuffer(vmbuffer);
- FreeFakeRelcacheEntry(reln);
- }
-
- /* Deal with new tuple */
- if (newaction == BLK_NEEDS_REDO)
- {
- char *recdata;
- char *recdata_end;
- Size datalen;
- Size tuplen;
-
- recdata = XLogRecGetBlockData(record, 0, &datalen);
- recdata_end = recdata + datalen;
-
- page = BufferGetPage(nbuffer);
-
- offnum = xlrec->new_offnum;
- if (PageGetMaxOffsetNumber(page) + 1 < offnum)
- elog(PANIC, "invalid max offset number");
-
- if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
- {
- Assert(newblk == oldblk);
- memcpy(&prefixlen, recdata, sizeof(uint16));
- recdata += sizeof(uint16);
- }
- if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
- {
- Assert(newblk == oldblk);
- memcpy(&suffixlen, recdata, sizeof(uint16));
- recdata += sizeof(uint16);
- }
-
- memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
- recdata += SizeOfHeapHeader;
-
- tuplen = recdata_end - recdata;
- Assert(tuplen <= MaxHeapTupleSize);
-
- htup = &tbuf.hdr;
- MemSet((char *) htup, 0, SizeofHeapTupleHeader);
-
- /*
- * Reconstruct the new tuple using the prefix and/or suffix from the
- * old tuple, and the data stored in the WAL record.
- */
- newp = (char *) htup + SizeofHeapTupleHeader;
- if (prefixlen > 0)
- {
- int len;
-
- /* copy bitmap [+ padding] [+ oid] from WAL record */
- len = xlhdr.t_hoff - SizeofHeapTupleHeader;
- memcpy(newp, recdata, len);
- recdata += len;
- newp += len;
-
- /* copy prefix from old tuple */
- memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
- newp += prefixlen;
-
- /* copy new tuple data from WAL record */
- len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
- memcpy(newp, recdata, len);
- recdata += len;
- newp += len;
- }
- else
- {
- /*
- * copy bitmap [+ padding] [+ oid] + data from record, all in one
- * go
- */
- memcpy(newp, recdata, tuplen);
- recdata += tuplen;
- newp += tuplen;
- }
- Assert(recdata == recdata_end);
-
- /* copy suffix from old tuple */
- if (suffixlen > 0)
- memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
-
- newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
- htup->t_infomask2 = xlhdr.t_infomask2;
- htup->t_infomask = xlhdr.t_infomask;
- htup->t_hoff = xlhdr.t_hoff;
-
- HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
- HeapTupleHeaderSetCmin(htup, FirstCommandId);
- HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
- /* Make sure there is no forward chain link in t_ctid */
- htup->t_ctid = newtid;
-
- offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
- if (offnum == InvalidOffsetNumber)
- elog(PANIC, "failed to add tuple");
-
- if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
- PageClearAllVisible(page);
-
- freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(nbuffer);
- }
-
- if (BufferIsValid(nbuffer) && nbuffer != obuffer)
- UnlockReleaseBuffer(nbuffer);
- if (BufferIsValid(obuffer))
- UnlockReleaseBuffer(obuffer);
-
- /*
- * If the new page is running low on free space, update the FSM as well.
- * Arbitrarily, our definition of "low" is less than 20%. We can't do much
- * better than that without knowing the fill-factor for the table.
- *
- * However, don't update the FSM on HOT updates, because after crash
- * recovery, either the old or the new tuple will certainly be dead and
- * prunable. After pruning, the page will have roughly as much free space
- * as it did before the update, assuming the new tuple is about the same
- * size as the old one.
- *
- * XXX: Don't do this if the page was restored from full page image. We
- * don't bother to update the FSM in that case, it doesn't need to be
- * totally accurate anyway.
- */
- if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
- XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
-}
-
-static void
-heap_xlog_confirm(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
- OffsetNumber offnum;
- ItemId lp = NULL;
- HeapTupleHeader htup;
-
- if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
- {
- page = BufferGetPage(buffer);
-
- offnum = xlrec->offnum;
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
-
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
- elog(PANIC, "invalid lp");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- /*
- * Confirm tuple as actually inserted
- */
- ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_lock(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
- OffsetNumber offnum;
- ItemId lp = NULL;
- HeapTupleHeader htup;
-
- /*
- * The visibility map may need to be fixed even if the heap page is
- * already up-to-date.
- */
- if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
- {
- RelFileLocator rlocator;
- Buffer vmbuffer = InvalidBuffer;
- BlockNumber block;
- Relation reln;
-
- XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
- reln = CreateFakeRelcacheEntry(rlocator);
-
- visibilitymap_pin(reln, block, &vmbuffer);
- visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
-
- ReleaseBuffer(vmbuffer);
- FreeFakeRelcacheEntry(reln);
- }
-
- if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
- {
- page = (Page) BufferGetPage(buffer);
-
- offnum = xlrec->offnum;
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
-
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
- elog(PANIC, "invalid lp");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
- htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
- fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
- &htup->t_infomask2);
-
- /*
- * Clear relevant update flags, but only if the modified infomask says
- * there's no update.
- */
- if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
- {
- HeapTupleHeaderClearHotUpdated(htup);
- /* Make sure there is no forward chain link in t_ctid */
- ItemPointerSet(&htup->t_ctid,
- BufferGetBlockNumber(buffer),
- offnum);
- }
- HeapTupleHeaderSetXmax(htup, xlrec->xmax);
- HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_lock_updated(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_lock_updated *xlrec;
- Buffer buffer;
- Page page;
- OffsetNumber offnum;
- ItemId lp = NULL;
- HeapTupleHeader htup;
-
- xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
-
- /*
- * The visibility map may need to be fixed even if the heap page is
- * already up-to-date.
- */
- if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
- {
- RelFileLocator rlocator;
- Buffer vmbuffer = InvalidBuffer;
- BlockNumber block;
- Relation reln;
-
- XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
- reln = CreateFakeRelcacheEntry(rlocator);
-
- visibilitymap_pin(reln, block, &vmbuffer);
- visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
-
- ReleaseBuffer(vmbuffer);
- FreeFakeRelcacheEntry(reln);
- }
-
- if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
- {
- page = BufferGetPage(buffer);
-
- offnum = xlrec->offnum;
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
-
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
- elog(PANIC, "invalid lp");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
- htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
- fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
- &htup->t_infomask2);
- HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-}
-
-static void
-heap_xlog_inplace(XLogReaderState *record)
-{
- XLogRecPtr lsn = record->EndRecPtr;
- xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
- Buffer buffer;
- Page page;
- OffsetNumber offnum;
- ItemId lp = NULL;
- HeapTupleHeader htup;
- uint32 oldlen;
- Size newlen;
-
- if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
- {
- char *newtup = XLogRecGetBlockData(record, 0, &newlen);
-
- page = BufferGetPage(buffer);
-
- offnum = xlrec->offnum;
- if (PageGetMaxOffsetNumber(page) >= offnum)
- lp = PageGetItemId(page, offnum);
-
- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
- elog(PANIC, "invalid lp");
-
- htup = (HeapTupleHeader) PageGetItem(page, lp);
-
- oldlen = ItemIdGetLength(lp) - htup->t_hoff;
- if (oldlen != newlen)
- elog(PANIC, "wrong tuple length");
-
- memcpy((char *) htup + htup->t_hoff, newtup, newlen);
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-}
-
-void
-heap_redo(XLogReaderState *record)
-{
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
- /*
- * These operations don't overwrite MVCC data so no conflict processing is
- * required. The ones in heap2 rmgr do.
- */
-
- switch (info & XLOG_HEAP_OPMASK)
- {
- case XLOG_HEAP_INSERT:
- heap_xlog_insert(record);
- break;
- case XLOG_HEAP_DELETE:
- heap_xlog_delete(record);
- break;
- case XLOG_HEAP_UPDATE:
- heap_xlog_update(record, false);
- break;
- case XLOG_HEAP_TRUNCATE:
-
- /*
- * TRUNCATE is a no-op because the actions are already logged as
- * SMGR WAL records. TRUNCATE WAL record only exists for logical
- * decoding.
- */
- break;
- case XLOG_HEAP_HOT_UPDATE:
- heap_xlog_update(record, true);
- break;
- case XLOG_HEAP_CONFIRM:
- heap_xlog_confirm(record);
- break;
- case XLOG_HEAP_LOCK:
- heap_xlog_lock(record);
- break;
- case XLOG_HEAP_INPLACE:
- heap_xlog_inplace(record);
- break;
- default:
- elog(PANIC, "heap_redo: unknown op code %u", info);
- }
-}
-
-void
-heap2_redo(XLogReaderState *record)
-{
- uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
-
- switch (info & XLOG_HEAP_OPMASK)
- {
- case XLOG_HEAP2_PRUNE_ON_ACCESS:
- case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
- case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
- heap_xlog_prune_freeze(record);
- break;
- case XLOG_HEAP2_VISIBLE:
- heap_xlog_visible(record);
- break;
- case XLOG_HEAP2_MULTI_INSERT:
- heap_xlog_multi_insert(record);
- break;
- case XLOG_HEAP2_LOCK_UPDATED:
- heap_xlog_lock_updated(record);
- break;
- case XLOG_HEAP2_NEW_CID:
-
- /*
- * Nothing to do on a real replay, only used during logical
- * decoding.
- */
- break;
- case XLOG_HEAP2_REWRITE:
- heap_xlog_logical_rewrite(record);
- break;
- default:
- elog(PANIC, "heap2_redo: unknown op code %u", info);
- }
-}
-
-/*
- * Mask a heap page before performing consistency checks on it.
- */
-void
-heap_mask(char *pagedata, BlockNumber blkno)
-{
- Page page = (Page) pagedata;
- OffsetNumber off;
-
- mask_page_lsn_and_checksum(page);
-
- mask_page_hint_bits(page);
- mask_unused_space(page);
-
- for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
- {
- ItemId iid = PageGetItemId(page, off);
- char *page_item;
-
- page_item = (char *) (page + ItemIdGetOffset(iid));
-
- if (ItemIdIsNormal(iid))
- {
- HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
-
- /*
- * If xmin of a tuple is not yet frozen, we should ignore
- * differences in hint bits, since they can be set without
- * emitting WAL.
- */
- if (!HeapTupleHeaderXminFrozen(page_htup))
- page_htup->t_infomask &= ~HEAP_XACT_MASK;
- else
- {
- /* Still we need to mask xmax hint bits. */
- page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
- page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
- }
-
- /*
- * During replay, we set Command Id to FirstCommandId. Hence, mask
- * it. See heap_xlog_insert() for details.
- */
- page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;
-
- /*
- * For a speculative tuple, heap_insert() does not set ctid in the
- * caller-passed heap tuple itself, leaving the ctid field to
- * contain a speculative token value - a per-backend monotonically
- * increasing identifier. Besides, it does not WAL-log ctid under
- * any circumstances.
- *
- * During redo, heap_xlog_insert() sets t_ctid to current block
- * number and self offset number. It doesn't care about any
- * speculative insertions on the primary. Hence, we set t_ctid to
- * current block number and self offset number to ignore any
- * inconsistency.
- */
- if (HeapTupleHeaderIsSpeculative(page_htup))
- ItemPointerSet(&page_htup->t_ctid, blkno, off);
-
- /*
- * NB: Not ignoring ctid changes due to the tuple having moved
- * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
- * important information that needs to be in-sync between primary
- * and standby, and thus is WAL logged.
- */
- }
-
- /*
- * Ignore any padding bytes after the tuple, when the length of the
- * item is not MAXALIGNed.
- */
- if (ItemIdHasStorage(iid))
- {
- int len = ItemIdGetLength(iid);
- int padlen = MAXALIGN(len) - len;
-
- if (padlen > 0)
- memset(page_item + len, MASK_MARKER, padlen);
- }
- }
-}
-
/*
* HeapCheckForSerializableConflictOut
* We are reading a tuple. If it's not visible, there may be a
--- /dev/null
+++ b/src/backend/access/heap/heapam_xlog.c
+/*-------------------------------------------------------------------------
+ *
+ * heapam_xlog.c
+ * WAL replay logic for heap access method.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/heap/heapam_xlog.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/bufmask.h"
+#include "access/heapam.h"
+#include "access/visibilitymap.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "storage/freespace.h"
+#include "storage/standby.h"
+
+
+/*
+ * Replay XLOG_HEAP2_PRUNE_* records.
+ */
+static void
+heap_xlog_prune_freeze(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ char *maindataptr = XLogRecGetData(record);
+ xl_heap_prune xlrec;
+ Buffer buffer;
+ RelFileLocator rlocator;
+ BlockNumber blkno;
+ XLogRedoAction action;
+
+ XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+ memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
+ maindataptr += SizeOfHeapPrune;
+
+ /*
+ * We will take an ordinary exclusive lock or a cleanup lock depending on
+ * whether the XLHP_CLEANUP_LOCK flag is set. With an ordinary exclusive
+ * lock, we better not be doing anything that requires moving existing
+ * tuple data.
+ */
+ Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
+ (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);
+
+ /*
+ * We are about to remove and/or freeze tuples. In Hot Standby mode,
+ * ensure that there are no queries running for which the removed tuples
+ * are still visible or which still consider the frozen xids as running.
+ * The conflict horizon XID comes after xl_heap_prune.
+ */
+ if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
+ {
+ TransactionId snapshot_conflict_horizon;
+
+ /* memcpy() because snapshot_conflict_horizon is stored unaligned */
+ memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
+ maindataptr += sizeof(TransactionId);
+
+ if (InHotStandby)
+ ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
+ (xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
+ rlocator);
+ }
+
+ /*
+ * If we have a full-page image, restore it and we're done.
+ */
+ action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
+ (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
+ &buffer);
+ if (action == BLK_NEEDS_REDO)
+ {
+ Page page = (Page) BufferGetPage(buffer);
+ OffsetNumber *redirected;
+ OffsetNumber *nowdead;
+ OffsetNumber *nowunused;
+ int nredirected;
+ int ndead;
+ int nunused;
+ int nplans;
+ Size datalen;
+ xlhp_freeze_plan *plans;
+ OffsetNumber *frz_offsets;
+ char *dataptr = XLogRecGetBlockData(record, 0, &datalen);
+
+ heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
+ &nplans, &plans, &frz_offsets,
+ &nredirected, &redirected,
+ &ndead, &nowdead,
+ &nunused, &nowunused);
+
+ /*
+ * Update all line pointers per the record, and repair fragmentation
+ * if needed.
+ */
+ if (nredirected > 0 || ndead > 0 || nunused > 0)
+ heap_page_prune_execute(buffer,
+ (xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
+ redirected, nredirected,
+ nowdead, ndead,
+ nowunused, nunused);
+
+ /* Freeze tuples */
+ for (int p = 0; p < nplans; p++)
+ {
+ HeapTupleFreeze frz;
+
+ /*
+ * Convert freeze plan representation from WAL record into
+ * per-tuple format used by heap_execute_freeze_tuple
+ */
+ frz.xmax = plans[p].xmax;
+ frz.t_infomask2 = plans[p].t_infomask2;
+ frz.t_infomask = plans[p].t_infomask;
+ frz.frzflags = plans[p].frzflags;
+ frz.offset = InvalidOffsetNumber; /* unused, but be tidy */
+
+ for (int i = 0; i < plans[p].ntuples; i++)
+ {
+ OffsetNumber offset = *(frz_offsets++);
+ ItemId lp;
+ HeapTupleHeader tuple;
+
+ lp = PageGetItemId(page, offset);
+ tuple = (HeapTupleHeader) PageGetItem(page, lp);
+ heap_execute_freeze_tuple(tuple, &frz);
+ }
+ }
+
+ /* There should be no more data */
+ Assert((char *) frz_offsets == dataptr + datalen);
+
+ /*
+ * Note: we don't worry about updating the page's prunability hints.
+ * At worst this will cause an extra prune cycle to occur soon.
+ */
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+
+ /*
+ * If we released any space or line pointers, update the free space map.
+ *
+ * Do this regardless of a full-page image being applied, since the FSM
+ * data is not in the page anyway.
+ */
+ if (BufferIsValid(buffer))
+ {
+ if (xlrec.flags & (XLHP_HAS_REDIRECTIONS |
+ XLHP_HAS_DEAD_ITEMS |
+ XLHP_HAS_NOW_UNUSED_ITEMS))
+ {
+ Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
+
+ UnlockReleaseBuffer(buffer);
+
+ XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+ }
+ else
+ UnlockReleaseBuffer(buffer);
+ }
+}
+
+/*
+ * Replay XLOG_HEAP2_VISIBLE records.
+ *
+ * The critical integrity requirement here is that we must never end up with
+ * a situation where the visibility map bit is set, and the page-level
+ * PD_ALL_VISIBLE bit is clear. If that were to occur, then a subsequent
+ * page modification would fail to clear the visibility map bit.
+ */
+static void
+heap_xlog_visible(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+ Buffer vmbuffer = InvalidBuffer;
+ Buffer buffer;
+ Page page;
+ RelFileLocator rlocator;
+ BlockNumber blkno;
+ XLogRedoAction action;
+
+ Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);
+
+ XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);
+
+ /*
+ * If there are any Hot Standby transactions running that have an xmin
+ * horizon old enough that this page isn't all-visible for them, they
+ * might incorrectly decide that an index-only scan can skip a heap fetch.
+ *
+ * NB: It might be better to throw some kind of "soft" conflict here that
+ * forces any index-only scan that is in flight to perform heap fetches,
+ * rather than killing the transaction outright.
+ */
+ if (InHotStandby)
+ ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
+ xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
+ rlocator);
+
+ /*
+	 * Read the heap page, if it still exists. If the heap file has been
+	 * dropped or truncated later in recovery, we don't need to update the
+	 * page, but we'd better still update the visibility map.
+ */
+ action = XLogReadBufferForRedo(record, 1, &buffer);
+ if (action == BLK_NEEDS_REDO)
+ {
+ /*
+ * We don't bump the LSN of the heap page when setting the visibility
+		 * map bit (unless checksums or wal_log_hints is enabled, in which
+ * case we must). This exposes us to torn page hazards, but since
+ * we're not inspecting the existing page contents in any way, we
+ * don't care.
+ */
+ page = BufferGetPage(buffer);
+
+ PageSetAllVisible(page);
+
+ if (XLogHintBitIsNeeded())
+ PageSetLSN(page, lsn);
+
+ MarkBufferDirty(buffer);
+ }
+ else if (action == BLK_RESTORED)
+ {
+ /*
+ * If heap block was backed up, we already restored it and there's
+ * nothing more to do. (This can only happen with checksums or
+ * wal_log_hints enabled.)
+ */
+ }
+
+ if (BufferIsValid(buffer))
+ {
+ Size space = PageGetFreeSpace(BufferGetPage(buffer));
+
+ UnlockReleaseBuffer(buffer);
+
+ /*
+ * Since FSM is not WAL-logged and only updated heuristically, it
+ * easily becomes stale in standbys. If the standby is later promoted
+ * and runs VACUUM, it will skip updating individual free space
+ * figures for pages that became all-visible (or all-frozen, depending
+		 * on the vacuum mode), which is troublesome when FreeSpaceMapVacuum
+		 * propagates overly optimistic free space values to upper FSM layers;
+ * later inserters try to use such pages only to find out that they
+ * are unusable. This can cause long stalls when there are many such
+ * pages.
+ *
+ * Forestall those problems by updating FSM's idea about a page that
+ * is becoming all-visible or all-frozen.
+ *
+ * Do this regardless of a full-page image being applied, since the
+ * FSM data is not in the page anyway.
+ */
+ if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
+ XLogRecordPageWithFreeSpace(rlocator, blkno, space);
+ }
+
+ /*
+ * Even if we skipped the heap page update due to the LSN interlock, it's
+ * still safe to update the visibility map. Any WAL record that clears
+ * the visibility map bit does so before checking the page LSN, so any
+ * bits that need to be cleared will still be cleared.
+ */
+ if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
+ &vmbuffer) == BLK_NEEDS_REDO)
+ {
+ Page vmpage = BufferGetPage(vmbuffer);
+ Relation reln;
+ uint8 vmbits;
+
+ /* initialize the page if it was read as zeros */
+ if (PageIsNew(vmpage))
+ PageInit(vmpage, BLCKSZ, 0);
+
+ /* remove VISIBILITYMAP_XLOG_* */
+ vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;
+
+ /*
+ * XLogReadBufferForRedoExtended locked the buffer. But
+ * visibilitymap_set will handle locking itself.
+ */
+ LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);
+
+ reln = CreateFakeRelcacheEntry(rlocator);
+ visibilitymap_pin(reln, blkno, &vmbuffer);
+
+ visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
+ xlrec->snapshotConflictHorizon, vmbits);
+
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+ else if (BufferIsValid(vmbuffer))
+ UnlockReleaseBuffer(vmbuffer);
+}
+
+/*
+ * Given an "infobits" field from an XLog record, set the correct bits in the
+ * given infomask and infomask2 for the tuple touched by the record.
+ *
+ * (This is the reverse of compute_infobits).
+ */
+static void
+fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
+{
+ *infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
+ HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
+ *infomask2 &= ~HEAP_KEYS_UPDATED;
+
+ if (infobits & XLHL_XMAX_IS_MULTI)
+ *infomask |= HEAP_XMAX_IS_MULTI;
+ if (infobits & XLHL_XMAX_LOCK_ONLY)
+ *infomask |= HEAP_XMAX_LOCK_ONLY;
+ if (infobits & XLHL_XMAX_EXCL_LOCK)
+ *infomask |= HEAP_XMAX_EXCL_LOCK;
+ /* note HEAP_XMAX_SHR_LOCK isn't considered here */
+ if (infobits & XLHL_XMAX_KEYSHR_LOCK)
+ *infomask |= HEAP_XMAX_KEYSHR_LOCK;
+
+ if (infobits & XLHL_KEYS_UPDATED)
+ *infomask2 |= HEAP_KEYS_UPDATED;
+}
+
+/*
+ * Replay XLOG_HEAP_DELETE records.
+ */
+static void
+heap_xlog_delete(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+ BlockNumber blkno;
+ RelFileLocator target_locator;
+ ItemPointerData target_tid;
+
+ XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
+ ItemPointerSetBlockNumber(&target_tid, blkno);
+ ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
+ {
+ Relation reln = CreateFakeRelcacheEntry(target_locator);
+ Buffer vmbuffer = InvalidBuffer;
+
+ visibilitymap_pin(reln, blkno, &vmbuffer);
+ visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(buffer);
+
+ if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
+ lp = PageGetItemId(page, xlrec->offnum);
+
+ if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
+ elog(PANIC, "invalid lp");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+ htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+ HeapTupleHeaderClearHotUpdated(htup);
+ fix_infomask_from_infobits(xlrec->infobits_set,
+ &htup->t_infomask, &htup->t_infomask2);
+ if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
+ HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+ else
+ HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
+ HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+
+ /* Mark the page as a candidate for pruning */
+ PageSetPrunable(page, XLogRecGetXid(record));
+
+ if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
+ PageClearAllVisible(page);
+
+ /* Make sure t_ctid is set correctly */
+ if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
+ HeapTupleHeaderSetMovedPartitions(htup);
+ else
+ htup->t_ctid = target_tid;
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_HEAP_INSERT records.
+ */
+static void
+heap_xlog_insert(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+ union
+ {
+ HeapTupleHeaderData hdr;
+ char data[MaxHeapTupleSize];
+ } tbuf;
+ HeapTupleHeader htup;
+ xl_heap_header xlhdr;
+ uint32 newlen;
+ Size freespace = 0;
+ RelFileLocator target_locator;
+ BlockNumber blkno;
+ ItemPointerData target_tid;
+ XLogRedoAction action;
+
+ XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
+ ItemPointerSetBlockNumber(&target_tid, blkno);
+ ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);
+
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+ {
+ Relation reln = CreateFakeRelcacheEntry(target_locator);
+ Buffer vmbuffer = InvalidBuffer;
+
+ visibilitymap_pin(reln, blkno, &vmbuffer);
+ visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
+ /*
+ * If we inserted the first and only tuple on the page, re-initialize the
+ * page from scratch.
+ */
+ if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
+ {
+ buffer = XLogInitBufferForRedo(record, 0);
+ page = BufferGetPage(buffer);
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ action = BLK_NEEDS_REDO;
+ }
+ else
+ action = XLogReadBufferForRedo(record, 0, &buffer);
+ if (action == BLK_NEEDS_REDO)
+ {
+ Size datalen;
+ char *data;
+
+ page = BufferGetPage(buffer);
+
+ if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
+ elog(PANIC, "invalid max offset number");
+
+ data = XLogRecGetBlockData(record, 0, &datalen);
+
+ newlen = datalen - SizeOfHeapHeader;
+ Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
+ memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
+ data += SizeOfHeapHeader;
+
+ htup = &tbuf.hdr;
+ MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+ /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+ memcpy((char *) htup + SizeofHeapTupleHeader,
+ data,
+ newlen);
+ newlen += SizeofHeapTupleHeader;
+ htup->t_infomask2 = xlhdr.t_infomask2;
+ htup->t_infomask = xlhdr.t_infomask;
+ htup->t_hoff = xlhdr.t_hoff;
+ HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+ HeapTupleHeaderSetCmin(htup, FirstCommandId);
+ htup->t_ctid = target_tid;
+
+ if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
+ true, true) == InvalidOffsetNumber)
+ elog(PANIC, "failed to add tuple");
+
+ freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+ PageSetLSN(page, lsn);
+
+ if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+ PageClearAllVisible(page);
+
+ /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
+ if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
+ PageSetAllVisible(page);
+
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ /*
+ * If the page is running low on free space, update the FSM as well.
+ * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+ * better than that without knowing the fill-factor for the table.
+ *
+ * XXX: Don't do this if the page was restored from full page image. We
+ * don't bother to update the FSM in that case, it doesn't need to be
+ * totally accurate anyway.
+ */
+ if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
+ XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
+}
+
+/*
+ * Replay XLOG_HEAP2_MULTI_INSERT records.
+ */
+static void
+heap_xlog_multi_insert(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_multi_insert *xlrec;
+ RelFileLocator rlocator;
+ BlockNumber blkno;
+ Buffer buffer;
+ Page page;
+ union
+ {
+ HeapTupleHeaderData hdr;
+ char data[MaxHeapTupleSize];
+ } tbuf;
+ HeapTupleHeader htup;
+ uint32 newlen;
+ Size freespace = 0;
+ int i;
+ bool isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
+ XLogRedoAction action;
+
+ /*
+ * Insertion doesn't overwrite MVCC data, so no conflict processing is
+ * required.
+ */
+ xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);
+
+ XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
+
+ /* check that the mutually exclusive flags are not both set */
+ Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
+ (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
+
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+ {
+ Relation reln = CreateFakeRelcacheEntry(rlocator);
+ Buffer vmbuffer = InvalidBuffer;
+
+ visibilitymap_pin(reln, blkno, &vmbuffer);
+ visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
+ if (isinit)
+ {
+ buffer = XLogInitBufferForRedo(record, 0);
+ page = BufferGetPage(buffer);
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ action = BLK_NEEDS_REDO;
+ }
+ else
+ action = XLogReadBufferForRedo(record, 0, &buffer);
+ if (action == BLK_NEEDS_REDO)
+ {
+ char *tupdata;
+ char *endptr;
+ Size len;
+
+ /* Tuples are stored as block data */
+ tupdata = XLogRecGetBlockData(record, 0, &len);
+ endptr = tupdata + len;
+
+ page = (Page) BufferGetPage(buffer);
+
+ for (i = 0; i < xlrec->ntuples; i++)
+ {
+ OffsetNumber offnum;
+ xl_multi_insert_tuple *xlhdr;
+
+ /*
+ * If we're reinitializing the page, the tuples are stored in
+ * order from FirstOffsetNumber. Otherwise there's an array of
+ * offsets in the WAL record, and the tuples come after that.
+ */
+ if (isinit)
+ offnum = FirstOffsetNumber + i;
+ else
+ offnum = xlrec->offsets[i];
+ if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ elog(PANIC, "invalid max offset number");
+
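+ /*
+ * The xl_multi_insert_tuple headers in the block data are only
+ * SHORTALIGN'd, so realign the pointer before reading each one.
+ */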
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
+ tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+
+ newlen = xlhdr->datalen;
+ Assert(newlen <= MaxHeapTupleSize);
+ htup = &tbuf.hdr;
+ MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+ /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+ memcpy((char *) htup + SizeofHeapTupleHeader,
+ (char *) tupdata,
+ newlen);
+ tupdata += newlen;
+
+ newlen += SizeofHeapTupleHeader;
+ htup->t_infomask2 = xlhdr->t_infomask2;
+ htup->t_infomask = xlhdr->t_infomask;
+ htup->t_hoff = xlhdr->t_hoff;
+ HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+ HeapTupleHeaderSetCmin(htup, FirstCommandId);
+ ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+ ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+ offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+ if (offnum == InvalidOffsetNumber)
+ elog(PANIC, "failed to add tuple");
+ }
+ if (tupdata != endptr)
+ elog(PANIC, "total tuple length mismatch");
+
+ freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+ PageSetLSN(page, lsn);
+
+ if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
+ PageClearAllVisible(page);
+
+ /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
+ if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
+ PageSetAllVisible(page);
+
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+
+ /*
+ * If the page is running low on free space, update the FSM as well.
+ * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+ * better than that without knowing the fill-factor for the table.
+ *
+ * XXX: Don't do this if the page was restored from a full page image. We
+ * don't bother to update the FSM in that case; it doesn't need to be
+ * totally accurate anyway.
+ */
+ if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
+ XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
+}
+
+/*
+ * Replay XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE records.
+ */
+static void
+heap_xlog_update(XLogReaderState *record, bool hot_update)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
+ RelFileLocator rlocator;
+ BlockNumber oldblk;
+ BlockNumber newblk;
+ ItemPointerData newtid;
+ Buffer obuffer,
+ nbuffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleData oldtup;
+ HeapTupleHeader htup;
+ uint16 prefixlen = 0,
+ suffixlen = 0;
+ char *newp;
+ union
+ {
+ HeapTupleHeaderData hdr;
+ char data[MaxHeapTupleSize];
+ } tbuf;
+ xl_heap_header xlhdr;
+ uint32 newlen;
+ Size freespace = 0;
+ XLogRedoAction oldaction;
+ XLogRedoAction newaction;
+
+ /* initialize to keep the compiler quiet */
+ oldtup.t_data = NULL;
+ oldtup.t_len = 0;
+
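+ /*
+ * Block reference 0 is the page receiving the new tuple; block reference
+ * 1 is present only when the old tuple lives on a different page, which
+ * can never be the case for a HOT update.
+ */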
+ XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
+ if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
+ {
+ /* HOT updates are never done across pages */
+ Assert(!hot_update);
+ }
+ else
+ oldblk = newblk;
+
+ ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
+
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
+ {
+ Relation reln = CreateFakeRelcacheEntry(rlocator);
+ Buffer vmbuffer = InvalidBuffer;
+
+ visibilitymap_pin(reln, oldblk, &vmbuffer);
+ visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
+ /*
+ * In normal operation, it is important to lock the two pages in
+ * page-number order, to avoid possible deadlocks against other update
+ * operations going the other way. However, during WAL replay there can
+ * be no other update happening, so we don't need to worry about that. But
+ * we *do* need to worry that we don't expose an inconsistent state to Hot
+ * Standby queries --- so the original page can't be unlocked before we've
+ * added the new tuple to the new page.
+ */
+
+ /* Deal with old tuple version */
+ oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
+ &obuffer);
+ if (oldaction == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(obuffer);
+ offnum = xlrec->old_offnum;
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ elog(PANIC, "invalid lp");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ oldtup.t_data = htup;
+ oldtup.t_len = ItemIdGetLength(lp);
+
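+ /*
+ * Replay the update's effects on the old tuple: clear any stale
+ * xmax-related bits, then apply the infomask bits and xmax recorded in
+ * the WAL.
+ */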
+ htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+ htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+ if (hot_update)
+ HeapTupleHeaderSetHotUpdated(htup);
+ else
+ HeapTupleHeaderClearHotUpdated(htup);
+ fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
+ &htup->t_infomask2);
+ HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
+ HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+ /* Set forward chain link in t_ctid */
+ htup->t_ctid = newtid;
+
+ /* Mark the page as a candidate for pruning */
+ PageSetPrunable(page, XLogRecGetXid(record));
+
+ if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
+ PageClearAllVisible(page);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(obuffer);
+ }
+
+ /*
+ * Read the page the new tuple goes into, if different from old.
+ */
+ if (oldblk == newblk)
+ {
+ nbuffer = obuffer;
+ newaction = oldaction;
+ }
+ else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
+ {
+ nbuffer = XLogInitBufferForRedo(record, 0);
+ page = (Page) BufferGetPage(nbuffer);
+ PageInit(page, BufferGetPageSize(nbuffer), 0);
+ newaction = BLK_NEEDS_REDO;
+ }
+ else
+ newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
+
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
+ {
+ Relation reln = CreateFakeRelcacheEntry(rlocator);
+ Buffer vmbuffer = InvalidBuffer;
+
+ visibilitymap_pin(reln, newblk, &vmbuffer);
+ visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
+ /* Deal with new tuple */
+ if (newaction == BLK_NEEDS_REDO)
+ {
+ char *recdata;
+ char *recdata_end;
+ Size datalen;
+ Size tuplen;
+
+ recdata = XLogRecGetBlockData(record, 0, &datalen);
+ recdata_end = recdata + datalen;
+
+ page = BufferGetPage(nbuffer);
+
+ offnum = xlrec->new_offnum;
+ if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+ elog(PANIC, "invalid max offset number");
+
+ if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
+ {
+ Assert(newblk == oldblk);
+ memcpy(&prefixlen, recdata, sizeof(uint16));
+ recdata += sizeof(uint16);
+ }
+ if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
+ {
+ Assert(newblk == oldblk);
+ memcpy(&suffixlen, recdata, sizeof(uint16));
+ recdata += sizeof(uint16);
+ }
+
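+ /* The xl_heap_header and new tuple data follow any prefix/suffix lengths */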
+ memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
+ recdata += SizeOfHeapHeader;
+
+ tuplen = recdata_end - recdata;
+ Assert(tuplen <= MaxHeapTupleSize);
+
+ htup = &tbuf.hdr;
+ MemSet((char *) htup, 0, SizeofHeapTupleHeader);
+
+ /*
+ * Reconstruct the new tuple using the prefix and/or suffix from the
+ * old tuple, and the data stored in the WAL record.
+ */
+ newp = (char *) htup + SizeofHeapTupleHeader;
+ if (prefixlen > 0)
+ {
+ int len;
+
+ /* copy bitmap [+ padding] [+ oid] from WAL record */
+ len = xlhdr.t_hoff - SizeofHeapTupleHeader;
+ memcpy(newp, recdata, len);
+ recdata += len;
+ newp += len;
+
+ /* copy prefix from old tuple */
+ memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
+ newp += prefixlen;
+
+ /* copy new tuple data from WAL record */
+ len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
+ memcpy(newp, recdata, len);
+ recdata += len;
+ newp += len;
+ }
+ else
+ {
+ /*
+ * copy bitmap [+ padding] [+ oid] + data from record, all in one
+ * go
+ */
+ memcpy(newp, recdata, tuplen);
+ recdata += tuplen;
+ newp += tuplen;
+ }
+ Assert(recdata == recdata_end);
+
+ /* copy suffix from old tuple */
+ if (suffixlen > 0)
+ memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
+
+ newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
+ htup->t_infomask2 = xlhdr.t_infomask2;
+ htup->t_infomask = xlhdr.t_infomask;
+ htup->t_hoff = xlhdr.t_hoff;
+
+ HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
+ HeapTupleHeaderSetCmin(htup, FirstCommandId);
+ HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
+ /* Make sure there is no forward chain link in t_ctid */
+ htup->t_ctid = newtid;
+
+ offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+ if (offnum == InvalidOffsetNumber)
+ elog(PANIC, "failed to add tuple");
+
+ if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
+ PageClearAllVisible(page);
+
+ freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(nbuffer);
+ }
+
+ if (BufferIsValid(nbuffer) && nbuffer != obuffer)
+ UnlockReleaseBuffer(nbuffer);
+ if (BufferIsValid(obuffer))
+ UnlockReleaseBuffer(obuffer);
+
+ /*
+ * If the new page is running low on free space, update the FSM as well.
+ * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+ * better than that without knowing the fill-factor for the table.
+ *
+ * However, don't update the FSM on HOT updates, because after crash
+ * recovery, either the old or the new tuple will certainly be dead and
+ * prunable. After pruning, the page will have roughly as much free space
+ * as it did before the update, assuming the new tuple is about the same
+ * size as the old one.
+ *
+ * XXX: Don't do this if the page was restored from a full page image. We
+ * don't bother to update the FSM in that case; it doesn't need to be
+ * totally accurate anyway.
+ */
+ if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
+ XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
+}
+
+/*
+ * Replay XLOG_HEAP_CONFIRM records.
+ */
+static void
+heap_xlog_confirm(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+
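+ /*
+ * A confirm record is emitted when a speculative insertion succeeds; all
+ * that remains to be done here is to replace the speculative token in
+ * t_ctid with the tuple's real TID.
+ */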
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(buffer);
+
+ offnum = xlrec->offnum;
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ elog(PANIC, "invalid lp");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ /*
+ * Confirm the tuple as actually inserted
+ */
+ ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_HEAP_LOCK records.
+ */
+static void
+heap_xlog_lock(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
+ {
+ RelFileLocator rlocator;
+ Buffer vmbuffer = InvalidBuffer;
+ BlockNumber block;
+ Relation reln;
+
+ XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
+ reln = CreateFakeRelcacheEntry(rlocator);
+
+ visibilitymap_pin(reln, block, &vmbuffer);
+ visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
+
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ page = (Page) BufferGetPage(buffer);
+
+ offnum = xlrec->offnum;
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ elog(PANIC, "invalid lp");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
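+ /*
+ * Replay the lock: clear stale xmax-related bits, then apply the
+ * lock-mode infomask bits, xmax, and cmax from the WAL record.
+ */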
+ htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+ htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+ fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+ &htup->t_infomask2);
+
+ /*
+ * Clear relevant update flags, but only if the modified infomask says
+ * there's no update.
+ */
+ if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
+ {
+ HeapTupleHeaderClearHotUpdated(htup);
+ /* Make sure there is no forward chain link in t_ctid */
+ ItemPointerSet(&htup->t_ctid,
+ BufferGetBlockNumber(buffer),
+ offnum);
+ }
+ HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+ HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_HEAP2_LOCK_UPDATED records.
+ */
+static void
+heap_xlog_lock_updated(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_lock_updated *xlrec;
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+
+ xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
+
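+ /*
+ * These records are logged when a tuple lock is propagated to updated
+ * versions of a row (see heap_lock_updated_tuple()); replay mirrors
+ * heap_xlog_lock().
+ */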
+ /*
+ * The visibility map may need to be fixed even if the heap page is
+ * already up-to-date.
+ */
+ if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
+ {
+ RelFileLocator rlocator;
+ Buffer vmbuffer = InvalidBuffer;
+ BlockNumber block;
+ Relation reln;
+
+ XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
+ reln = CreateFakeRelcacheEntry(rlocator);
+
+ visibilitymap_pin(reln, block, &vmbuffer);
+ visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);
+
+ ReleaseBuffer(vmbuffer);
+ FreeFakeRelcacheEntry(reln);
+ }
+
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ page = BufferGetPage(buffer);
+
+ offnum = xlrec->offnum;
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ elog(PANIC, "invalid lp");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+ htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+ fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+ &htup->t_infomask2);
+ HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+/*
+ * Replay XLOG_HEAP_INPLACE records.
+ */
+static void
+heap_xlog_inplace(XLogReaderState *record)
+{
+ XLogRecPtr lsn = record->EndRecPtr;
+ xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+ uint32 oldlen;
+ Size newlen;
+
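+ /*
+ * Inplace updates (used for certain system catalog changes) overwrite
+ * the tuple's data in place; the new data must have exactly the same
+ * length as the old.
+ */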
+ if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
+ {
+ char *newtup = XLogRecGetBlockData(record, 0, &newlen);
+
+ page = BufferGetPage(buffer);
+
+ offnum = xlrec->offnum;
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+ elog(PANIC, "invalid lp");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+ if (oldlen != newlen)
+ elog(PANIC, "wrong tuple length");
+
+ memcpy((char *) htup + htup->t_hoff, newtup, newlen);
+
+ PageSetLSN(page, lsn);
+ MarkBufferDirty(buffer);
+ }
+ if (BufferIsValid(buffer))
+ UnlockReleaseBuffer(buffer);
+}
+
+void
+heap_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ /*
+ * These operations don't overwrite MVCC data, so no conflict processing is
+ * required. The ones in the heap2 rmgr do.
+ */
+
+ switch (info & XLOG_HEAP_OPMASK)
+ {
+ case XLOG_HEAP_INSERT:
+ heap_xlog_insert(record);
+ break;
+ case XLOG_HEAP_DELETE:
+ heap_xlog_delete(record);
+ break;
+ case XLOG_HEAP_UPDATE:
+ heap_xlog_update(record, false);
+ break;
+ case XLOG_HEAP_TRUNCATE:
+
+ /*
+ * TRUNCATE is a no-op because the actions are already logged as
+ * SMGR WAL records. The TRUNCATE WAL record exists only for logical
+ * decoding.
+ */
+ break;
+ case XLOG_HEAP_HOT_UPDATE:
+ heap_xlog_update(record, true);
+ break;
+ case XLOG_HEAP_CONFIRM:
+ heap_xlog_confirm(record);
+ break;
+ case XLOG_HEAP_LOCK:
+ heap_xlog_lock(record);
+ break;
+ case XLOG_HEAP_INPLACE:
+ heap_xlog_inplace(record);
+ break;
+ default:
+ elog(PANIC, "heap_redo: unknown op code %u", info);
+ }
+}
+
+void
+heap2_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ switch (info & XLOG_HEAP_OPMASK)
+ {
+ case XLOG_HEAP2_PRUNE_ON_ACCESS:
+ case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
+ case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
+ heap_xlog_prune_freeze(record);
+ break;
+ case XLOG_HEAP2_VISIBLE:
+ heap_xlog_visible(record);
+ break;
+ case XLOG_HEAP2_MULTI_INSERT:
+ heap_xlog_multi_insert(record);
+ break;
+ case XLOG_HEAP2_LOCK_UPDATED:
+ heap_xlog_lock_updated(record);
+ break;
+ case XLOG_HEAP2_NEW_CID:
+
+ /*
+ * Nothing to do on a real replay; this record type is only used
+ * during logical decoding.
+ */
+ break;
+ case XLOG_HEAP2_REWRITE:
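+
+ /*
+ * Applies to the logical rewrite mapping files used by logical
+ * decoding across table rewrites; no heap pages are touched.
+ */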
+ heap_xlog_logical_rewrite(record);
+ break;
+ default:
+ elog(PANIC, "heap2_redo: unknown op code %u", info);
+ }
+}
+
+/*
+ * Mask a heap page before performing consistency checks on it.
+ */
+void
+heap_mask(char *pagedata, BlockNumber blkno)
+{
+ Page page = (Page) pagedata;
+ OffsetNumber off;
+
+ mask_page_lsn_and_checksum(page);
+
+ mask_page_hint_bits(page);
+ mask_unused_space(page);
+
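+ /*
+ * Now mask per-tuple state that can legitimately differ between the
+ * primary and a standby.
+ */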
+ for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
+ {
+ ItemId iid = PageGetItemId(page, off);
+ char *page_item;
+
+ page_item = (char *) (page + ItemIdGetOffset(iid));
+
+ if (ItemIdIsNormal(iid))
+ {
+ HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
+
+ /*
+ * If xmin of a tuple is not yet frozen, we should ignore
+ * differences in hint bits, since they can be set without
+ * emitting WAL.
+ */
+ if (!HeapTupleHeaderXminFrozen(page_htup))
+ page_htup->t_infomask &= ~HEAP_XACT_MASK;
+ else
+ {
+ /* We still need to mask the xmax hint bits. */
+ page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
+ page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
+ }
+
+ /*
+ * During replay, we set Command Id to FirstCommandId. Hence, mask
+ * it. See heap_xlog_insert() for details.
+ */
+ page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;
+
+ /*
+ * For a speculative tuple, heap_insert() does not set ctid in the
+ * caller-passed heap tuple itself, leaving the ctid field to
+ * contain a speculative token value - a per-backend monotonically
+ * increasing identifier. Besides, it does not WAL-log ctid under
+ * any circumstances.
+ *
+ * During redo, heap_xlog_insert() sets t_ctid to current block
+ * number and self offset number. It doesn't care about any
+ * speculative insertions on the primary. Hence, we set t_ctid to
+ * current block number and self offset number to ignore any
+ * inconsistency.
+ */
+ if (HeapTupleHeaderIsSpeculative(page_htup))
+ ItemPointerSet(&page_htup->t_ctid, blkno, off);
+
+ /*
+ * NB: Not ignoring ctid changes due to the tuple having moved
+ * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
+ * important information that needs to be in-sync between primary
+ * and standby, and thus is WAL logged.
+ */
+ }
+
+ /*
+ * Ignore any padding bytes after the tuple when the length of the
+ * item is not MAXALIGNed.
+ */
+ if (ItemIdHasStorage(iid))
+ {
+ int len = ItemIdGetLength(iid);
+ int padlen = MAXALIGN(len) - len;
+
+ if (padlen > 0)
+ memset(page_item + len, MASK_MARKER, padlen);
+ }
+ }
+}