summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c2
-rw-r--r--src/backend/replication/logical/logical.c3
-rw-r--r--src/backend/replication/logical/reorderbuffer.c10
-rw-r--r--src/backend/replication/logical/snapbuild.c26
-rw-r--r--src/include/replication/reorderbuffer.h1
-rw-r--r--src/include/replication/slot.h7
-rw-r--r--src/include/replication/snapbuild.h4
7 files changed, 44 insertions, 9 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 657cb4af1e3..5f596135b15 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (two_phase)
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true);
}
@@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
abort_time, origin_id, origin_lsn,
+ InvalidXLogRecPtr,
parsed->twophase_gid, false);
}
else
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index baeb45ff43c..3f6d723d096 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot);
+ need_full_snapshot, slot->data.initial_consistent_point);
ctx->reorder->private_data = ctx;
@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c3b963211e8..91600ac5667 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr initial_consistent_point,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
/*
* It is possible that this transaction is not decoded at prepare time
* either because by that time we didn't have a consistent snapshot or it
- * was decoded earlier but we have restarted. We can't distinguish between
- * those two cases so we send the prepare in both the cases and let
- * downstream decide whether to process or skip it. We don't need to
- * decode the xact for aborts if it is not done already.
+ * was decoded earlier but we have restarted. We only need to send the
+ * prepare if it was not decoded earlier. We don't need to decode the xact
+ * for aborts if it is not done already.
*/
- if (!rbtxn_prepared(txn) && is_commit)
+ if ((txn->final_lsn < initial_consistent_point) && is_commit)
{
txn->txn_flags |= RBTXN_PREPARE;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e11788795f1..ed3acadab7b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,6 +165,17 @@ struct SnapBuild
XLogRecPtr start_decoding_at;
/*
+ * LSN at which we found a consistent point at the time of slot creation.
+ * This is also the point where we have exported a snapshot for the
+ * initial copy.
+ *
+ * The prepared transactions that are not covered by initial snapshot
+ * needs to be sent later along with commit prepared and they must be
+ * before this point.
+ */
+ XLogRecPtr initial_consistent_point;
+
+ /*
* Don't start decoding WAL until the "xl_running_xacts" information
* indicates there are no running xids with an xid smaller than this.
*/
@@ -269,7 +280,8 @@ SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
- bool need_full_snapshot)
+ bool need_full_snapshot,
+ XLogRecPtr initial_consistent_point)
{
MemoryContext context;
MemoryContext oldcontext;
@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
+ builder->initial_consistent_point = initial_consistent_point;
MemoryContextSwitchTo(oldcontext);
@@ -357,6 +370,15 @@ SnapBuildCurrentState(SnapBuild *builder)
}
/*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildInitialConsistentPoint(SnapBuild *builder)
+{
+ return builder->initial_consistent_point;
+}
+
+/*
* Should the contents of transaction ending at 'ptr' be decoded?
*/
bool
@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 3
+#define SNAPBUILD_VERSION 4
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bab31bf7af7..565a961d6ab 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -643,6 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr initial_consistent_point,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 38a9a0b3fc4..5c3fde20c69 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
*/
XLogRecPtr confirmed_flush;
+ /*
+ * LSN at which we found a consistent point at the time of slot creation.
+ * This is also the point where we have exported a snapshot for the
+ * initial copy.
+ */
+ XLogRecPtr initial_consistent_point;
+
/* plugin name */
NameData plugin;
} ReplicationSlotPersistentData;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d9f187a58ec..fbabce6764d 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
TransactionId xmin_horizon, XLogRecPtr start_lsn,
- bool need_full_snapshot);
+ bool need_full_snapshot,
+ XLogRecPtr initial_consistent_point);
extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,