diff options
author | Amit Kapila | 2020-12-30 10:47:26 +0000 |
---|---|---|
committer | Amit Kapila | 2020-12-30 10:47:26 +0000 |
commit | 0aa8a01d04c8fe200b7a106878eebc3d0af9105c (patch) | |
tree | 79fe885496f4d3493ae327156c0baf1aa0e1e43a /src/include | |
parent | fa744697c79189a661f802d9a979d959b4454df0 (diff) |
Extend the output plugin API to allow decoding of prepared xacts.
This adds six methods to the output plugin API, adding support for
streaming changes of two-phase transactions at prepare time.
* begin_prepare
* filter_prepare
* prepare
* commit_prepared
* rollback_prepared
* stream_prepare
Most of this is a simple extension of the existing methods, with the
semantic difference that the transaction is not yet committed and maybe
aborted later.
Until now two-phase transactions were translated into regular transactions
on the subscriber, and the GID was not forwarded to it. None of the
two-phase commands were communicated to the subscriber.
This patch provides the infrastructure for logical decoding plugins to be
informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED
and ROLLBACK PREPARED commands with the corresponding GID.
This also extends the 'test_decoding' plugin, implementing these new
methods.
This commit simply adds these new APIs and the upcoming patch to "allow
the decoding at prepare time in ReorderBuffer" will use these APIs.
Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'src/include')
-rw-r--r-- | src/include/replication/logical.h | 6 | ||||
-rw-r--r-- | src/include/replication/output_plugin.h | 56 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 41 |
3 files changed, 103 insertions, 0 deletions
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 40bab7ee02d..28c9c1f474e 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -85,6 +85,11 @@ typedef struct LogicalDecodingContext bool streaming; /* + * Does the output plugin support two-phase decoding, and is it enabled? + */ + bool twophase; + + /* * State for writing output. */ bool accept_writes; @@ -120,6 +125,7 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); +extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index b78c796450a..89e1dc3517d 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -100,6 +100,45 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); /* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare and + * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED + * and sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + const char *gid); + +/* + * Callback called for every BEGIN of a prepared trnsaction. + */ +typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); + + +/* * Called when starting to stream a block of changes from in-progress * transaction (may be called repeatedly, if it's streamed in multiple * chunks). @@ -124,6 +163,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, XLogRecPtr abort_lsn); /* + * Called to prepare changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit. + */ +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* * Called to apply changes streamed to remote node from in-progress * transaction. */ @@ -173,10 +220,19 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + + /* streaming of changes at prepare time */ + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodeBeginPrepareCB begin_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeRollbackPreparedCB rollback_prepared_cb; + /* streaming of changes */ LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bd9dd7ec676..1e60afe70f4 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -245,6 +245,12 @@ typedef struct ReorderBufferTXN TransactionId toplevel_xid; /* + * Global transaction id required for identification of prepared + * transactions. + */ + char *gid; + + /* * LSN of the first data carrying, WAL record with knowledge about this * xid. This is allowed to *not* be first record adorned with this xid, if * the previous records aren't relevant for logical decoding. @@ -418,6 +424,26 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* begin prepare callback signature */ +typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* rollback prepared callback signature */ +typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); + /* start streaming transaction callback signature */ typedef void (*ReorderBufferStreamStartCB) ( ReorderBuffer *rb, @@ -436,6 +462,12 @@ typedef void (*ReorderBufferStreamAbortCB) ( ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + /* commit streamed transaction callback signature */ typedef void (*ReorderBufferStreamCommitCB) ( ReorderBuffer *rb, @@ -505,11 +537,20 @@ struct ReorderBuffer ReorderBufferMessageCB message; /* + * Callbacks to be called when streaming a transaction at prepare time. + */ + ReorderBufferBeginCB begin_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferRollbackPreparedCB rollback_prepared; + + /* * Callbacks to be called when streaming a transaction. */ ReorderBufferStreamStartCB stream_start; ReorderBufferStreamStopCB stream_stop; ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamPrepareCB stream_prepare; ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; |