summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c38
1 files changed, 25 insertions, 13 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 297eb11b5a8..9c2f1f17c7f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -362,20 +362,24 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding changes.
+ * point in decoding data changes. However, it's crucial to build the base
+ * snapshot during fast-forward mode (as is done in
+ * SnapBuildProcessChange()) because we require the snapshot's xmin when
+ * determining the candidate catalog_xmin for the replication slot. See
+ * SnapBuildProcessRunningXacts().
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
{
case XLOG_HEAP2_MULTI_INSERT:
- if (!ctx->fast_forward &&
- SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeMultiInsert(ctx, buf);
break;
case XLOG_HEAP2_NEW_CID:
+ if (!ctx->fast_forward)
{
xl_heap_new_cid *xlrec;
@@ -422,16 +426,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding data changes.
+ * point in decoding data changes. However, it's crucial to build the base
+ * snapshot during fast-forward mode (as is done in
+ * SnapBuildProcessChange()) because we require the snapshot's xmin when
+ * determining the candidate catalog_xmin for the replication slot. See
+ * SnapBuildProcessRunningXacts().
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
{
case XLOG_HEAP_INSERT:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeInsert(ctx, buf);
break;
@@ -442,17 +450,20 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
case XLOG_HEAP_HOT_UPDATE:
case XLOG_HEAP_UPDATE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeUpdate(ctx, buf);
break;
case XLOG_HEAP_DELETE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeDelete(ctx, buf);
break;
case XLOG_HEAP_TRUNCATE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeTruncate(ctx, buf);
break;
@@ -480,7 +491,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_HEAP_CONFIRM:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeSpecConfirm(ctx, buf);
break;