summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
Diffstat (limited to 'src/include')
-rw-r--r--src/include/replication/logical.h6
-rw-r--r--src/include/replication/output_plugin.h56
-rw-r--r--src/include/replication/reorderbuffer.h41
3 files changed, 103 insertions, 0 deletions
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 40bab7ee02d..28c9c1f474e 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -85,6 +85,11 @@ typedef struct LogicalDecodingContext
bool streaming;
/*
+ * Does the output plugin support two-phase decoding, and is it enabled?
+ */
+ bool twophase;
+
+ /*
* State for writing output.
*/
bool accept_writes;
@@ -120,6 +125,7 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index b78c796450a..89e1dc3517d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -100,6 +100,45 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
/*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare and
+ * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ const char *gid);
+
+/*
+ * Callback called for every BEGIN of a prepared trnsaction.
+ */
+typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
+
+/*
* Called when starting to stream a block of changes from in-progress
* transaction (may be called repeatedly, if it's streamed in multiple
* chunks).
@@ -124,6 +163,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn);
/*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit.
+ */
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
* Called to apply changes streamed to remote node from in-progress
* transaction.
*/
@@ -173,10 +220,19 @@ typedef struct OutputPluginCallbacks
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
+
+ /* streaming of changes at prepare time */
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodeBeginPrepareCB begin_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+
/* streaming of changes */
LogicalDecodeStreamStartCB stream_start_cb;
LogicalDecodeStreamStopCB stream_stop_cb;
LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bd9dd7ec676..1e60afe70f4 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -245,6 +245,12 @@ typedef struct ReorderBufferTXN
TransactionId toplevel_xid;
/*
+ * Global transaction id required for identification of prepared
+ * transactions.
+ */
+ char *gid;
+
+ /*
* LSN of the first data carrying, WAL record with knowledge about this
* xid. This is allowed to *not* be first record adorned with this xid, if
* the previous records aren't relevant for logical decoding.
@@ -418,6 +424,26 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
const char *prefix, Size sz,
const char *message);
+/* begin prepare callback signature */
+typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* rollback prepared callback signature */
+typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
/* start streaming transaction callback signature */
typedef void (*ReorderBufferStreamStartCB) (
ReorderBuffer *rb,
@@ -436,6 +462,12 @@ typedef void (*ReorderBufferStreamAbortCB) (
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
/* commit streamed transaction callback signature */
typedef void (*ReorderBufferStreamCommitCB) (
ReorderBuffer *rb,
@@ -505,11 +537,20 @@ struct ReorderBuffer
ReorderBufferMessageCB message;
/*
+ * Callbacks to be called when streaming a transaction at prepare time.
+ */
+ ReorderBufferBeginCB begin_prepare;
+ ReorderBufferPrepareCB prepare;
+ ReorderBufferCommitPreparedCB commit_prepared;
+ ReorderBufferRollbackPreparedCB rollback_prepared;
+
+ /*
* Callbacks to be called when streaming a transaction.
*/
ReorderBufferStreamStartCB stream_start;
ReorderBufferStreamStopCB stream_stop;
ReorderBufferStreamAbortCB stream_abort;
+ ReorderBufferStreamPrepareCB stream_prepare;
ReorderBufferStreamCommitCB stream_commit;
ReorderBufferStreamChangeCB stream_change;
ReorderBufferStreamMessageCB stream_message;