Extend the output plugin API to allow decoding of prepared xacts.
authorAmit Kapila <akapila@postgresql.org>
Wed, 30 Dec 2020 10:47:26 +0000 (16:17 +0530)
committerAmit Kapila <akapila@postgresql.org>
Wed, 30 Dec 2020 10:47:26 +0000 (16:17 +0530)
This adds six methods to the output plugin API, adding support for
streaming changes of two-phase transactions at prepare time.

* begin_prepare
* filter_prepare
* prepare
* commit_prepared
* rollback_prepared
* stream_prepare

Most of this is a simple extension of the existing methods, with the
semantic difference that the transaction is not yet committed and maybe
aborted later.

Until now two-phase transactions were translated into regular transactions
on the subscriber, and the GID was not forwarded to it. None of the
two-phase commands were communicated to the subscriber.

This patch provides the infrastructure for logical decoding plugins to be
informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED
and ROLLBACK PREPARED commands with the corresponding GID.

This also extends the 'test_decoding' plugin, implementing these new
methods.

This commit simply adds these new APIs and the upcoming patch to "allow
the decoding at prepare time in ReorderBuffer" will use these APIs.

Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com

contrib/test_decoding/test_decoding.c
doc/src/sgml/logicaldecoding.sgml
src/backend/replication/logical/logical.c
src/include/replication/logical.h
src/include/replication/output_plugin.h
src/include/replication/reorderbuffer.h
src/tools/pgindent/typedefs.list

index e12278beb581702d31845e6d788a3c941f032c21..05763553a40eecb80ea3858e3b73379911168457 100644 (file)
@@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
                                                          ReorderBufferTXN *txn, XLogRecPtr message_lsn,
                                                          bool transactional, const char *prefix,
                                                          Size sz, const char *message);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+                                                                        const char *gid);
+static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
+                                                                               ReorderBufferTXN *txn);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+                                                                 ReorderBufferTXN *txn,
+                                                                 XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+                                                                                 ReorderBufferTXN *txn,
+                                                                                 XLogRecPtr commit_lsn);
+static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+                                                                                       ReorderBufferTXN *txn,
+                                                                                       XLogRecPtr prepare_end_lsn,
+                                                                                       TimestampTz prepare_time);
 static void pg_decode_stream_start(LogicalDecodingContext *ctx,
                                                                   ReorderBufferTXN *txn);
 static void pg_output_stream_start(LogicalDecodingContext *ctx,
@@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                                                   ReorderBufferTXN *txn,
                                                                   XLogRecPtr abort_lsn);
+static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+                                                                        ReorderBufferTXN *txn,
+                                                                        XLogRecPtr prepare_lsn);
 static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                                                        ReorderBufferTXN *txn,
                                                                        XLogRecPtr commit_lsn);
@@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->filter_by_origin_cb = pg_decode_filter;
        cb->shutdown_cb = pg_decode_shutdown;
        cb->message_cb = pg_decode_message;
+       cb->filter_prepare_cb = pg_decode_filter_prepare;
+       cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
+       cb->prepare_cb = pg_decode_prepare_txn;
+       cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+       cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
        cb->stream_start_cb = pg_decode_stream_start;
        cb->stream_stop_cb = pg_decode_stream_stop;
        cb->stream_abort_cb = pg_decode_stream_abort;
+       cb->stream_prepare_cb = pg_decode_stream_prepare;
        cb->stream_commit_cb = pg_decode_stream_commit;
        cb->stream_change_cb = pg_decode_stream_change;
        cb->stream_message_cb = pg_decode_stream_message;
@@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        ListCell   *option;
        TestDecodingData *data;
        bool            enable_streaming = false;
+       bool            enable_twophase = false;
 
        data = palloc0(sizeof(TestDecodingData));
        data->context = AllocSetContextCreate(ctx->context,
@@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                                 errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                                strVal(elem->arg), elem->defname)));
                }
+               else if (strcmp(elem->defname, "two-phase-commit") == 0)
+               {
+                       if (elem->arg == NULL)
+                               continue;
+                       else if (!parse_bool(strVal(elem->arg), &enable_twophase))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("could not parse value \"%s\" for parameter \"%s\"",
+                                                               strVal(elem->arg), elem->defname)));
+               }
                else
                {
                        ereport(ERROR,
@@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        }
 
        ctx->streaming &= enable_streaming;
+       ctx->twophase &= enable_twophase;
 }
 
 /* cleanup this plugin's resources */
@@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        OutputPluginWrite(ctx, true);
 }
 
+/* BEGIN PREPARE callback */
+static void
+pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+       TestDecodingData *data = ctx->output_plugin_private;
+       TestDecodingTxnData *txndata =
+       MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+       txndata->xact_wrote_changes = false;
+       txn->output_plugin_private = txndata;
+
+       if (data->skip_empty_xacts)
+               return;
+
+       pg_output_begin(ctx, data, txn, true);
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                                         XLogRecPtr prepare_lsn)
+{
+       TestDecodingData *data = ctx->output_plugin_private;
+       TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+       if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+               return;
+
+       OutputPluginPrepareWrite(ctx, true);
+
+       appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+                                        quote_literal_cstr(txn->gid));
+
+       if (data->include_xids)
+               appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+       if (data->include_timestamp)
+               appendStringInfo(ctx->out, " (at %s)",
+                                                timestamptz_to_str(txn->commit_time));
+
+       OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                                                         XLogRecPtr commit_lsn)
+{
+       TestDecodingData *data = ctx->output_plugin_private;
+
+       OutputPluginPrepareWrite(ctx, true);
+
+       appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+                                        quote_literal_cstr(txn->gid));
+
+       if (data->include_xids)
+               appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+       if (data->include_timestamp)
+               appendStringInfo(ctx->out, " (at %s)",
+                                                timestamptz_to_str(txn->commit_time));
+
+       OutputPluginWrite(ctx, true);
+}
+
+/* ROLLBACK PREPARED callback */
+static void
+pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+                                                               ReorderBufferTXN *txn,
+                                                               XLogRecPtr prepare_end_lsn,
+                                                               TimestampTz prepare_time)
+{
+       TestDecodingData *data = ctx->output_plugin_private;
+
+       OutputPluginPrepareWrite(ctx, true);
+
+       appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+                                        quote_literal_cstr(txn->gid));
+
+       if (data->include_xids)
+               appendStringInfo(ctx->out, ", txid %u", txn->xid);
+
+       if (data->include_timestamp)
+               appendStringInfo(ctx->out, " (at %s)",
+                                                timestamptz_to_str(txn->commit_time));
+
+       OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here we demonstrate a
+ * simple logic by checking the GID. If the GID contains the "_nodecode"
+ * substring, then we filter it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+{
+       if (strstr(gid, "_nodecode") != NULL)
+               return true;
+
+       return false;
+}
+
 static bool
 pg_decode_filter(LogicalDecodingContext *ctx,
                                 RepOriginId origin_id)
@@ -701,6 +841,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
        OutputPluginWrite(ctx, true);
 }
 
+static void
+pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+                                                ReorderBufferTXN *txn,
+                                                XLogRecPtr prepare_lsn)
+{
+       TestDecodingData *data = ctx->output_plugin_private;
+       TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+       if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
+               return;
+
+       OutputPluginPrepareWrite(ctx, true);
+
+       if (data->include_xids)
+               appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
+                                                quote_literal_cstr(txn->gid), txn->xid);
+       else
+               appendStringInfo(ctx->out, "preparing streamed transaction %s",
+                                                quote_literal_cstr(txn->gid));
+
+       if (data->include_timestamp)
+               appendStringInfo(ctx->out, " (at %s)",
+                                                timestamptz_to_str(txn->commit_time));
+
+       OutputPluginWrite(ctx, true);
+}
+
 static void
 pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                                ReorderBufferTXN *txn,
index ca78a81e9c545a42e0d375ba55bca8fdc5ee18a2..d63f90ff282b8a67b0b672afbe40e9b68ad68d0d 100644 (file)
@@ -389,9 +389,15 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
+    LogicalDecodeFilterPrepareCB filter_prepare_cb;
+    LogicalDecodeBeginPrepareCB begin_prepare_cb;
+    LogicalDecodePrepareCB prepare_cb;
+    LogicalDecodeCommitPreparedCB commit_prepared_cb;
+    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
     LogicalDecodeStreamStartCB stream_start_cb;
     LogicalDecodeStreamStopCB stream_stop_cb;
     LogicalDecodeStreamAbortCB stream_abort_cb;
+    LogicalDecodeStreamPrepareCB stream_prepare_cb;
     LogicalDecodeStreamCommitCB stream_commit_cb;
     LogicalDecodeStreamChangeCB stream_change_cb;
     LogicalDecodeStreamMessageCB stream_message_cb;
@@ -413,10 +419,20 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      An output plugin may also define functions to support streaming of large,
      in-progress transactions. The <function>stream_start_cb</function>,
      <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
-     <function>stream_commit_cb</function> and <function>stream_change_cb</function>
+     <function>stream_commit_cb</function>, <function>stream_change_cb</function>,
+     and <function>stream_prepare_cb</function>
      are required, while <function>stream_message_cb</function> and
      <function>stream_truncate_cb</function> are optional.
     </para>
+
+    <para>
+    An output plugin may also define functions to support two-phase commits,
+    which allows actions to be decoded on the <command>PREPARE TRANSACTION</command>.
+    The <function>begin_prepare_cb</function>, <function>prepare_cb</function>, 
+    <function>stream_prepare_cb</function>,
+    <function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function>
+    callbacks are required, while <function>filter_prepare_cb</function> is optional.
+    </para>
    </sect2>
 
    <sect2 id="logicaldecoding-capabilities">
@@ -477,7 +493,15 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      never get
      decoded. Successful savepoints are
      folded into the transaction containing them in the order they were
-     executed within that transaction.
+     executed within that transaction. A transaction that is prepared for
+     a two-phase commit using <command>PREPARE TRANSACTION</command> will
+     also be decoded if the output plugin callbacks needed for decoding
+     them are provided. It is possible that the current transaction which
+     is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
+     command. In that case, the logical decoding of this transaction will
+     be aborted too. We will skip all the changes of such a transaction once
+     the abort is detected and abort the transaction when we read WAL for
+     <command>ROLLBACK PREPARED</command>.
     </para>
 
     <note>
@@ -587,7 +611,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
       an <command>INSERT</command>, <command>UPDATE</command>,
       or <command>DELETE</command>. Even if the original command modified
       several rows at once the callback will be called individually for each
-      row.
+      row. The <function>change_cb</function> callback may access system or
+      user catalog tables to aid in the process of outputting the row
+      modification details. In case of decoding a prepared (but yet
+      uncommitted) transaction or decoding of an uncommitted transaction, this
+      change callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
 <programlisting>
 typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
@@ -685,7 +715,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
       non-transactional and the XID was not assigned yet in the transaction
       which logged the message. The <parameter>lsn</parameter> has WAL
       location of the message. The <parameter>transactional</parameter> says
-      if the message was sent as transactional or not.
+      if the message was sent as transactional or not. Similar to the change
+      callback, in case of decoding a prepared (but yet uncommitted)
+      transaction or decoding of an uncommitted transaction, this message
+      callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
+
       The <parameter>prefix</parameter> is arbitrary null-terminated prefix
       which can be used for identifying interesting messages for the current
       plugin. And finally the <parameter>message</parameter> parameter holds
@@ -698,6 +734,111 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-filter-prepare">
+     <title>Prepare Filter Callback</title>
+
+     <para>
+       The optional <function>filter_prepare_cb</function> callback
+       is called to determine whether data that is part of the current
+       two-phase commit transaction should be considered for decode
+       at this prepare stage or as a regular one-phase transaction at
+       <command>COMMIT PREPARED</command> time later. To signal that
+       decoding should be skipped, return <literal>true</literal>;
+       <literal>false</literal> otherwise. When the callback is not
+       defined, <literal>false</literal> is assumed (i.e. nothing is
+       filtered).
+<programlisting>
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              const char *gid);
+</programlisting>
+      The <parameter>ctx</parameter> parameter has the same contents as for the
+      other callbacks. The <parameter>gid</parameter> is the identifier that later
+      identifies this transaction for <command>COMMIT PREPARED</command> or
+      <command>ROLLBACK PREPARED</command>.
+     </para>
+     <para>
+      The callback has to provide the same static answer for a given
+      <parameter>gid</parameter> every time it is called.
+     </para>
+     </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-begin-prepare">
+     <title>Transaction Begin Prepare Callback</title>
+
+     <para>
+      The required <function>begin_prepare_cb</function> callback is called
+      whenever the start of a prepared transaction has been decoded. The
+      <parameter>gid</parameter> field, which is part of the
+      <parameter>txn</parameter> parameter can be used in this callback to
+      check if the plugin has already received this prepare in which case it
+      can skip the remaining changes of the transaction. This can only happen
+      if the user restarts the decoding after receiving the prepare for a
+      transaction but before receiving the commit prepared say because of some
+      error.
+      <programlisting>
+       typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+                                                    ReorderBufferTXN *txn);
+      </programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-prepare">
+     <title>Transaction Prepare Callback</title>
+
+     <para>
+      The required <function>prepare_cb</function> callback is called whenever
+      a transaction which is prepared for two-phase commit has been
+      decoded. The <function>change_cb</function> callback for all modified
+      rows will have been called before this, if there have been any modified
+      rows. The <parameter>gid</parameter> field, which is part of the
+      <parameter>txn</parameter> parameter can be used in this callback.
+      <programlisting>
+       typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr prepare_lsn);
+      </programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+     <title>Transaction Commit Prepared Callback</title>
+
+     <para>
+      The required <function>commit_prepared_cb</function> callback is called
+      whenever a transaction commit prepared has been decoded. The
+      <parameter>gid</parameter> field, which is part of the
+      <parameter>txn</parameter> parameter can be used in this callback.
+      <programlisting>
+       typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                      ReorderBufferTXN *txn,
+                                                      XLogRecPtr commit_lsn);
+      </programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-rollback-prepared">
+     <title>Transaction Rollback Prepared Callback</title>
+
+     <para>
+      The required <function>rollback_prepared_cb</function> callback is called
+      whenever a transaction rollback prepared has been decoded. The
+      <parameter>gid</parameter> field, which is part of the
+      <parameter>txn</parameter> parameter can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check if the plugin
+      has received this prepare transaction in which case it can apply the
+      rollback, otherwise, it can skip the rollback operation. The
+      <parameter>gid</parameter> alone is not sufficient because the downstream
+      node can have prepared transaction with same identifier.
+      <programlisting>
+       typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                        ReorderBufferTXN *txn,
+                                                        XLogRecPtr preapre_end_lsn,
+                                                        TimestampTz prepare_time);
+      </programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-stream-start">
      <title>Stream Start Callback</title>
      <para>
@@ -735,6 +876,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-stream-prepare">
+     <title>Stream Prepare Callback</title>
+     <para>
+      The <function>stream_prepare_cb</function> callback is called to prepare
+      a previously streamed transaction as part of a two-phase commit.
+<programlisting>
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr prepare_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-stream-commit">
      <title>Stream Commit Callback</title>
      <para>
@@ -913,9 +1067,13 @@ OutputPluginWrite(ctx, true);
     When streaming an in-progress transaction, the changes (and messages) are
     streamed in blocks demarcated by <function>stream_start_cb</function>
     and <function>stream_stop_cb</function> callbacks. Once all the decoded
-    changes are transmitted, the transaction is committed using the
-    <function>stream_commit_cb</function> callback (or possibly aborted using
-    the <function>stream_abort_cb</function> callback).
+    changes are transmitted, the transaction can be committed using the
+    the <function>stream_commit_cb</function> callback
+    (or possibly aborted using the <function>stream_abort_cb</function> callback).
+    If two-phase commits are supported, the transaction can be prepared using the
+    <function>stream_prepare_cb</function> callback, commit prepared using the
+    <function>commit_prepared_cb</function> callback or aborted using the
+    <function>rollback_prepared_cb</function>.
    </para>
 
    <para>
index f1f4df7d70f286ad4d4a06efda0a82fbd133a7a8..6e3de92a67c54f0c460d46b2134eeaced171594a 100644 (file)
@@ -59,6 +59,13 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                                          XLogRecPtr commit_lsn);
+static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                                          XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                                                          XLogRecPtr commit_lsn);
+static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                                                                XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                                          Relation relation, ReorderBufferChange *change);
 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -74,6 +81,8 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                                                   XLogRecPtr last_lsn);
 static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                                                        XLogRecPtr abort_lsn);
+static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                                                         XLogRecPtr prepare_lsn);
 static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                                                         XLogRecPtr commit_lsn);
 static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -237,11 +246,37 @@ StartupDecodingContext(List *output_plugin_options,
        ctx->reorder->stream_start = stream_start_cb_wrapper;
        ctx->reorder->stream_stop = stream_stop_cb_wrapper;
        ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+       ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
        ctx->reorder->stream_commit = stream_commit_cb_wrapper;
        ctx->reorder->stream_change = stream_change_cb_wrapper;
        ctx->reorder->stream_message = stream_message_cb_wrapper;
        ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
 
+
+       /*
+        * To support two-phase logical decoding, we require
+        * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
+        * filter_prepare callback is optional. We however enable two-phase
+        * logical decoding when at least one of the methods is enabled so that we
+        * can easily identify missing methods.
+        *
+        * We decide it here, but only check it later in the wrappers.
+        */
+       ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
+               (ctx->callbacks.prepare_cb != NULL) ||
+               (ctx->callbacks.commit_prepared_cb != NULL) ||
+               (ctx->callbacks.rollback_prepared_cb != NULL) ||
+               (ctx->callbacks.stream_prepare_cb != NULL) ||
+               (ctx->callbacks.filter_prepare_cb != NULL);
+
+       /*
+        * Callback to support decoding at prepare time.
+        */
+       ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
+       ctx->reorder->prepare = prepare_cb_wrapper;
+       ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+       ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
+
        ctx->out = makeStringInfo();
        ctx->prepare_write = prepare_write;
        ctx->write = do_write;
@@ -782,6 +817,186 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        error_context_stack = errcallback.previous;
 }
 
+/*
+ * The functionality of begin_prepare is quite similar to begin with the
+ * exception that this will have gid (global transaction id) information which
+ * can be used by plugin. Now, we thought about extending the existing begin
+ * but that would break the replication protocol and additionally this looks
+ * cleaner.
+ */
+static void
+begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+       LogicalDecodingContext *ctx = cache->private_data;
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+
+       Assert(!ctx->fast_forward);
+
+       /* We're only supposed to call this when two-phase commits are supported */
+       Assert(ctx->twophase);
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "begin_prepare";
+       state.report_location = txn->first_lsn;
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = true;
+       ctx->write_xid = txn->xid;
+       ctx->write_location = txn->first_lsn;
+
+       /*
+        * If the plugin supports two-phase commits then begin prepare callback is
+        * mandatory
+        */
+       if (ctx->callbacks.begin_prepare_cb == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("logical replication at prepare time requires begin_prepare_cb callback")));
+
+       /* do the actual work: call callback */
+       ctx->callbacks.begin_prepare_cb(ctx, txn);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+}
+
+static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                  XLogRecPtr prepare_lsn)
+{
+       LogicalDecodingContext *ctx = cache->private_data;
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+
+       Assert(!ctx->fast_forward);
+
+       /* We're only supposed to call this when two-phase commits are supported */
+       Assert(ctx->twophase);
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "prepare";
+       state.report_location = txn->final_lsn; /* beginning of prepare record */
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = true;
+       ctx->write_xid = txn->xid;
+       ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+       /*
+        * If the plugin supports two-phase commits then prepare callback is
+        * mandatory
+        */
+       if (ctx->callbacks.prepare_cb == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("logical replication at prepare time requires prepare_cb callback")));
+
+       /* do the actual work: call callback */
+       ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                                  XLogRecPtr commit_lsn)
+{
+       LogicalDecodingContext *ctx = cache->private_data;
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+
+       Assert(!ctx->fast_forward);
+
+       /* We're only supposed to call this when two-phase commits are supported */
+       Assert(ctx->twophase);
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "commit_prepared";
+       state.report_location = txn->final_lsn; /* beginning of commit record */
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = true;
+       ctx->write_xid = txn->xid;
+       ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+       /*
+        * If the plugin support two-phase commits then commit prepared callback
+        * is mandatory
+        */
+       if (ctx->callbacks.commit_prepared_cb == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("logical replication at prepare time requires commit_prepared_cb callback")));
+
+       /* do the actual work: call callback */
+       ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+}
+
+static void
+rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                                        XLogRecPtr prepare_end_lsn,
+                                                        TimestampTz prepare_time)
+{
+       LogicalDecodingContext *ctx = cache->private_data;
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+
+       Assert(!ctx->fast_forward);
+
+       /* We're only supposed to call this when two-phase commits are supported */
+       Assert(ctx->twophase);
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "rollback_prepared";
+       state.report_location = txn->final_lsn; /* beginning of commit record */
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = true;
+       ctx->write_xid = txn->xid;
+       ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+       /*
+        * If the plugin support two-phase commits then rollback prepared callback
+        * is mandatory
+        */
+       if (ctx->callbacks.rollback_prepared_cb == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("logical replication at prepare time requires rollback_prepared_cb callback")));
+
+       /* do the actual work: call callback */
+       ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
+                                                                               prepare_time);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+}
+
 static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                  Relation relation, ReorderBufferChange *change)
@@ -859,6 +1074,45 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        error_context_stack = errcallback.previous;
 }
 
+bool
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+{
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+       bool            ret;
+
+       Assert(!ctx->fast_forward);
+
+       /*
+        * Skip if decoding of two-phase transactions at PREPARE time is not
+        * enabled. In that case, all two-phase transactions are considered
+        * filtered out and will be applied as regular transactions at COMMIT
+        * PREPARED.
+        */
+       if (!ctx->twophase)
+               return true;
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "filter_prepare";
+       state.report_location = InvalidXLogRecPtr;
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = false;
+
+       /* do the actual work: call callback */
+       ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+
+       return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1056,6 +1310,49 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        error_context_stack = errcallback.previous;
 }
 
+static void
+stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                                                 XLogRecPtr prepare_lsn)
+{
+       LogicalDecodingContext *ctx = cache->private_data;
+       LogicalErrorCallbackState state;
+       ErrorContextCallback errcallback;
+
+       Assert(!ctx->fast_forward);
+
+       /*
+        * We're only supposed to call this when streaming and two-phase commits
+        * are supported.
+        */
+       Assert(ctx->streaming);
+       Assert(ctx->twophase);
+
+       /* Push callback + info on the error context stack */
+       state.ctx = ctx;
+       state.callback_name = "stream_prepare";
+       state.report_location = txn->final_lsn;
+       errcallback.callback = output_plugin_error_callback;
+       errcallback.arg = (void *) &state;
+       errcallback.previous = error_context_stack;
+       error_context_stack = &errcallback;
+
+       /* set output state */
+       ctx->accept_writes = true;
+       ctx->write_xid = txn->xid;
+       ctx->write_location = txn->end_lsn;
+
+       /* in streaming mode with two-phase commits, stream_prepare_cb is required */
+       if (ctx->callbacks.stream_prepare_cb == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("logical streaming at prepare time requires a stream_prepare_cb callback")));
+
+       ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
+
+       /* Pop the error context stack */
+       error_context_stack = errcallback.previous;
+}
+
 static void
 stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                                                 XLogRecPtr commit_lsn)
index 40bab7ee02df48ed4d203af1b61f1f23c10fbf68..28c9c1f474eb3cf03e5df07eba47dea359955d22 100644 (file)
@@ -84,6 +84,11 @@ typedef struct LogicalDecodingContext
         */
        bool            streaming;
 
+       /*
+        * Does the output plugin support two-phase decoding, and is it enabled?
+        */
+       bool            twophase;
+
        /*
         * State for writing output.
         */
@@ -120,6 +125,7 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
                                                                                                  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
index b78c796450a18330eda89a1bde95396670ebb9b4..89e1dc3517d66b134b3c2cf3b6680c9627fa7b9c 100644 (file)
@@ -99,6 +99,45 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
  */
 typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
 
+/*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare and
+ * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                                                                         const char *gid);
+
+/*
+ * Callback called for every BEGIN of a prepared trnsaction.
+ */
+typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+                                                                                        ReorderBufferTXN *txn);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+                                                                               ReorderBufferTXN *txn,
+                                                                               XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                                                          ReorderBufferTXN *txn,
+                                                                                          XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                                                                ReorderBufferTXN *txn,
+                                                                                                XLogRecPtr prepare_end_lsn,
+                                                                                                TimestampTz prepare_time);
+
+
 /*
  * Called when starting to stream a block of changes from in-progress
  * transaction (may be called repeatedly, if it's streamed in multiple
@@ -123,6 +162,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                                                                        ReorderBufferTXN *txn,
                                                                                        XLogRecPtr abort_lsn);
 
+/*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit.
+ */
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+                                                                                         ReorderBufferTXN *txn,
+                                                                                         XLogRecPtr prepare_lsn);
+
 /*
  * Called to apply changes streamed to remote node from in-progress
  * transaction.
@@ -173,10 +220,19 @@ typedef struct OutputPluginCallbacks
        LogicalDecodeMessageCB message_cb;
        LogicalDecodeFilterByOriginCB filter_by_origin_cb;
        LogicalDecodeShutdownCB shutdown_cb;
+
+       /* streaming of changes at prepare time */
+       LogicalDecodeFilterPrepareCB filter_prepare_cb;
+       LogicalDecodeBeginPrepareCB begin_prepare_cb;
+       LogicalDecodePrepareCB prepare_cb;
+       LogicalDecodeCommitPreparedCB commit_prepared_cb;
+       LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+
        /* streaming of changes */
        LogicalDecodeStreamStartCB stream_start_cb;
        LogicalDecodeStreamStopCB stream_stop_cb;
        LogicalDecodeStreamAbortCB stream_abort_cb;
+       LogicalDecodeStreamPrepareCB stream_prepare_cb;
        LogicalDecodeStreamCommitCB stream_commit_cb;
        LogicalDecodeStreamChangeCB stream_change_cb;
        LogicalDecodeStreamMessageCB stream_message_cb;
index bd9dd7ec6764f34ce8ba6e3a796764222d60f68c..1e60afe70f4a07aa268579e27aea60045190ae19 100644 (file)
@@ -244,6 +244,12 @@ typedef struct ReorderBufferTXN
        /* Xid of top-level transaction, if known */
        TransactionId toplevel_xid;
 
+       /*
+        * Global transaction id required for identification of prepared
+        * transactions.
+        */
+       char       *gid;
+
        /*
         * LSN of the first data carrying, WAL record with knowledge about this
         * xid. This is allowed to *not* be first record adorned with this xid, if
@@ -418,6 +424,26 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
                                                                                const char *prefix, Size sz,
                                                                                const char *message);
 
+/* begin prepare callback signature */
+typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
+                                                                                        ReorderBufferTXN *txn);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
+                                                                               ReorderBufferTXN *txn,
+                                                                               XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
+                                                                                          ReorderBufferTXN *txn,
+                                                                                          XLogRecPtr commit_lsn);
+
+/* rollback  prepared callback signature */
+typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
+                                                                                                ReorderBufferTXN *txn,
+                                                                                                XLogRecPtr prepare_end_lsn,
+                                                                                                TimestampTz prepare_time);
+
 /* start streaming transaction callback signature */
 typedef void (*ReorderBufferStreamStartCB) (
                                                                                        ReorderBuffer *rb,
@@ -436,6 +462,12 @@ typedef void (*ReorderBufferStreamAbortCB) (
                                                                                        ReorderBufferTXN *txn,
                                                                                        XLogRecPtr abort_lsn);
 
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+                                                                                         ReorderBuffer *rb,
+                                                                                         ReorderBufferTXN *txn,
+                                                                                         XLogRecPtr prepare_lsn);
+
 /* commit streamed transaction callback signature */
 typedef void (*ReorderBufferStreamCommitCB) (
                                                                                         ReorderBuffer *rb,
@@ -504,12 +536,21 @@ struct ReorderBuffer
        ReorderBufferCommitCB commit;
        ReorderBufferMessageCB message;
 
+       /*
+        * Callbacks to be called when streaming a transaction at prepare time.
+        */
+       ReorderBufferBeginCB begin_prepare;
+       ReorderBufferPrepareCB prepare;
+       ReorderBufferCommitPreparedCB commit_prepared;
+       ReorderBufferRollbackPreparedCB rollback_prepared;
+
        /*
         * Callbacks to be called when streaming a transaction.
         */
        ReorderBufferStreamStartCB stream_start;
        ReorderBufferStreamStopCB stream_stop;
        ReorderBufferStreamAbortCB stream_abort;
+       ReorderBufferStreamPrepareCB stream_prepare;
        ReorderBufferStreamCommitCB stream_commit;
        ReorderBufferStreamChangeCB stream_change;
        ReorderBufferStreamMessageCB stream_message;
index bca37c536eef1d88db88bd69d25116df284370db..9cd047ba25ea688fa41df6d266a23783f9642b20 100644 (file)
@@ -1315,9 +1315,21 @@ LogStmtLevel
 LogicalDecodeBeginCB
 LogicalDecodeChangeCB
 LogicalDecodeCommitCB
+LogicalDecodeFilterPrepareCB
+LogicalDecodeBeginPrepareCB
+LogicalDecodePrepareCB
+LogicalDecodeCommitPreparedCB
+LogicalDecodeRollbackPreparedCB
 LogicalDecodeFilterByOriginCB
 LogicalDecodeMessageCB
 LogicalDecodeShutdownCB
+LogicalDecodeStreamStartCB
+LogicalDecodeStreamStopCB
+LogicalDecodeStreamAbortCB
+LogicalDecodeStreamPrepareCB
+LogicalDecodeStreamCommitCB
+LogicalDecodeStreamChangeCB
+LogicalDecodeStreamMessageCB
 LogicalDecodeStartupCB
 LogicalDecodeTruncateCB
 LogicalDecodingContext