diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 2 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 10 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 26 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 1 | ||||
-rw-r--r-- | src/include/replication/slot.h | 7 | ||||
-rw-r--r-- | src/include/replication/snapbuild.h | 4 |
7 files changed, 44 insertions, 9 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 657cb4af1e3..5f596135b15 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (two_phase) { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + SnapBuildInitialConsistentPoint(ctx->snapshot_builder), commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); } @@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, abort_time, origin_id, origin_lsn, + InvalidXLogRecPtr, parsed->twophase_gid, false); } else diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index baeb45ff43c..3f6d723d096 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot); + need_full_snapshot, slot->data.initial_consistent_point); ctx->reorder->private_data = ctx; @@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; + slot->data.initial_consistent_point = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c3b963211e8..91600ac5667 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + XLogRecPtr initial_consistent_point, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit) { @@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, /* * It is possible that this transaction is not decoded at prepare time * either because by that time we didn't have a consistent snapshot or it - * was decoded earlier but we have restarted. We can't distinguish between - * those two cases so we send the prepare in both the cases and let - * downstream decide whether to process or skip it. We don't need to - * decode the xact for aborts if it is not done already. + * was decoded earlier but we have restarted. We only need to send the + * prepare if it was not decoded earlier. We don't need to decode the xact + * for aborts if it is not done already. */ - if (!rbtxn_prepared(txn) && is_commit) + if ((txn->final_lsn < initial_consistent_point) && is_commit) { txn->txn_flags |= RBTXN_PREPARE; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index e11788795f1..ed3acadab7b 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -165,6 +165,17 @@ struct SnapBuild XLogRecPtr start_decoding_at; /* + * LSN at which we found a consistent point at the time of slot creation. + * This is also the point where we have exported a snapshot for the + * initial copy. + * + * The prepared transactions that are not covered by initial snapshot + * needs to be sent later along with commit prepared and they must be + * before this point. + */ + XLogRecPtr initial_consistent_point; + + /* * Don't start decoding WAL until the "xl_running_xacts" information * indicates there are no running xids with an xid smaller than this. */ @@ -269,7 +280,8 @@ SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, - bool need_full_snapshot) + bool need_full_snapshot, + XLogRecPtr initial_consistent_point) { MemoryContext context; MemoryContext oldcontext; @@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; + builder->initial_consistent_point = initial_consistent_point; MemoryContextSwitchTo(oldcontext); @@ -357,6 +370,15 @@ SnapBuildCurrentState(SnapBuild *builder) } /* + * Return the LSN at which the snapshot was exported + */ +XLogRecPtr +SnapBuildInitialConsistentPoint(SnapBuild *builder) +{ + return builder->initial_consistent_point; +} + +/* * Should the contents of transaction ending at 'ptr' be decoded? */ bool @@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 3 +#define SNAPBUILD_VERSION 4 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bab31bf7af7..565a961d6ab 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -643,6 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + XLogRecPtr initial_consistent_point, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 38a9a0b3fc4..5c3fde20c69 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData */ XLogRecPtr confirmed_flush; + /* + * LSN at which we found a consistent point at the time of slot creation. + * This is also the point where we have exported a snapshot for the + * initial copy. + */ + XLogRecPtr initial_consistent_point; + /* plugin name */ NameData plugin; } ReplicationSlotPersistentData; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index d9f187a58ec..fbabce6764d 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, TransactionId xmin_horizon, XLogRecPtr start_lsn, - bool need_full_snapshot); + bool need_full_snapshot, + XLogRecPtr initial_consistent_point); extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap); @@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); +extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, |