Add support for prepared transactions to built-in logical replication.
authorAmit Kapila <akapila@postgresql.org>
Wed, 14 Jul 2021 02:03:50 +0000 (07:33 +0530)
committerAmit Kapila <akapila@postgresql.org>
Wed, 14 Jul 2021 02:03:50 +0000 (07:33 +0530)
To add support for streaming transactions at prepare time into the
built-in logical replication, we need to do the following things:

* Modify the output plugin (pgoutput) to implement the new two-phase API
callbacks, by leveraging the extended replication protocol.

* Modify the replication apply worker, to properly handle two-phase
transactions by replaying them on prepare.

* Add a new SUBSCRIPTION option "two_phase" to allow users to enable
two-phase transactions. We enable the two_phase once the initial data sync
is over.

We however must explicitly disable replication of two-phase transactions
during replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover, we don't have a replication connection open so we don't know
where to send the data anyway.

The streaming option is not allowed with this new two_phase option. This
can be done as a separate patch.

We don't allow to toggle two_phase option of a subscription because it can
lead to an inconsistent replica. For the same reason, we don't allow to
refresh the publication once the two_phase is enabled for a subscription
unless copy_data option is false.

Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com

43 files changed:
contrib/test_decoding/test_decoding.c
doc/src/sgml/catalogs.sgml
doc/src/sgml/protocol.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
doc/src/sgml/ref/pg_dump.sgml
src/backend/access/transam/twophase.c
src/backend/catalog/pg_subscription.c
src/backend/catalog/system_views.sql
src/backend/commands/subscriptioncmds.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/origin.c
src/backend/replication/logical/proto.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/snapbuild.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/replication/slot.c
src/backend/replication/walreceiver.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/bin/psql/tab-complete.c
src/include/access/twophase.h
src/include/catalog/catversion.h
src/include/catalog/pg_subscription.h
src/include/catalog/pg_subscription_rel.h
src/include/replication/logical.h
src/include/replication/logicalproto.h
src/include/replication/pgoutput.h
src/include/replication/reorderbuffer.h
src/include/replication/slot.h
src/include/replication/snapbuild.h
src/include/replication/walreceiver.h
src/include/replication/worker_internal.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/021_twophase.pl [new file with mode: 0644]
src/test/subscription/t/022_twophase_cascade.pl [new file with mode: 0644]
src/tools/pgindent/typedefs.list

index de1b692658114393a62552496c76b4ed32bd5486..e5cd84e85e4142ce627a5cfd76327d017ef71d17 100644 (file)
@@ -339,7 +339,7 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
        if (data->include_timestamp)
                appendStringInfo(ctx->out, " (at %s)",
-                                                timestamptz_to_str(txn->commit_time));
+                                                timestamptz_to_str(txn->xact_time.commit_time));
 
        OutputPluginWrite(ctx, true);
 }
@@ -382,7 +382,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
        if (data->include_timestamp)
                appendStringInfo(ctx->out, " (at %s)",
-                                                timestamptz_to_str(txn->commit_time));
+                                                timestamptz_to_str(txn->xact_time.prepare_time));
 
        OutputPluginWrite(ctx, true);
 }
@@ -404,7 +404,7 @@ pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn
 
        if (data->include_timestamp)
                appendStringInfo(ctx->out, " (at %s)",
-                                                timestamptz_to_str(txn->commit_time));
+                                                timestamptz_to_str(txn->xact_time.commit_time));
 
        OutputPluginWrite(ctx, true);
 }
@@ -428,7 +428,7 @@ pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
 
        if (data->include_timestamp)
                appendStringInfo(ctx->out, " (at %s)",
-                                                timestamptz_to_str(txn->commit_time));
+                                                timestamptz_to_str(txn->xact_time.commit_time));
 
        OutputPluginWrite(ctx, true);
 }
@@ -853,7 +853,7 @@ pg_decode_stream_prepare(LogicalDecodingContext *ctx,
 
        if (data->include_timestamp)
                appendStringInfo(ctx->out, " (at %s)",
-                                                timestamptz_to_str(txn->commit_time));
+                                                timestamptz_to_str(txn->xact_time.prepare_time));
 
        OutputPluginWrite(ctx, true);
 }
@@ -882,7 +882,7 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 
        if (data->include_timestamp)
                appendStringInfo(ctx->out, " (at %s)",
-                                                timestamptz_to_str(txn->commit_time));
+                                                timestamptz_to_str(txn->xact_time.commit_time));
 
        OutputPluginWrite(ctx, true);
 }
index f517a7d4aff2340219f814ea5efd004c14689386..0f5d25b948a3f4b58d645b2beed3ade6d2f0e44f 100644 (file)
@@ -7641,6 +7641,18 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subtwophasestate</structfield> <type>char</type>
+      </para>
+      <para>
+       State codes for two-phase mode:
+       <literal>d</literal> = disabled,
+       <literal>p</literal> = pending enablement,
+       <literal>e</literal> = enabled
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
index a3562f3d0891d77d9f40522e5fc6ebe5aba2eff7..e8cb78ff1f3776bd1e24232fa82d33186177bf6a 100644 (file)
@@ -2811,11 +2811,17 @@ The commands accepted in replication mode are:
      </term>
      <listitem>
       <para>
-       Protocol version. Currently versions <literal>1</literal> and
-       <literal>2</literal> are supported. The version <literal>2</literal>
-       is supported only for server version 14 and above, and it allows
-       streaming of large in-progress transactions.
-     </para>
+       Protocol version. Currently versions <literal>1</literal>, <literal>2</literal>,
+       and <literal>3</literal> are supported.
+      </para>
+      <para>
+       Version <literal>2</literal> is supported only for server version 14
+       and above, and it allows streaming of large in-progress transactions.
+      </para>
+      <para>
+       Version <literal>3</literal> is supported only for server version 15
+       and above, and it allows streaming of two-phase transactions.
+      </para>
      </listitem>
     </varlistentry>
 
@@ -2871,10 +2877,11 @@ The commands accepted in replication mode are:
   <para>
    The logical replication protocol sends individual transactions one by one.
    This means that all messages between a pair of Begin and Commit messages
-   belong to the same transaction. It also sends changes of large in-progress
-   transactions between a pair of Stream Start and Stream Stop messages. The
-   last stream of such a transaction contains Stream Commit or Stream Abort
-   message.
+   belong to the same transaction. Similarly, all messages between a pair of
+   Begin Prepare and Prepare messages belong to the same transaction.
+   It also sends changes of large in-progress transactions between a pair of
+   Stream Start and Stream Stop messages. The last stream of such a transaction 
+   contains a Stream Commit or Stream Abort message.
   </para>
 
   <para>
@@ -7390,6 +7397,272 @@ Stream Abort
 
 </variablelist>
 
+<para>
+The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
+are available since protocol version 3.
+</para>
+
+<variablelist>
+
+<varlistentry>
+
+<term>Begin Prepare</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('b')</term>
+<listitem><para>
+                Identifies the message as the beginning of a two-phase transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The LSN of the prepare.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The end LSN of the prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                Prepare timestamp of the transaction. The value is in number
+                of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+                Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+                The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+
+<term>Prepare</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('P')</term>
+<listitem><para>
+                Identifies the message as a two-phase prepared transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int8</term>
+<listitem><para>
+                Flags; currently unused (must be 0).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The LSN of the prepare.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The end LSN of the prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                Prepare timestamp of the transaction. The value is in number
+                of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+                Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+                The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+
+<term>Commit Prepared</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('K')</term>
+<listitem><para>
+                Identifies the message as the commit of a two-phase transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int8</term>
+<listitem><para>
+                Flags; currently unused (must be 0).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The LSN of the commit prepared.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The end LSN of the commit prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                Commit timestamp of the transaction. The value is in number
+                of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+                Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+                The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+
+<term>Rollback Prepared</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('r')</term>
+<listitem><para>
+                Identifies the message as the rollback of a two-phase transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int8</term>
+<listitem><para>
+                Flags; currently unused (must be 0).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The end LSN of the prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The end LSN of the rollback prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                Prepare timestamp of the transaction. The value is in number
+                of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                Rollback timestamp of the transaction. The value is in number
+                of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+                Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+                The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+
 <para>
 
 The following message parts are shared by the above messages.
index b3d173179f4cbb79ef6c13c4d35f1b4e6178bd2b..a6f994450dc751b9dc6c798a0d6f7fd8f5145938 100644 (file)
@@ -67,6 +67,11 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
    Commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> and
    <command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command> with refresh
    option as true cannot be executed inside a transaction block.
+
+   These commands also cannot be executed when the subscription has
+   <literal>two_phase</literal> commit enabled, unless <literal>copy_data = false</literal>.
+   See column <literal>subtwophasestate</literal> of
+   <xref linkend="catalog-pg-subscription"/> to know the actual two-phase state.
   </para>
  </refsect1>
 
index e812beee3738f930f76679d98d521d49a24ce655..143390593d0d623cde26571ea9494613be2e2343 100644 (file)
@@ -237,6 +237,43 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           are fully decoded on the publisher, and only then sent to the
           subscriber as a whole.
          </para>
+
+         <para>
+          The <literal>streaming</literal> option cannot be used with the
+          <literal>two_phase</literal> option.
+         </para>
+
+        </listitem>
+       </varlistentry>
+       <varlistentry>
+        <term><literal>two_phase</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether two-phase commit is enabled for this subscription.
+          The default is <literal>false</literal>.
+         </para>
+
+         <para>
+          When two-phase commit is enabled then the decoded transactions are sent
+          to the subscriber on the PREPARE TRANSACTION. By default, the transaction
+          prepared on the publisher is decoded as a normal transaction at commit.
+         </para>
+
+         <para>
+          The two-phase commit implementation requires that the replication has
+          successfully passed the initial table synchronization phase. This means
+          even when two_phase is enabled for the subscription, the internal
+          two-phase state remains temporarily "pending" until the initialization
+          phase is completed. See column
+          <literal>subtwophasestate</literal> of <xref linkend="catalog-pg-subscription"/>
+          to know the actual two-phase state.
+         </para>
+
+         <para>
+          The <literal>two_phase</literal> option cannot be used with the
+          <literal>streaming</literal> option.
+         </para>
+
         </listitem>
        </varlistentry>
       </variablelist></para>
index a6c0788592b06f5baab49791b3e3f09586dbdad2..7682226b9914cece61189fb16b482dad781563e1 100644 (file)
@@ -1405,7 +1405,12 @@ CREATE DATABASE foo WITH TEMPLATE template0;
    servers.  It is then up to the user to reactivate the subscriptions in a
    suitable way.  If the involved hosts have changed, the connection
    information might have to be changed.  It might also be appropriate to
-   truncate the target tables before initiating a new full table copy.
+   truncate the target tables before initiating a new full table copy.  If users
+   intend to copy initial data during refresh they must create the slot with
+   <literal>two_phase = false</literal>.  After the initial sync, the
+   <literal>two_phase</literal> option will be automatically enabled by the
+   subscriber if the subscription had been originally created with
+   <literal>two_phase = true</literal> option.
   </para>
  </refsect1>
 
index f67d813c56493c4a3a0f9f1d8aac1381605a5769..6d3efb49a40a0dfcb63df224eef3f956c6b75ee0 100644 (file)
@@ -2458,3 +2458,71 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
                RemoveTwoPhaseFile(xid, giveWarning);
        RemoveGXact(gxact);
 }
+
+/*
+ * LookupGXact
+ *             Check if the prepared transaction with the given GID, lsn and timestamp
+ *             exists.
+ *
+ * Note that we always compare with the LSN where prepare ends because that is
+ * what is stored as origin_lsn in the 2PC file.
+ *
+ * This function is primarily used to check if the prepared transaction
+ * received from the upstream (remote node) already exists. Checking only GID
+ * is not sufficient because a different prepared xact with the same GID can
+ * exist on the same node. So, we are ensuring to match origin_lsn and
+ * origin_timestamp of prepared xact to avoid the possibility of a match of
+ * prepared xact from two different nodes.
+ */
+bool
+LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
+                       TimestampTz origin_prepare_timestamp)
+{
+       int                     i;
+       bool            found = false;
+
+       LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+       for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+       {
+               GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+               /* Ignore not-yet-valid GIDs. */
+               if (gxact->valid && strcmp(gxact->gid, gid) == 0)
+               {
+                       char       *buf;
+                       TwoPhaseFileHeader *hdr;
+
+                       /*
+                        * We are not expecting collisions of GXACTs (same gid) between
+                        * publisher and subscribers, so we perform all I/O while holding
+                        * TwoPhaseStateLock for simplicity.
+                        *
+                        * To move the I/O out of the lock, we need to ensure that no
+                        * other backend commits the prepared xact in the meantime. We can
+                        * do this optimization if we encounter many collisions in GID
+                        * between publisher and subscriber.
+                        */
+                       if (gxact->ondisk)
+                               buf = ReadTwoPhaseFile(gxact->xid, false);
+                       else
+                       {
+                               Assert(gxact->prepare_start_lsn);
+                               XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+                       }
+
+                       hdr = (TwoPhaseFileHeader *) buf;
+
+                       if (hdr->origin_lsn == prepare_end_lsn &&
+                               hdr->origin_timestamp == origin_prepare_timestamp)
+                       {
+                               found = true;
+                               pfree(buf);
+                               break;
+                       }
+
+                       pfree(buf);
+               }
+       }
+       LWLockRelease(TwoPhaseStateLock);
+       return found;
+}
index 29fc4218cd4bb736e945d170cb719606a4a199e9..25021e25a4ca493b555d82a9380e28b5de8203a4 100644 (file)
@@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok)
        sub->enabled = subform->subenabled;
        sub->binary = subform->subbinary;
        sub->stream = subform->substream;
+       sub->twophasestate = subform->subtwophasestate;
 
        /* Get conninfo */
        datum = SysCacheGetAttr(SUBSCRIPTIONOID,
@@ -450,6 +451,39 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
        table_close(rel, RowExclusiveLock);
 }
 
+/*
+ * Does the subscription have any relations?
+ *
+ * Use this function only to know true/false, and when you have no need for the
+ * List returned by GetSubscriptionRelations.
+ */
+bool
+HasSubscriptionRelations(Oid subid)
+{
+       Relation        rel;
+       ScanKeyData skey[1];
+       SysScanDesc scan;
+       bool            has_subrels;
+
+       rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+       ScanKeyInit(&skey[0],
+                               Anum_pg_subscription_rel_srsubid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(subid));
+
+       scan = systable_beginscan(rel, InvalidOid, false,
+                                                         NULL, 1, skey);
+
+       /* If even a single tuple exists then the subscription has tables. */
+       has_subrels = HeapTupleIsValid(systable_getnext(scan));
+
+       /* Cleanup */
+       systable_endscan(scan);
+       table_close(rel, AccessShareLock);
+
+       return has_subrels;
+}
 
 /*
  * Get all relations for subscription.
index 999d9840683f6126914852f15f2703c353b62f5d..55f6e3711d8442c7a45ad266dfd5656269374f7e 100644 (file)
@@ -1255,5 +1255,5 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subslotname, subsynccommit, subpublications)
+              substream, subtwophasestate, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
index eb88d877a5032dabd6f90a6ea5f04147650a2eac..5f834a9c300fe5b10a2e3712afea328bdb1734cf 100644 (file)
@@ -59,6 +59,7 @@
 #define SUBOPT_REFRESH                         0x00000040
 #define SUBOPT_BINARY                          0x00000080
 #define SUBOPT_STREAMING                       0x00000100
+#define SUBOPT_TWOPHASE_COMMIT         0x00000200
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -79,6 +80,7 @@ typedef struct SubOpts
        bool            refresh;
        bool            binary;
        bool            streaming;
+       bool            twophase;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -123,6 +125,8 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
                opts->binary = false;
        if (IsSet(supported_opts, SUBOPT_STREAMING))
                opts->streaming = false;
+       if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
+               opts->twophase = false;
 
        /* Parse options */
        foreach(lc, stmt_options)
@@ -237,6 +241,29 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
                        opts->specified_opts |= SUBOPT_STREAMING;
                        opts->streaming = defGetBoolean(defel);
                }
+               else if (strcmp(defel->defname, "two_phase") == 0)
+               {
+                       /*
+                        * Do not allow toggling of two_phase option. Doing so could cause
+                        * missing of transactions and lead to an inconsistent replica.
+                        * See comments atop worker.c
+                        *
+                        * Note: Unsupported twophase indicates that this call originated
+                        * from AlterSubscription.
+                        */
+                       if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
+
+                       if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+
+                       opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
+                       opts->twophase = defGetBoolean(defel);
+               }
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -325,6 +352,25 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
                                         errmsg("subscription with %s must also set %s",
                                                        "slot_name = NONE", "create_slot = false")));
        }
+
+       /*
+        * Do additional checking for the disallowed combination of two_phase and
+        * streaming. While streaming and two_phase can theoretically be
+        * supported, it needs more analysis to allow them together.
+        */
+       if (opts->twophase &&
+               IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
+               IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
+       {
+               if (opts->streaming &&
+                       IsSet(supported_opts, SUBOPT_STREAMING) &&
+                       IsSet(opts->specified_opts, SUBOPT_STREAMING))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                       /*- translator: both %s are strings of the form "option = value" */
+                                        errmsg("%s and %s are mutually exclusive options",
+                                                       "two_phase = true", "streaming = true")));
+       }
 }
 
 /*
@@ -385,7 +431,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
        supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
                                          SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
                                          SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-                                         SUBOPT_STREAMING);
+                                         SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
        parse_subscription_options(stmt->options, supported_opts, &opts);
 
        /*
@@ -455,6 +501,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
        values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
        values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
        values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+       values[Anum_pg_subscription_subtwophasestate - 1] =
+               CharGetDatum(opts.twophase ?
+                                        LOGICALREP_TWOPHASE_STATE_PENDING :
+                                        LOGICALREP_TWOPHASE_STATE_DISABLED);
        values[Anum_pg_subscription_subconninfo - 1] =
                CStringGetTextDatum(conninfo);
        if (opts.slot_name)
@@ -532,10 +582,35 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                         */
                        if (opts.create_slot)
                        {
+                               bool            twophase_enabled = false;
+
                                Assert(opts.slot_name);
 
-                               walrcv_create_slot(wrconn, opts.slot_name, false,
+                               /*
+                                * Even if two_phase is set, don't create the slot with
+                                * two-phase enabled. Will enable it once all the tables are
+                                * synced and ready. This avoids race-conditions like prepared
+                                * transactions being skipped due to changes not being applied
+                                * due to checks in should_apply_changes_for_rel() when
+                                * tablesync for the corresponding tables are in progress. See
+                                * comments atop worker.c.
+                                *
+                                * Note that if tables were specified but copy_data is false
+                                * then it is safe to enable two_phase up-front because those
+                                * tables are already initially in READY state. When the
+                                * subscription has no tables, we leave the twophase state as
+                                * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
+                                * PUBLICATION to work.
+                                */
+                               if (opts.twophase && !opts.copy_data && tables != NIL)
+                                       twophase_enabled = true;
+
+                               walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
                                                                   CRS_NOEXPORT_SNAPSHOT, NULL);
+
+                               if (twophase_enabled)
+                                       UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+
                                ereport(NOTICE,
                                                (errmsg("created replication slot \"%s\" on publisher",
                                                                opts.slot_name)));
@@ -865,6 +940,12 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
                                if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
                                {
+                                       if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && opts.streaming)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                                errmsg("cannot set %s for two-phase enabled subscription",
+                                                                               "streaming = true")));
+
                                        values[Anum_pg_subscription_substream - 1] =
                                                BoolGetDatum(opts.streaming);
                                        replaces[Anum_pg_subscription_substream - 1] = true;
@@ -927,6 +1008,17 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                                                 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
                                                                 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
 
+                                       /*
+                                        * See ALTER_SUBSCRIPTION_REFRESH for details why this is
+                                        * not allowed.
+                                        */
+                                       if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                                errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
+                                                                errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false"
+                                                                                ", or use DROP/CREATE SUBSCRIPTION.")));
+
                                        PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
 
                                        /* Make sure refresh sees the new list of publications. */
@@ -966,6 +1058,17 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                                                                 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
                                                                 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
 
+                                       /*
+                                        * See ALTER_SUBSCRIPTION_REFRESH for details why this is
+                                        * not allowed.
+                                        */
+                                       if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                                errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
+                                                                errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false"
+                                                                                ", or use DROP/CREATE SUBSCRIPTION.")));
+
                                        PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
 
                                        /* Only refresh the added/dropped list of publications. */
@@ -986,6 +1089,30 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
                                parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts);
 
+                               /*
+                                * The subscription option "two_phase" requires that
+                                * replication has passed the initial table synchronization
+                                * phase before the two_phase becomes properly enabled.
+                                *
+                                * But, having reached this two-phase commit "enabled" state
+                                * we must not allow any subsequent table initialization to
+                                * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
+                                * when the user had requested two_phase = on mode.
+                                *
+                                * The exception to this restriction is when copy_data =
+                                * false, because when copy_data is false the tablesync will
+                                * start already in READY state and will exit directly without
+                                * doing anything.
+                                *
+                                * For more details see comments atop worker.c.
+                                */
+                               if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                                        errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
+                                                        errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false"
+                                                                        ", or use DROP/CREATE SUBSCRIPTION.")));
+
                                PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
                                AlterSubscription_refresh(sub, opts.copy_data);
index 6eaa84a0315a20801466917d05b005beb1bb7925..19ea159af4fb1c2d02c28ea3f0f684965957909b 100644 (file)
@@ -73,6 +73,7 @@ static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
 static char *libpqrcv_create_slot(WalReceiverConn *conn,
                                                                  const char *slotname,
                                                                  bool temporary,
+                                                                 bool two_phase,
                                                                  CRSSnapshotAction snapshot_action,
                                                                  XLogRecPtr *lsn);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
@@ -436,6 +437,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
                        PQserverVersion(conn->streamConn) >= 140000)
                        appendStringInfoString(&cmd, ", streaming 'on'");
 
+               if (options->proto.logical.twophase &&
+                       PQserverVersion(conn->streamConn) >= 150000)
+                       appendStringInfoString(&cmd, ", two_phase 'on'");
+
                pubnames = options->proto.logical.publication_names;
                pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
                if (!pubnames_str)
@@ -851,7 +856,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
  */
 static char *
 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
-                                        bool temporary, CRSSnapshotAction snapshot_action,
+                                        bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
                                         XLogRecPtr *lsn)
 {
        PGresult   *res;
@@ -868,6 +873,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
        if (conn->logical)
        {
                appendStringInfoString(&cmd, " LOGICAL pgoutput");
+               if (two_phase)
+                       appendStringInfoString(&cmd, " TWO_PHASE");
+
                switch (snapshot_action)
                {
                        case CRS_EXPORT_SNAPSHOT:
index 453efc51e1625e939c9b4b24757fe412ed754fe2..2874dc061222178b80d20999f283477b55a86c51 100644 (file)
@@ -374,11 +374,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                                 *
                                 * XXX Now, this can even lead to a deadlock if the prepare
                                 * transaction is waiting to get it logically replicated for
-                                * distributed 2PC. Currently, we don't have an in-core
-                                * implementation of prepares for distributed 2PC but some
-                                * out-of-core logical replication solution can have such an
-                                * implementation. They need to inform users to not have locks
-                                * on catalog tables in such transactions.
+                                * distributed 2PC. This can be avoided by disallowing
+                                * preparing transactions that have locked [user] catalog
+                                * tables exclusively but as of now, we ask users not to do
+                                * such an operation.
                                 */
                                DecodePrepare(ctx, buf, &parsed);
                                break;
@@ -735,7 +734,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
        if (two_phase)
        {
                ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
-                                                                       SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
+                                                                       SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
                                                                        commit_time, origin_id, origin_lsn,
                                                                        parsed->twophase_gid, true);
        }
index d536a5f3ba3b59fd05dc8dfd0648edc5388628d4..d61ef4cfada7875bae7937e78a469a39d46e151e 100644 (file)
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
        ctx->reorder = ReorderBufferAllocate();
        ctx->snapshot_builder =
                AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-                                                               need_full_snapshot, slot->data.initial_consistent_point);
+                                                               need_full_snapshot, slot->data.two_phase_at);
 
        ctx->reorder->private_data = ctx;
 
@@ -432,10 +432,12 @@ CreateInitDecodingContext(const char *plugin,
        MemoryContextSwitchTo(old_context);
 
        /*
-        * We allow decoding of prepared transactions iff the two_phase option is
-        * enabled at the time of slot creation.
+        * We allow decoding of prepared transactions when the two_phase is
+        * enabled at the time of slot creation, or when the two_phase option is
+        * given at the streaming start, provided the plugin supports all the
+        * callbacks for two-phase.
         */
-       ctx->twophase &= MyReplicationSlot->data.two_phase;
+       ctx->twophase &= slot->data.two_phase;
 
        ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
@@ -538,10 +540,22 @@ CreateDecodingContext(XLogRecPtr start_lsn,
        MemoryContextSwitchTo(old_context);
 
        /*
-        * We allow decoding of prepared transactions iff the two_phase option is
-        * enabled at the time of slot creation.
+        * We allow decoding of prepared transactions when the two_phase is
+        * enabled at the time of slot creation, or when the two_phase option is
+        * given at the streaming start, provided the plugin supports all the
+        * callbacks for two-phase.
         */
-       ctx->twophase &= MyReplicationSlot->data.two_phase;
+       ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
+
+       /* Mark slot to allow two_phase decoding if not already marked */
+       if (ctx->twophase && !slot->data.two_phase)
+       {
+               slot->data.two_phase = true;
+               slot->data.two_phase_at = start_lsn;
+               ReplicationSlotMarkDirty();
+               ReplicationSlotSave();
+               SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
+       }
 
        ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
@@ -602,7 +616,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 
        SpinLockAcquire(&slot->mutex);
        slot->data.confirmed_flush = ctx->reader->EndRecPtr;
-       slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
+       if (slot->data.two_phase)
+               slot->data.two_phase_at = ctx->reader->EndRecPtr;
        SpinLockRelease(&slot->mutex);
 }
 
index cb42fcb34d12001414375af5b3fe5f8013afb5fe..2c191dec04503c73157906b2e0668ec4d8326318 100644 (file)
@@ -973,8 +973,11 @@ replorigin_advance(RepOriginId node,
 
        /*
         * Due to - harmless - race conditions during a checkpoint we could see
-        * values here that are older than the ones we already have in memory.
-        * Don't overwrite those.
+        * values here that are older than the ones we already have in memory. We
+        * could also see older values for prepared transactions when the prepare
+        * is sent at a later point of time along with commit prepared and there
+        * are other transactions commits between prepare and commit prepared. See
+        * ReorderBufferFinishPrepared. Don't overwrite those.
         */
        if (go_backward || replication_state->remote_lsn < remote_commit)
                replication_state->remote_lsn = remote_commit;
index 1cf59e0fb0fae89e99c6489af1a1359dd0fa225d..13c8c3bd5bbdf58e75cc45a58e2e1fcd0a7088d7 100644 (file)
@@ -49,7 +49,7 @@ logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
 
        /* fixed fields */
        pq_sendint64(out, txn->final_lsn);
-       pq_sendint64(out, txn->commit_time);
+       pq_sendint64(out, txn->xact_time.commit_time);
        pq_sendint32(out, txn->xid);
 }
 
@@ -85,7 +85,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
        /* send fields */
        pq_sendint64(out, commit_lsn);
        pq_sendint64(out, txn->end_lsn);
-       pq_sendint64(out, txn->commit_time);
+       pq_sendint64(out, txn->xact_time.commit_time);
 }
 
 /*
@@ -106,6 +106,217 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
        commit_data->committime = pq_getmsgint64(in);
 }
 
+/*
+ * Write BEGIN PREPARE to the output stream.
+ */
+void
+logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
+{
+       pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
+
+       /* fixed fields */
+       pq_sendint64(out, txn->final_lsn);
+       pq_sendint64(out, txn->end_lsn);
+       pq_sendint64(out, txn->xact_time.prepare_time);
+       pq_sendint32(out, txn->xid);
+
+       /* send gid */
+       pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction BEGIN PREPARE from the stream.
+ */
+void
+logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
+{
+       /* read fields */
+       begin_data->prepare_lsn = pq_getmsgint64(in);
+       if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "prepare_lsn not set in begin prepare message");
+       begin_data->end_lsn = pq_getmsgint64(in);
+       if (begin_data->end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "end_lsn not set in begin prepare message");
+       begin_data->prepare_time = pq_getmsgint64(in);
+       begin_data->xid = pq_getmsgint(in, 4);
+
+       /* read gid (copy it into a pre-allocated buffer) */
+       strcpy(begin_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+                                                XLogRecPtr prepare_lsn)
+{
+       uint8           flags = 0;
+
+       pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+
+       /*
+        * This should only ever happen for two-phase commit transactions, in
+        * which case we expect to have a valid GID.
+        */
+       Assert(txn->gid != NULL);
+       Assert(rbtxn_prepared(txn));
+
+       /* send the flags field */
+       pq_sendbyte(out, flags);
+
+       /* send fields */
+       pq_sendint64(out, prepare_lsn);
+       pq_sendint64(out, txn->end_lsn);
+       pq_sendint64(out, txn->xact_time.prepare_time);
+       pq_sendint32(out, txn->xid);
+
+       /* send gid */
+       pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+       /* read flags */
+       uint8           flags = pq_getmsgbyte(in);
+
+       if (flags != 0)
+               elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+       /* read fields */
+       prepare_data->prepare_lsn = pq_getmsgint64(in);
+       if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "prepare_lsn is not set in prepare message");
+       prepare_data->end_lsn = pq_getmsgint64(in);
+       if (prepare_data->end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "end_lsn is not set in prepare message");
+       prepare_data->prepare_time = pq_getmsgint64(in);
+       prepare_data->xid = pq_getmsgint(in, 4);
+
+       /* read gid (copy it into a pre-allocated buffer) */
+       strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write COMMIT PREPARED to the output stream.
+ */
+void
+logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
+                                                                XLogRecPtr commit_lsn)
+{
+       uint8           flags = 0;
+
+       pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
+
+       /*
+        * This should only ever happen for two-phase commit transactions, in
+        * which case we expect to have a valid GID.
+        */
+       Assert(txn->gid != NULL);
+
+       /* send the flags field */
+       pq_sendbyte(out, flags);
+
+       /* send fields */
+       pq_sendint64(out, commit_lsn);
+       pq_sendint64(out, txn->end_lsn);
+       pq_sendint64(out, txn->xact_time.commit_time);
+       pq_sendint32(out, txn->xid);
+
+       /* send gid */
+       pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction COMMIT PREPARED from the stream.
+ */
+void
+logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
+{
+       /* read flags */
+       uint8           flags = pq_getmsgbyte(in);
+
+       if (flags != 0)
+               elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
+
+       /* read fields */
+       prepare_data->commit_lsn = pq_getmsgint64(in);
+       if (prepare_data->commit_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "commit_lsn is not set in commit prepared message");
+       prepare_data->end_lsn = pq_getmsgint64(in);
+       if (prepare_data->end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "end_lsn is not set in commit prepared message");
+       prepare_data->commit_time = pq_getmsgint64(in);
+       prepare_data->xid = pq_getmsgint(in, 4);
+
+       /* read gid (copy it into a pre-allocated buffer) */
+       strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write ROLLBACK PREPARED to the output stream.
+ */
+void
+logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
+                                                                  XLogRecPtr prepare_end_lsn,
+                                                                  TimestampTz prepare_time)
+{
+       uint8           flags = 0;
+
+       pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+       /*
+        * This should only ever happen for two-phase commit transactions, in
+        * which case we expect to have a valid GID.
+        */
+       Assert(txn->gid != NULL);
+
+       /* send the flags field */
+       pq_sendbyte(out, flags);
+
+       /* send fields */
+       pq_sendint64(out, prepare_end_lsn);
+       pq_sendint64(out, txn->end_lsn);
+       pq_sendint64(out, prepare_time);
+       pq_sendint64(out, txn->xact_time.commit_time);
+       pq_sendint32(out, txn->xid);
+
+       /* send gid */
+       pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction ROLLBACK PREPARED from the stream.
+ */
+void
+logicalrep_read_rollback_prepared(StringInfo in,
+                                                                 LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+       /* read flags */
+       uint8           flags = pq_getmsgbyte(in);
+
+       if (flags != 0)
+               elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
+
+       /* read fields */
+       rollback_data->prepare_end_lsn = pq_getmsgint64(in);
+       if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
+       rollback_data->rollback_end_lsn = pq_getmsgint64(in);
+       if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
+               elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
+       rollback_data->prepare_time = pq_getmsgint64(in);
+       rollback_data->rollback_time = pq_getmsgint64(in);
+       rollback_data->xid = pq_getmsgint(in, 4);
+
+       /* read gid (copy it into a pre-allocated buffer) */
+       strcpy(rollback_data->gid, pq_getmsgstring(in));
+}
+
 /*
  * Write ORIGIN to the output stream.
  */
@@ -841,7 +1052,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
        /* send fields */
        pq_sendint64(out, commit_lsn);
        pq_sendint64(out, txn->end_lsn);
-       pq_sendint64(out, txn->commit_time);
+       pq_sendint64(out, txn->xact_time.commit_time);
 }
 
 /*
index 1b4f4a528aafd6cc439517d6da1cfcf60e7af6c3..7378beb684d972fc429110b744b41c02b977c533 100644 (file)
@@ -2576,7 +2576,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
 
        txn->final_lsn = commit_lsn;
        txn->end_lsn = end_lsn;
-       txn->commit_time = commit_time;
+       txn->xact_time.commit_time = commit_time;
        txn->origin_id = origin_id;
        txn->origin_lsn = origin_lsn;
 
@@ -2667,7 +2667,7 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
         */
        txn->final_lsn = prepare_lsn;
        txn->end_lsn = end_lsn;
-       txn->commit_time = prepare_time;
+       txn->xact_time.prepare_time = prepare_time;
        txn->origin_id = origin_id;
        txn->origin_lsn = origin_lsn;
 
@@ -2714,7 +2714,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
        Assert(txn->final_lsn != InvalidXLogRecPtr);
 
        ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
-                                               txn->commit_time, txn->origin_id, txn->origin_lsn);
+                                               txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
 
        /*
         * We send the prepare for the concurrently aborted xacts so that later
@@ -2734,7 +2734,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 void
 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
                                                        XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-                                                       XLogRecPtr initial_consistent_point,
+                                                       XLogRecPtr two_phase_at,
                                                        TimestampTz commit_time, RepOriginId origin_id,
                                                        XLogRecPtr origin_lsn, char *gid, bool is_commit)
 {
@@ -2753,19 +2753,20 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
         * be later used for rollback.
         */
        prepare_end_lsn = txn->end_lsn;
-       prepare_time = txn->commit_time;
+       prepare_time = txn->xact_time.prepare_time;
 
        /* add the gid in the txn */
        txn->gid = pstrdup(gid);
 
        /*
         * It is possible that this transaction is not decoded at prepare time
-        * either because by that time we didn't have a consistent snapshot or it
-        * was decoded earlier but we have restarted. We only need to send the
-        * prepare if it was not decoded earlier. We don't need to decode the xact
-        * for aborts if it is not done already.
+        * either because by that time we didn't have a consistent snapshot, or
+        * two_phase was not enabled, or it was decoded earlier but we have
+        * restarted. We only need to send the prepare if it was not decoded
+        * earlier. We don't need to decode the xact for aborts if it is not done
+        * already.
         */
-       if ((txn->final_lsn < initial_consistent_point) && is_commit)
+       if ((txn->final_lsn < two_phase_at) && is_commit)
        {
                txn->txn_flags |= RBTXN_PREPARE;
 
@@ -2783,12 +2784,12 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
                 * prepared after the restart.
                 */
                ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
-                                                       txn->commit_time, txn->origin_id, txn->origin_lsn);
+                                                       txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
        }
 
        txn->final_lsn = commit_lsn;
        txn->end_lsn = end_lsn;
-       txn->commit_time = commit_time;
+       txn->xact_time.commit_time = commit_time;
        txn->origin_id = origin_id;
        txn->origin_lsn = origin_lsn;
 
index 04f3355f60270ae7688b4382fdae1b7a6f2c6d0d..a14a3d69005ec6b687ee6c481a9dc9731b7ebaa1 100644 (file)
@@ -165,15 +165,15 @@ struct SnapBuild
        XLogRecPtr      start_decoding_at;
 
        /*
-        * LSN at which we found a consistent point at the time of slot creation.
-        * This is also the point where we have exported a snapshot for the
-        * initial copy.
+        * LSN at which two-phase decoding was enabled or LSN at which we found a
+        * consistent point at the time of slot creation.
         *
-        * The prepared transactions that are not covered by initial snapshot
-        * needs to be sent later along with commit prepared and they must be
-        * before this point.
+        * The prepared transactions, that were skipped because previously
+        * two-phase was not enabled or are not covered by initial snapshot, need
+        * to be sent later along with commit prepared and they must be before
+        * this point.
         */
-       XLogRecPtr      initial_consistent_point;
+       XLogRecPtr      two_phase_at;
 
        /*
         * Don't start decoding WAL until the "xl_running_xacts" information
@@ -281,7 +281,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
                                                TransactionId xmin_horizon,
                                                XLogRecPtr start_lsn,
                                                bool need_full_snapshot,
-                                               XLogRecPtr initial_consistent_point)
+                                               XLogRecPtr two_phase_at)
 {
        MemoryContext context;
        MemoryContext oldcontext;
@@ -309,7 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
        builder->initial_xmin_horizon = xmin_horizon;
        builder->start_decoding_at = start_lsn;
        builder->building_full_snapshot = need_full_snapshot;
-       builder->initial_consistent_point = initial_consistent_point;
+       builder->two_phase_at = two_phase_at;
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -370,12 +370,21 @@ SnapBuildCurrentState(SnapBuild *builder)
 }
 
 /*
- * Return the LSN at which the snapshot was exported
+ * Return the LSN at which the two-phase decoding was first enabled.
  */
 XLogRecPtr
-SnapBuildInitialConsistentPoint(SnapBuild *builder)
+SnapBuildGetTwoPhaseAt(SnapBuild *builder)
 {
-       return builder->initial_consistent_point;
+       return builder->two_phase_at;
+}
+
+/*
+ * Set the LSN at which two-phase decoding is enabled.
+ */
+void
+SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
+{
+       builder->two_phase_at = ptr;
 }
 
 /*
index 682c107e7479eb642ae74c184b27ec85a3e78bde..f07983a43cb3450d913ecc8d5d13a6e43f620efe 100644 (file)
@@ -96,6 +96,7 @@
 
 #include "access/table.h"
 #include "access/xact.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
+#include "utils/syscache.h"
 
 static bool table_states_valid = false;
+static List *table_states_not_ready = NIL;
+static bool FetchTableStates(bool *started_tx);
 
 StringInfo     copybuf = NULL;
 
@@ -362,7 +366,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                Oid                     relid;
                TimestampTz last_start_time;
        };
-       static List *table_states = NIL;
        static HTAB *last_start_times = NULL;
        ListCell   *lc;
        bool            started_tx = false;
@@ -370,42 +373,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
        Assert(!IsTransactionState());
 
        /* We need up-to-date sync state info for subscription tables here. */
-       if (!table_states_valid)
-       {
-               MemoryContext oldctx;
-               List       *rstates;
-               ListCell   *lc;
-               SubscriptionRelState *rstate;
-
-               /* Clean the old list. */
-               list_free_deep(table_states);
-               table_states = NIL;
-
-               StartTransactionCommand();
-               started_tx = true;
-
-               /* Fetch all non-ready tables. */
-               rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
-
-               /* Allocate the tracking info in a permanent memory context. */
-               oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-               foreach(lc, rstates)
-               {
-                       rstate = palloc(sizeof(SubscriptionRelState));
-                       memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
-                       table_states = lappend(table_states, rstate);
-               }
-               MemoryContextSwitchTo(oldctx);
-
-               table_states_valid = true;
-       }
+       FetchTableStates(&started_tx);
 
        /*
         * Prepare a hash table for tracking last start times of workers, to avoid
         * immediate restarts.  We don't need it if there are no tables that need
         * syncing.
         */
-       if (table_states && !last_start_times)
+       if (table_states_not_ready && !last_start_times)
        {
                HASHCTL         ctl;
 
@@ -419,16 +394,38 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
         * Clean up the hash table when we're done with all tables (just to
         * release the bit of memory).
         */
-       else if (!table_states && last_start_times)
+       else if (!table_states_not_ready && last_start_times)
        {
                hash_destroy(last_start_times);
                last_start_times = NULL;
        }
 
+       /*
+        * Even when the two_phase mode is requested by the user, it remains as
+        * 'pending' until all tablesyncs have reached READY state.
+        *
+        * When this happens, we restart the apply worker and (if the conditions
+        * are still ok) then the two_phase tri-state will become 'enabled' at
+        * that time.
+        *
+        * Note: If the subscription has no tables then leave the state as
+        * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+        * work.
+        */
+       if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+               AllTablesyncsReady())
+       {
+               ereport(LOG,
+                               (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+                                               MySubscription->name)));
+
+               proc_exit(0);
+       }
+
        /*
         * Process all tables that are being synchronized.
         */
-       foreach(lc, table_states)
+       foreach(lc, table_states_not_ready)
        {
                SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
 
@@ -1071,7 +1068,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
         * slot leading to a dangling slot on the server.
         */
        HOLD_INTERRUPTS();
-       walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
+       walrcv_create_slot(LogRepWorkerWalRcvConn,
+                                          slotname, false /* permanent */ , false /* two_phase */ ,
                                           CRS_USE_SNAPSHOT, origin_startpos);
        RESUME_INTERRUPTS();
 
@@ -1158,3 +1156,134 @@ copy_table_done:
        wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
        return slotname;
 }
+
+/*
+ * Common code to fetch the up-to-date sync state info into the static lists.
+ *
+ * Returns true if subscription has 1 or more tables, else false.
+ *
+ * Note: If this function started the transaction (indicated by the parameter)
+ * then it is the caller's responsibility to commit it.
+ */
+static bool
+FetchTableStates(bool *started_tx)
+{
+       static bool has_subrels = false;
+
+       *started_tx = false;
+
+       if (!table_states_valid)
+       {
+               MemoryContext oldctx;
+               List       *rstates;
+               ListCell   *lc;
+               SubscriptionRelState *rstate;
+
+               /* Clean the old lists. */
+               list_free_deep(table_states_not_ready);
+               table_states_not_ready = NIL;
+
+               if (!IsTransactionState())
+               {
+                       StartTransactionCommand();
+                       *started_tx = true;
+               }
+
+               /* Fetch all non-ready tables. */
+               rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
+
+               /* Allocate the tracking info in a permanent memory context. */
+               oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+               foreach(lc, rstates)
+               {
+                       rstate = palloc(sizeof(SubscriptionRelState));
+                       memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+                       table_states_not_ready = lappend(table_states_not_ready, rstate);
+               }
+               MemoryContextSwitchTo(oldctx);
+
+               /*
+                * Does the subscription have tables?
+                *
+                * If there were not-READY relations found then we know it does. But
+                * if table_state_not_ready was empty we still need to check again to
+                * see if there are 0 tables.
+                */
+               has_subrels = (list_length(table_states_not_ready) > 0) ||
+                       HasSubscriptionRelations(MySubscription->oid);
+
+               table_states_valid = true;
+       }
+
+       return has_subrels;
+}
+
+/*
+ * If the subscription has no tables then return false.
+ *
+ * Otherwise, are all tablesyncs READY?
+ *
+ * Note: This function is not suitable to be called from outside of apply or
+ * tablesync workers because MySubscription needs to be already initialized.
+ */
+bool
+AllTablesyncsReady(void)
+{
+       bool            started_tx = false;
+       bool            has_subrels = false;
+
+       /* We need up-to-date sync state info for subscription tables here. */
+       has_subrels = FetchTableStates(&started_tx);
+
+       if (started_tx)
+       {
+               CommitTransactionCommand();
+               pgstat_report_stat(false);
+       }
+
+       /*
+        * Return false when there are no tables in subscription or not all tables
+        * are in ready state; true otherwise.
+        */
+       return has_subrels && list_length(table_states_not_ready) == 0;
+}
+
+/*
+ * Update the two_phase state of the specified subscription in pg_subscription.
+ */
+void
+UpdateTwoPhaseState(Oid suboid, char new_state)
+{
+       Relation        rel;
+       HeapTuple       tup;
+       bool            nulls[Natts_pg_subscription];
+       bool            replaces[Natts_pg_subscription];
+       Datum           values[Natts_pg_subscription];
+
+       Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
+                  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
+                  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
+
+       rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+       tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR,
+                        "cache lookup failed for subscription oid %u",
+                        suboid);
+
+       /* Form a new tuple. */
+       memset(values, 0, sizeof(values));
+       memset(nulls, false, sizeof(nulls));
+       memset(replaces, false, sizeof(replaces));
+
+       /* And update/set two_phase state */
+       values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
+       replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+
+       tup = heap_modify_tuple(tup, RelationGetDescr(rel),
+                                                       values, nulls, replaces);
+       CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+       heap_freetuple(tup);
+       table_close(rel, RowExclusiveLock);
+}
index 5fc620c7f1900c1e7b210a5ab175b440492dcd6a..b9a7a7ffbb32d5dc2ce1fddb00e76c1bda7d3619 100644 (file)
  * a new way to pass filenames to BufFile APIs so that we are allowed to open
  * the file we desired across multiple stream-open calls for the same
  * transaction.
+ *
+ * TWO_PHASE TRANSACTIONS
+ * ----------------------
+ * Two phase transactions are replayed at prepare and then committed or
+ * rolled back at commit prepared and rollback prepared respectively. It is
+ * possible to have a prepared transaction that arrives at the apply worker
+ * when the tablesync is busy doing the initial copy. In this case, the apply
+ * worker skips all the prepared operations [e.g. inserts] while the tablesync
+ * is still busy (see the condition of should_apply_changes_for_rel). The
+ * tablesync worker might not get such a prepared transaction because say it
+ * was prior to the initial consistent point but might have got some later
+ * commits. Now, the tablesync worker will exit without doing anything for the
+ * prepared transaction skipped by the apply worker as the sync location for it
+ * will be already ahead of the apply worker's current location. This would lead
+ * to an "empty prepare", because later when the apply worker does the commit
+ * prepare, there is nothing in it (the inserts were skipped earlier).
+ *
+ * To avoid this, and similar prepare confusions the subscription's two_phase
+ * commit is enabled only after the initial sync is over. The two_phase option
+ * has been implemented as a tri-state with values DISABLED, PENDING, and
+ * ENABLED.
+ *
+ * Even if the user specifies they want a subscription with two_phase = on,
+ * internally it will start with a tri-state of PENDING which only becomes
+ * ENABLED after all tablesync initializations are completed - i.e. when all
+ * tablesync workers have reached their READY state. In other words, the value
+ * PENDING is only a temporary state for subscription start-up.
+ *
+ * Until the two_phase is properly available (ENABLED) the subscription will
+ * behave as if two_phase = off. When the apply worker detects that all
+ * tablesyncs have become READY (while the tri-state was PENDING) it will
+ * restart the apply worker process. This happens in
+ * process_syncing_tables_for_apply.
+ *
+ * When the (re-started) apply worker finds that all tablesyncs are READY for a
+ * two_phase tri-state of PENDING it start streaming messages with the
+ * two_phase option which in turn enables the decoding of two-phase commits at
+ * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
+ * Now, it is possible that during the time we have not enabled two_phase, the
+ * publisher (replication server) would have skipped some prepares but we
+ * ensure that such prepares are sent along with commit prepare, see
+ * ReorderBufferFinishPrepared.
+ *
+ * If the subscription has no tables then a two_phase tri-state PENDING is
+ * left unchanged. This lets the user still do an ALTER TABLE REFRESH
+ * PUBLICATION which might otherwise be disallowed (see below).
+ *
+ * If ever a user needs to be aware of the tri-state value, they can fetch it
+ * from the pg_subscription catalog (see column subtwophasestate).
+ *
+ * We don't allow to toggle two_phase option of a subscription because it can
+ * lead to an inconsistent replica. Consider, initially, it was on and we have
+ * received some prepare then we turn it off, now at commit time the server
+ * will send the entire transaction data along with the commit. With some more
+ * analysis, we can allow changing this option from off to on but not sure if
+ * that alone would be useful.
+ *
+ * Finally, to avoid problems mentioned in previous paragraphs from any
+ * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
+ * to 'off' and then again back to 'on') there is a restriction for
+ * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
+ * the two_phase tri-state is ENABLED, except when copy_data = false.
+ *
+ * We can get prepare of the same GID more than once for the genuine cases
+ * where we have defined multiple subscriptions for publications on the same
+ * server and prepared transaction has operations on tables subscribed to those
+ * subscriptions. For such cases, if we use the GID sent by publisher one of
+ * the prepares will be successful and others will fail, in which case the
+ * server will send them again. Now, this can lead to a deadlock if user has
+ * set synchronous_standby_names for all the subscriptions on subscriber. To
+ * avoid such deadlocks, we generate a unique GID (consisting of the
+ * subscription oid and the xid of the prepared transaction) for each prepare
+ * transaction on the subscriber.
  *-------------------------------------------------------------------------
  */
 
 
 #include "access/table.h"
 #include "access/tableam.h"
+#include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
@@ -256,6 +330,10 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
                                                                           LogicalRepTupleData *newtup,
                                                                           CmdType operation);
 
+/* Compute GID for two_phase transactions */
+static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
+
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -783,6 +861,185 @@ apply_handle_commit(StringInfo s)
        pgstat_report_activity(STATE_IDLE, NULL);
 }
 
+/*
+ * Handle BEGIN PREPARE message.
+ */
+static void
+apply_handle_begin_prepare(StringInfo s)
+{
+       LogicalRepPreparedTxnData begin_data;
+
+       /* Tablesync should never receive prepare. */
+       if (am_tablesync_worker())
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
+
+       logicalrep_read_begin_prepare(s, &begin_data);
+
+       remote_final_lsn = begin_data.prepare_lsn;
+
+       in_remote_transaction = true;
+
+       pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+       LogicalRepPreparedTxnData prepare_data;
+       char            gid[GIDSIZE];
+
+       logicalrep_read_prepare(s, &prepare_data);
+
+       if (prepare_data.prepare_lsn != remote_final_lsn)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+                                                                LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
+                                                                LSN_FORMAT_ARGS(remote_final_lsn))));
+
+       /*
+        * Compute unique GID for two_phase transactions. We don't use GID of
+        * prepared transaction sent by server as that can lead to deadlock when
+        * we have multiple subscriptions from same node point to publications on
+        * the same node. See comments atop worker.c
+        */
+       TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+                                                  gid, sizeof(gid));
+
+       /*
+        * Unlike commit, here, we always prepare the transaction even though no
+        * change has happened in this transaction. It is done this way because at
+        * commit prepared time, we won't know whether we have skipped preparing a
+        * transaction because of no change.
+        *
+        * XXX, We can optimize such that at commit prepared time, we first check
+        * whether we have prepared the transaction or not but that doesn't seem
+        * worthwhile because such cases shouldn't be common.
+        */
+       begin_replication_step();
+
+       /*
+        * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+        * called within the PrepareTransactionBlock below.
+        */
+       BeginTransactionBlock();
+       CommitTransactionCommand(); /* Completes the preceding Begin command. */
+
+       /*
+        * Update origin state so we can restart streaming from correct position
+        * in case of crash.
+        */
+       replorigin_session_origin_lsn = prepare_data.end_lsn;
+       replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+       PrepareTransactionBlock(gid);
+       end_replication_step();
+       CommitTransactionCommand();
+       pgstat_report_stat(false);
+
+       store_flush_position(prepare_data.end_lsn);
+
+       in_remote_transaction = false;
+
+       /* Process any tables that are being synchronized in parallel. */
+       process_syncing_tables(prepare_data.end_lsn);
+
+       pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle a COMMIT PREPARED of a previously PREPARED transaction.
+ */
+static void
+apply_handle_commit_prepared(StringInfo s)
+{
+       LogicalRepCommitPreparedTxnData prepare_data;
+       char            gid[GIDSIZE];
+
+       logicalrep_read_commit_prepared(s, &prepare_data);
+
+       /* Compute GID for two_phase transactions. */
+       TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+                                                  gid, sizeof(gid));
+
+       /* There is no transaction when COMMIT PREPARED is called */
+       begin_replication_step();
+
+       /*
+        * Update origin state so we can restart streaming from correct position
+        * in case of crash.
+        */
+       replorigin_session_origin_lsn = prepare_data.end_lsn;
+       replorigin_session_origin_timestamp = prepare_data.commit_time;
+
+       FinishPreparedTransaction(gid, true);
+       end_replication_step();
+       CommitTransactionCommand();
+       pgstat_report_stat(false);
+
+       store_flush_position(prepare_data.end_lsn);
+       in_remote_transaction = false;
+
+       /* Process any tables that are being synchronized in parallel. */
+       process_syncing_tables(prepare_data.end_lsn);
+
+       pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
+ */
+static void
+apply_handle_rollback_prepared(StringInfo s)
+{
+       LogicalRepRollbackPreparedTxnData rollback_data;
+       char            gid[GIDSIZE];
+
+       logicalrep_read_rollback_prepared(s, &rollback_data);
+
+       /* Compute GID for two_phase transactions. */
+       TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
+                                                  gid, sizeof(gid));
+
+       /*
+        * It is possible that we haven't received prepare because it occurred
+        * before walsender reached a consistent point or the two_phase was still
+        * not enabled by that time, so in such cases, we need to skip rollback
+        * prepared.
+        */
+       if (LookupGXact(gid, rollback_data.prepare_end_lsn,
+                                       rollback_data.prepare_time))
+       {
+               /*
+                * Update origin state so we can restart streaming from correct
+                * position in case of crash.
+                */
+               replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
+               replorigin_session_origin_timestamp = rollback_data.rollback_time;
+
+               /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
+               begin_replication_step();
+               FinishPreparedTransaction(gid, false);
+               end_replication_step();
+               CommitTransactionCommand();
+       }
+
+       pgstat_report_stat(false);
+
+       store_flush_position(rollback_data.rollback_end_lsn);
+       in_remote_transaction = false;
+
+       /* Process any tables that are being synchronized in parallel. */
+       process_syncing_tables(rollback_data.rollback_end_lsn);
+
+       pgstat_report_activity(STATE_IDLE, NULL);
+}
+
 /*
  * Handle ORIGIN message.
  *
@@ -2060,6 +2317,22 @@ apply_dispatch(StringInfo s)
                case LOGICAL_REP_MSG_STREAM_COMMIT:
                        apply_handle_stream_commit(s);
                        return;
+
+               case LOGICAL_REP_MSG_BEGIN_PREPARE:
+                       apply_handle_begin_prepare(s);
+                       return;
+
+               case LOGICAL_REP_MSG_PREPARE:
+                       apply_handle_prepare(s);
+                       return;
+
+               case LOGICAL_REP_MSG_COMMIT_PREPARED:
+                       apply_handle_commit_prepared(s);
+                       return;
+
+               case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+                       apply_handle_rollback_prepared(s);
+                       return;
        }
 
        ereport(ERROR,
@@ -2539,6 +2812,9 @@ maybe_reread_subscription(void)
        /* !slotname should never happen when enabled is true. */
        Assert(newsub->slotname);
 
+       /* two-phase should not be altered */
+       Assert(newsub->twophasestate == MySubscription->twophasestate);
+
        /*
         * Exit if any parameter that affects the remote connection was changed.
         * The launcher will start a new worker.
@@ -3040,6 +3316,24 @@ cleanup_subxact_info()
        subxact_data.nsubxacts_max = 0;
 }
 
+/*
+ * Form the prepared transaction GID for two_phase transactions.
+ *
+ * Return the GID in the supplied buffer.
+ */
+static void
+TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
+{
+       Assert(subid != InvalidRepOriginId);
+
+       if (!TransactionIdIsValid(xid))
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg_internal("invalid two-phase transaction ID")));
+
+       snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -3050,6 +3344,7 @@ ApplyWorkerMain(Datum main_arg)
        XLogRecPtr      origin_startpos;
        char       *myslotname;
        WalRcvStreamOptions options;
+       int                     server_version;
 
        /* Attach to slot */
        logicalrep_worker_attach(worker_slot);
@@ -3208,15 +3503,59 @@ ApplyWorkerMain(Datum main_arg)
        options.logical = true;
        options.startpoint = origin_startpos;
        options.slotname = myslotname;
+
+       server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
        options.proto.logical.proto_version =
-               walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
-               LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
+               server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+               server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+               LOGICALREP_PROTO_VERSION_NUM;
+
        options.proto.logical.publication_names = MySubscription->publications;
        options.proto.logical.binary = MySubscription->binary;
        options.proto.logical.streaming = MySubscription->stream;
+       options.proto.logical.twophase = false;
+
+       if (!am_tablesync_worker())
+       {
+               /*
+                * Even when the two_phase mode is requested by the user, it remains
+                * as the tri-state PENDING until all tablesyncs have reached READY
+                * state. Only then, can it become ENABLED.
+                *
+                * Note: If the subscription has no tables then leave the state as
+                * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+                * work.
+                */
+               if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+                       AllTablesyncsReady())
+               {
+                       /* Start streaming with two_phase enabled */
+                       options.proto.logical.twophase = true;
+                       walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
-       /* Start normal logical streaming replication. */
-       walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+                       StartTransactionCommand();
+                       UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+                       MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+                       CommitTransactionCommand();
+               }
+               else
+               {
+                       walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+               }
+
+               ereport(DEBUG1,
+                               (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
+                                               MySubscription->name,
+                                               MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+                                               MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+                                               MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+                                               "?")));
+       }
+       else
+       {
+               /* Start normal logical streaming replication. */
+               walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+       }
 
        /* Run the main loop. */
        LogicalRepApplyLoop(origin_startpos);
index abd5217ab1b5ee8e23eff86025f33aca41e63e9d..e4314af13ae6c22b2647b4cc3493bf4016daa3d4 100644 (file)
@@ -51,6 +51,16 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
                                                         Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
                                                                   RepOriginId origin_id);
+static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
+                                                                          ReorderBufferTXN *txn);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+                                                                ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+                                                                                ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+                                                                                  ReorderBufferTXN *txn,
+                                                                                  XLogRecPtr prepare_end_lsn,
+                                                                                  TimestampTz prepare_time);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
                                                                  ReorderBufferTXN *txn);
 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
@@ -70,6 +80,9 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
                                                                                uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
                                                                        LogicalDecodingContext *ctx);
+static void send_repl_origin(LogicalDecodingContext *ctx,
+                                                        RepOriginId origin_id, XLogRecPtr origin_lsn,
+                                                        bool send_origin);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
@@ -145,6 +158,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->truncate_cb = pgoutput_truncate;
        cb->message_cb = pgoutput_message;
        cb->commit_cb = pgoutput_commit_txn;
+
+       cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
+       cb->prepare_cb = pgoutput_prepare_txn;
+       cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+       cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
        cb->filter_by_origin_cb = pgoutput_origin_filter;
        cb->shutdown_cb = pgoutput_shutdown;
 
@@ -156,6 +174,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->stream_change_cb = pgoutput_change;
        cb->stream_message_cb = pgoutput_message;
        cb->stream_truncate_cb = pgoutput_truncate;
+       /* transaction streaming - two-phase commit */
+       cb->stream_prepare_cb = NULL;
 }
 
 static void
@@ -167,10 +187,12 @@ parse_output_parameters(List *options, PGOutputData *data)
        bool            binary_option_given = false;
        bool            messages_option_given = false;
        bool            streaming_given = false;
+       bool            two_phase_option_given = false;
 
        data->binary = false;
        data->streaming = false;
        data->messages = false;
+       data->two_phase = false;
 
        foreach(lc, options)
        {
@@ -246,8 +268,29 @@ parse_output_parameters(List *options, PGOutputData *data)
 
                        data->streaming = defGetBoolean(defel);
                }
+               else if (strcmp(defel->defname, "two_phase") == 0)
+               {
+                       if (two_phase_option_given)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       two_phase_option_given = true;
+
+                       data->two_phase = defGetBoolean(defel);
+               }
                else
                        elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
+
+               /*
+                * Do additional checking for the disallowed combination of two_phase
+                * and streaming. While streaming and two_phase can theoretically be
+                * supported, it needs more analysis to allow them together.
+                */
+               if (data->two_phase && data->streaming)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("%s and %s are mutually exclusive options",
+                                                       "two_phase", "streaming")));
        }
 }
 
@@ -319,6 +362,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                /* Also remember we're currently not streaming any transaction. */
                in_streaming = false;
 
+               /*
+                * Here, we just check whether the two-phase option is passed by
+                * plugin and decide whether to enable it at later point of time. It
+                * remains enabled if the previous start-up has done so. But we only
+                * allow the option to be passed in with sufficient version of the
+                * protocol, and when the output plugin supports it.
+                */
+               if (!data->two_phase)
+                       ctx->twophase_opt_given = false;
+               else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
+                                                       data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
+               else if (!ctx->twophase)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("two-phase commit requested, but not supported by output plugin")));
+               else
+                       ctx->twophase_opt_given = true;
+
                /* Init publication state. */
                data->publications = NIL;
                publications_valid = false;
@@ -331,8 +395,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        }
        else
        {
-               /* Disable the streaming during the slot initialization mode. */
+               /*
+                * Disable the streaming and prepared transactions during the slot
+                * initialization mode.
+                */
                ctx->streaming = false;
+               ctx->twophase = false;
        }
 }
 
@@ -347,29 +415,8 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
        OutputPluginPrepareWrite(ctx, !send_replication_origin);
        logicalrep_write_begin(ctx->out, txn);
 
-       if (send_replication_origin)
-       {
-               char       *origin;
-
-               /*----------
-                * XXX: which behaviour do we want here?
-                *
-                * Alternatives:
-                *      - don't send origin message if origin name not found
-                *        (that's what we do now)
-                *      - throw error - that will break replication, not good
-                *      - send some special "unknown" origin
-                *----------
-                */
-               if (replorigin_by_oid(txn->origin_id, true, &origin))
-               {
-                       /* Message boundary */
-                       OutputPluginWrite(ctx, false);
-                       OutputPluginPrepareWrite(ctx, true);
-                       logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
-               }
-
-       }
+       send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
+                                        send_replication_origin);
 
        OutputPluginWrite(ctx, true);
 }
@@ -388,6 +435,68 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        OutputPluginWrite(ctx, true);
 }
 
+/*
+ * BEGIN PREPARE callback
+ */
+static void
+pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+       bool            send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+       OutputPluginPrepareWrite(ctx, !send_replication_origin);
+       logicalrep_write_begin_prepare(ctx->out, txn);
+
+       send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
+                                        send_replication_origin);
+
+       OutputPluginWrite(ctx, true);
+}
+
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                                        XLogRecPtr prepare_lsn)
+{
+       OutputPluginUpdateProgress(ctx);
+
+       OutputPluginPrepareWrite(ctx, true);
+       logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+       OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                                                        XLogRecPtr commit_lsn)
+{
+       OutputPluginUpdateProgress(ctx);
+
+       OutputPluginPrepareWrite(ctx, true);
+       logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
+       OutputPluginWrite(ctx, true);
+}
+
+/*
+ * ROLLBACK PREPARED callback
+ */
+static void
+pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+                                                          ReorderBufferTXN *txn,
+                                                          XLogRecPtr prepare_end_lsn,
+                                                          TimestampTz prepare_time)
+{
+       OutputPluginUpdateProgress(ctx);
+
+       OutputPluginPrepareWrite(ctx, true);
+       logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
+                                                                          prepare_time);
+       OutputPluginWrite(ctx, true);
+}
+
 /*
  * Write the current schema of the relation and its ancestor (if any) if not
  * done yet.
@@ -839,18 +948,8 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
        OutputPluginPrepareWrite(ctx, !send_replication_origin);
        logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
 
-       if (send_replication_origin)
-       {
-               char       *origin;
-
-               if (replorigin_by_oid(txn->origin_id, true, &origin))
-               {
-                       /* Message boundary */
-                       OutputPluginWrite(ctx, false);
-                       OutputPluginPrepareWrite(ctx, true);
-                       logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
-               }
-       }
+       send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
+                                        send_replication_origin);
 
        OutputPluginWrite(ctx, true);
 
@@ -1270,3 +1369,33 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
                entry->pubactions.pubtruncate = false;
        }
 }
+
+/* Send Replication origin */
+static void
+send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
+                                XLogRecPtr origin_lsn, bool send_origin)
+{
+       if (send_origin)
+       {
+               char       *origin;
+
+               /*----------
+                * XXX: which behaviour do we want here?
+                *
+                * Alternatives:
+                *  - don't send origin message if origin name not found
+                *    (that's what we do now)
+                *  - throw error - that will break replication, not good
+                *  - send some special "unknown" origin
+                *----------
+                */
+               if (replorigin_by_oid(origin_id, true, &origin))
+               {
+                       /* Message boundary */
+                       OutputPluginWrite(ctx, false);
+                       OutputPluginPrepareWrite(ctx, true);
+
+                       logicalrep_write_origin(ctx->out, origin, origin_lsn);
+               }
+       }
+}
index 8c18b4ed05b5669793e492eeb25c5c1caabe711f..33b85d86cc87eb650e17767f0e5220eb1221b0d4 100644 (file)
@@ -283,6 +283,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
        slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
        slot->data.persistency = persistency;
        slot->data.two_phase = two_phase;
+       slot->data.two_phase_at = InvalidXLogRecPtr;
 
        /* and then data only present in shared memory */
        slot->just_dirtied = false;
index 2be9ad967dc20aa6311943415be47d60ea4f1fd2..9a2bc37fd711b74c211f5db512880d8742950ce1 100644 (file)
@@ -370,7 +370,7 @@ WalReceiverMain(void)
                                         "pg_walreceiver_%lld",
                                         (long long int) walrcv_get_backend_pid(wrconn));
 
-                       walrcv_create_slot(wrconn, slotname, true, 0, NULL);
+                       walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
 
                        SpinLockAcquire(&walrcv->mutex);
                        strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
index 321152151dbb944e6c46f229f781e1b71ee5fa69..912144c43e34a6e17604b27c9e0049910fdcbda5 100644 (file)
@@ -51,6 +51,7 @@
 #include "catalog/pg_largeobject_d.h"
 #include "catalog/pg_largeobject_metadata_d.h"
 #include "catalog/pg_proc_d.h"
+#include "catalog/pg_subscription.h"
 #include "catalog/pg_trigger_d.h"
 #include "catalog/pg_type_d.h"
 #include "common/connect.h"
@@ -4320,6 +4321,7 @@ getSubscriptions(Archive *fout)
        int                     i_subname;
        int                     i_rolname;
        int                     i_substream;
+       int                     i_subtwophasestate;
        int                     i_subconninfo;
        int                     i_subslotname;
        int                     i_subsynccommit;
@@ -4363,9 +4365,16 @@ getSubscriptions(Archive *fout)
                appendPQExpBufferStr(query, " false AS subbinary,\n");
 
        if (fout->remoteVersion >= 140000)
-               appendPQExpBufferStr(query, " s.substream\n");
+               appendPQExpBufferStr(query, " s.substream,\n");
        else
-               appendPQExpBufferStr(query, " false AS substream\n");
+               appendPQExpBufferStr(query, " false AS substream,\n");
+
+       if (fout->remoteVersion >= 150000)
+               appendPQExpBufferStr(query, " s.subtwophasestate\n");
+       else
+               appendPQExpBuffer(query,
+                                                 " '%c' AS subtwophasestate\n",
+                                                 LOGICALREP_TWOPHASE_STATE_DISABLED);
 
        appendPQExpBufferStr(query,
                                                 "FROM pg_subscription s\n"
@@ -4386,6 +4395,7 @@ getSubscriptions(Archive *fout)
        i_subpublications = PQfnumber(res, "subpublications");
        i_subbinary = PQfnumber(res, "subbinary");
        i_substream = PQfnumber(res, "substream");
+       i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 
        subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4411,6 +4421,8 @@ getSubscriptions(Archive *fout)
                        pg_strdup(PQgetvalue(res, i, i_subbinary));
                subinfo[i].substream =
                        pg_strdup(PQgetvalue(res, i, i_substream));
+               subinfo[i].subtwophasestate =
+                       pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
 
                if (strlen(subinfo[i].rolname) == 0)
                        pg_log_warning("owner of subscription \"%s\" appears to be invalid",
@@ -4438,6 +4450,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
        char      **pubnames = NULL;
        int                     npubnames = 0;
        int                     i;
+       char            two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
 
        if (!(subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
                return;
@@ -4479,6 +4492,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
        if (strcmp(subinfo->substream, "f") != 0)
                appendPQExpBufferStr(query, ", streaming = on");
 
+       if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
+               appendPQExpBufferStr(query, ", two_phase = on");
+
        if (strcmp(subinfo->subsynccommit, "off") != 0)
                appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
index ba9bc6ddd210a3e68d8f8f2ed1e1ff0566f459b7..efb8c30e71994efac2db3d01e2beae94dbc870b3 100644 (file)
@@ -639,6 +639,7 @@ typedef struct _SubscriptionInfo
        char       *subslotname;
        char       *subbinary;
        char       *substream;
+       char       *subtwophasestate;
        char       *subsynccommit;
        char       *subpublications;
 } SubscriptionInfo;
index 2abf255798c0d20953d1c15f008de9f8c16be845..ba658f731b0878a6405e45a320f1a88d88c3a7e7 100644 (file)
@@ -6389,7 +6389,7 @@ describeSubscriptions(const char *pattern, bool verbose)
        PGresult   *res;
        printQueryOpt myopt = pset.popt;
        static const bool translate_columns[] = {false, false, false, false,
-       false, false, false, false};
+       false, false, false, false, false};
 
        if (pset.sversion < 100000)
        {
@@ -6423,6 +6423,12 @@ describeSubscriptions(const char *pattern, bool verbose)
                                                          gettext_noop("Binary"),
                                                          gettext_noop("Streaming"));
 
+               /* Two_phase is only supported in v15 and higher */
+               if (pset.sversion >= 150000)
+                       appendPQExpBuffer(&buf,
+                                                         ", subtwophasestate AS \"%s\"\n",
+                                                         gettext_noop("Two phase commit"));
+
                appendPQExpBuffer(&buf,
                                                  ",  subsynccommit AS \"%s\"\n"
                                                  ",  subconninfo AS \"%s\"\n",
index 0ebd5aa41a1ada838a3deb145f0afbae09f28d9d..d6bf725971cdf2d92a385c13c420d53c10119728 100644 (file)
@@ -2764,7 +2764,7 @@ psql_completion(const char *text, int start, int end)
        else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
                COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
                                          "enabled", "slot_name", "streaming",
-                                         "synchronous_commit");
+                                         "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
index 91786da784273bdc743e587cd4cac550f9a9bd65..e27e1a8fe8a0d47428b27b5a532f0fffa37dac64 100644 (file)
@@ -58,4 +58,6 @@ extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
                                                   XLogRecPtr end_lsn, RepOriginId origin_id);
 extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
 extern void restoreTwoPhaseData(void);
+extern bool LookupGXact(const char *gid, XLogRecPtr prepare_at_lsn,
+                                               TimestampTz origin_prepare_timestamp);
 #endif                                                 /* TWOPHASE_H */
index e92ecaf34485fe89581b41457e67614db49279ce..f2ecafa1daac4067ce312cd9ef1bd05ed8b01e7f 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202107071
+#define CATALOG_VERSION_NO     202107141
 
 #endif
index 750d46912a570dbc08ac8e94b7053201160b7c98..21061493ea2700b48ffae59ea99f7418ee75d904 100644 (file)
 
 #include "nodes/pg_list.h"
 
+/*
+ * two_phase tri-state values. See comments atop worker.c to know more about
+ * these states.
+ */
+#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
+#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
+#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
+
 /* ----------------
  *             pg_subscription definition. cpp turns this into
  *             typedef struct FormData_pg_subscription
@@ -57,6 +65,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
        bool            substream;              /* Stream in-progress transactions. */
 
+       char            subtwophasestate;       /* Stream two-phase transactions */
+
 #ifdef CATALOG_VARLEN                  /* variable-length fields start here */
        /* Connection string to the publisher */
        text            subconninfo BKI_FORCE_NOT_NULL;
@@ -92,6 +102,7 @@ typedef struct Subscription
        bool            binary;                 /* Indicates if the subscription wants data in
                                                                 * binary format */
        bool            stream;                 /* Allow streaming in-progress transactions. */
+       char            twophasestate;  /* Allow streaming two-phase transactions */
        char       *conninfo;           /* Connection string to the publisher */
        char       *slotname;           /* Name of the replication slot */
        char       *synccommit;         /* Synchronous commit setting for worker */
index 4d2056318db2d168da9e170d8a81614a49d37cd4..632381b4e3a5c388e4cfda5de0ab9d8e10f0c775 100644 (file)
@@ -87,6 +87,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
+extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionNotReadyRelations(Oid subid);
 
index af551d6f4eea8ff8c81de74f71f0db85435f9733..e0f513b77384fef9449c3f890a1e09f9b7d4288e 100644 (file)
@@ -89,6 +89,16 @@ typedef struct LogicalDecodingContext
         */
        bool            twophase;
 
+       /*
+        * Is two-phase option given by output plugin?
+        *
+        * This flag indicates that the plugin passed in the two-phase option as
+        * part of the START_STREAMING command. We can't rely solely on the
+        * twophase flag which only tells whether the plugin provided all the
+        * necessary two-phase callbacks.
+        */
+       bool            twophase_opt_given;
+
        /*
         * State for writing output.
         */
index 55b90c03eacec107b6c94332f9b85140243d4f75..63de90d94a5724ac619b194778c9ba97ff8e7abf 100644 (file)
@@ -13,6 +13,7 @@
 #ifndef LOGICAL_PROTO_H
 #define LOGICAL_PROTO_H
 
+#include "access/xact.h"
 #include "replication/reorderbuffer.h"
 #include "utils/rel.h"
 
  * connect time.
  *
  * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
- * support for streaming large transactions.
+ * support for streaming large transactions. Introduced in PG14.
+ *
+ * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
+ * support for two-phase commit decoding (at prepare time). Introduced in PG15.
  */
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
 #define LOGICALREP_PROTO_VERSION_NUM 1
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
+#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
 
 /*
  * Logical message types
@@ -55,6 +60,10 @@ typedef enum LogicalRepMsgType
        LOGICAL_REP_MSG_RELATION = 'R',
        LOGICAL_REP_MSG_TYPE = 'Y',
        LOGICAL_REP_MSG_MESSAGE = 'M',
+       LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
+       LOGICAL_REP_MSG_PREPARE = 'P',
+       LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
+       LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r',
        LOGICAL_REP_MSG_STREAM_START = 'S',
        LOGICAL_REP_MSG_STREAM_END = 'E',
        LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -122,6 +131,48 @@ typedef struct LogicalRepCommitData
        TimestampTz committime;
 } LogicalRepCommitData;
 
+/*
+ * Prepared transaction protocol information for begin_prepare, and prepare.
+ */
+typedef struct LogicalRepPreparedTxnData
+{
+       XLogRecPtr      prepare_lsn;
+       XLogRecPtr      end_lsn;
+       TimestampTz prepare_time;
+       TransactionId xid;
+       char            gid[GIDSIZE];
+} LogicalRepPreparedTxnData;
+
+/*
+ * Prepared transaction protocol information for commit prepared.
+ */
+typedef struct LogicalRepCommitPreparedTxnData
+{
+       XLogRecPtr      commit_lsn;
+       XLogRecPtr      end_lsn;
+       TimestampTz commit_time;
+       TransactionId xid;
+       char            gid[GIDSIZE];
+} LogicalRepCommitPreparedTxnData;
+
+/*
+ * Rollback Prepared transaction protocol information. The prepare information
+ * prepare_end_lsn and prepare_time are used to check if the downstream has
+ * received this prepared transaction in which case it can apply the rollback,
+ * otherwise, it can skip the rollback operation. The gid alone is not
+ * sufficient because the downstream node can have a prepared transaction with
+ * same identifier.
+ */
+typedef struct LogicalRepRollbackPreparedTxnData
+{
+       XLogRecPtr      prepare_end_lsn;
+       XLogRecPtr      rollback_end_lsn;
+       TimestampTz prepare_time;
+       TimestampTz rollback_time;
+       TransactionId xid;
+       char            gid[GIDSIZE];
+} LogicalRepRollbackPreparedTxnData;
+
 extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
 extern void logicalrep_read_begin(StringInfo in,
                                                                  LogicalRepBeginData *begin_data);
@@ -129,6 +180,24 @@ extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
                                                                        XLogRecPtr commit_lsn);
 extern void logicalrep_read_commit(StringInfo in,
                                                                   LogicalRepCommitData *commit_data);
+extern void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn);
+extern void logicalrep_read_begin_prepare(StringInfo in,
+                                                                                 LogicalRepPreparedTxnData *begin_data);
+extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+                                                                        XLogRecPtr prepare_lsn);
+extern void logicalrep_read_prepare(StringInfo in,
+                                                                       LogicalRepPreparedTxnData *prepare_data);
+extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
+                                                                                        XLogRecPtr commit_lsn);
+extern void logicalrep_read_commit_prepared(StringInfo in,
+                                                                                       LogicalRepCommitPreparedTxnData *prepare_data);
+extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
+                                                                                          XLogRecPtr prepare_end_lsn,
+                                                                                          TimestampTz prepare_time);
+extern void logicalrep_read_rollback_prepared(StringInfo in,
+                                                                                         LogicalRepRollbackPreparedTxnData *rollback_data);
+
+
 extern void logicalrep_write_origin(StringInfo out, const char *origin,
                                                                        XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
index 51e7c0348da3c0f3d26bed3162af378dbee1b9d3..0dc460fb70a56c3fabe84911021b7e4afb7bf391 100644 (file)
@@ -27,6 +27,7 @@ typedef struct PGOutputData
        bool            binary;
        bool            streaming;
        bool            messages;
+       bool            two_phase;
 } PGOutputData;
 
 #endif                                                 /* PGOUTPUT_H */
index ba257d81b511aabf32d974cf5198ddd4372de595..5b40ff75f796fcd54f59316fc8dfb2bd53a82b60 100644 (file)
@@ -297,7 +297,11 @@ typedef struct ReorderBufferTXN
         * Commit or Prepare time, only known when we read the actual commit or
         * prepare record.
         */
-       TimestampTz commit_time;
+       union
+       {
+               TimestampTz commit_time;
+               TimestampTz prepare_time;
+       }                       xact_time;
 
        /*
         * The base snapshot is used to decode all changes until either this
@@ -636,7 +640,7 @@ void                ReorderBufferCommit(ReorderBuffer *, TransactionId,
                                                                TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
 void           ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
                                                                                XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-                                                                               XLogRecPtr initial_consistent_point,
+                                                                               XLogRecPtr two_phase_at,
                                                                                TimestampTz commit_time,
                                                                                RepOriginId origin_id, XLogRecPtr origin_lsn,
                                                                                char *gid, bool is_commit);
index 2eb7e3a530d438a5c6d10d15dc3029b80e58c4f6..34d95eac8e909b3f619ed7c0a8dba3e615325a7c 100644 (file)
@@ -84,11 +84,10 @@ typedef struct ReplicationSlotPersistentData
        XLogRecPtr      confirmed_flush;
 
        /*
-        * LSN at which we found a consistent point at the time of slot creation.
-        * This is also the point where we have exported a snapshot for the
-        * initial copy.
+        * LSN at which we enabled two_phase commit for this slot or LSN at which
+        * we found a consistent point at the time of slot creation.
         */
-       XLogRecPtr      initial_consistent_point;
+       XLogRecPtr      two_phase_at;
 
        /*
         * Allow decoding of prepared transactions?
index fbabce6764d31e54b26f15af099f21e002e38340..de7212464af55796c06a4e76e8a2e4334cced1f3 100644 (file)
@@ -62,7 +62,7 @@ extern void CheckPointSnapBuild(void);
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
                                                                                  TransactionId xmin_horizon, XLogRecPtr start_lsn,
                                                                                  bool need_full_snapshot,
-                                                                                 XLogRecPtr initial_consistent_point);
+                                                                                 XLogRecPtr two_phase_at);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -76,7 +76,8 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
                                                                                        TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
-extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
+extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder);
+extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
                                                           TransactionId xid, int nsubxacts,
index 4fd7c25ea74e88f01779f5c5798476e780f43e90..0b607ed777b7c045ba25404315b303023c65b20e 100644 (file)
@@ -181,6 +181,8 @@ typedef struct
                        List       *publication_names;  /* String list of publications */
                        bool            binary; /* Ask publisher to use binary */
                        bool            streaming;      /* Streaming of large transactions */
+                       bool            twophase;       /* Streaming of two-phase transactions at
+                                                                        * prepare time */
                }                       logical;
        }                       proto;
 } WalRcvStreamOptions;
@@ -347,6 +349,7 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
                                                                                const char *slotname,
                                                                                bool temporary,
+                                                                               bool two_phase,
                                                                                CRSSnapshotAction snapshot_action,
                                                                                XLogRecPtr *lsn);
 
@@ -420,8 +423,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
        WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
 #define walrcv_send(conn, buffer, nbytes) \
        WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \
-       WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
+       WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
 #define walrcv_get_backend_pid(conn) \
        WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
index 179eb43900d512f38789a9b98208dee6bd8d62f6..41c7487393f9c995e6f42029bf15364d44780c7f 100644 (file)
@@ -86,6 +86,9 @@ extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
                                                                                          char *originname, int szorgname);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 
+extern bool AllTablesyncsReady(void);
+extern void UpdateTwoPhaseState(Oid suboid, char new_state);
+
 void           process_syncing_tables(XLogRecPtr current_lsn);
 void           invalidate_syncing_table_states(Datum arg, int cacheid,
                                                                                        uint32 hashvalue);
index 57f7dd9b0a7351dedc5ac6c69b8471775a981515..ad6b4e4bd35fa661fd2a2e3ffba5555b08ceb06e 100644 (file)
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -91,10 +91,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | off                | dbname=regress_doesnotexist2
+                                                                          List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                  List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | local              | dbname=regress_doesnotexist2
+                                                                            List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -162,19 +162,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -185,19 +185,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication already exists
@@ -212,10 +212,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                    List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | off                | dbname=regress_doesnotexist
+                                                                             List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +233,10 @@ ERROR:  unrecognized subscription parameter: "copy_data"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                            List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -263,6 +263,43 @@ ALTER SUBSCRIPTION regress_testsub DISABLE;
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 DROP FUNCTION func;
+-- fail - two_phase must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo);
+ERROR:  two_phase requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+(1 row)
+
+--fail - alter of two_phase option not supported.
+ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
+ERROR:  unrecognized subscription parameter: "two_phase"
+--fail - cannot set streaming when two_phase enabled
+ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
+ERROR:  cannot set streaming = true for two-phase enabled subscription
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+\dRs+
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+(1 row)
+
+DROP SUBSCRIPTION regress_testsub;
+-- fail - two_phase and streaming are mutually exclusive.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
+ERROR:  two_phase = true and streaming = true are mutually exclusive options
+\dRs+
+                                            List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo 
+------+-------+---------+-------------+--------+-----------+------------------+--------------------+----------
+(0 rows)
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
index 308c098c144ac9d2a59d14a5757d5732d221280d..b7328714072114873e8e44a8326d694b4cc60e24 100644 (file)
@@ -202,6 +202,31 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 DROP FUNCTION func;
 
+-- fail - two_phase must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
+
+\dRs+
+--fail - alter of two_phase option not supported.
+ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
+
+--fail - cannot set streaming when two_phase enabled
+ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+\dRs+
+
+DROP SUBSCRIPTION regress_testsub;
+
+-- fail - two_phase and streaming are mutually exclusive.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
+
+\dRs+
+
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
new file mode 100644 (file)
index 0000000..c6ada92
--- /dev/null
@@ -0,0 +1,359 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 24;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+       qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+       qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres', "
+       BEGIN;
+       INSERT INTO tab_full SELECT generate_series(1,10);
+       PREPARE TRANSACTION 'some_initial_data';
+       COMMIT PREPARED 'some_initial_data';");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION tap_pub FOR TABLE tab_full");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',        "
+       CREATE SUBSCRIPTION tap_sub
+       CONNECTION '$publisher_connstr application_name=$appname'
+       PUBLICATION tap_pub
+       WITH (two_phase = on)");
+
+# Wait for subscriber to finish initialization
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+       "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+       "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+  or die "Timed out while waiting for subscriber to enable twophase";
+
+###############################
+# check that 2PC gets replicated to subscriber
+# then COMMIT PREPARED
+###############################
+
+$node_publisher->safe_psql('postgres', "
+       BEGIN;
+       INSERT INTO tab_full VALUES (11);
+       PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# check that 2PC gets replicated to subscriber
+# then ROLLBACK PREPARED
+###############################
+
+$node_publisher->safe_psql('postgres',"
+       BEGIN;
+       INSERT INTO tab_full VALUES (12);
+       PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that ROLLBACK PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (12);
+    INSERT INTO tab_full VALUES (13);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# rollback post the restart
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are rolled back
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(0), 'Rows rolled back are not on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (12);
+    INSERT INTO tab_full VALUES (13);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (subscriber only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (14);
+    INSERT INTO tab_full VALUES (15);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (14,15);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (16);
+    INSERT INTO tab_full VALUES (17);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_publisher->stop('immediate');
+$node_publisher->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (16,17);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Test nested transaction with 2PC
+###############################
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres', "
+       BEGIN;
+       INSERT INTO tab_full VALUES (21);
+       SAVEPOINT sp_inner;
+       INSERT INTO tab_full VALUES (22);
+       ROLLBACK TO SAVEPOINT sp_inner;
+       PREPARE TRANSACTION 'outer';
+       ");
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# COMMIT
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check the transaction state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber');
+
+# check inserts are visible. 22 should be rolled back. 21 should be committed.
+$result = $node_subscriber->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
+is($result, qq(21), 'Rows committed are on the subscriber');
+
+###############################
+# Test using empty GID
+###############################
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres', "
+       BEGIN;
+       INSERT INTO tab_full VALUES (51);
+       PREPARE TRANSACTION '';");
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# ROLLBACK
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED '';");
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# copy_data=false and two_phase
+###############################
+
+#create some test tables for copy tests
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_copy (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_copy SELECT generate_series(1,5);");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_copy (a int PRIMARY KEY)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab_copy VALUES (88);");
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(1), 'initial data in subscriber table');
+
+# Setup logical replication
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION tap_pub_copy FOR TABLE tab_copy;");
+
+my $appname_copy = 'appname_copy';
+$node_subscriber->safe_psql('postgres',        "
+       CREATE SUBSCRIPTION tap_sub_copy
+       CONNECTION '$publisher_connstr application_name=$appname_copy'
+       PUBLICATION tap_pub_copy
+       WITH (two_phase=on, copy_data=false);");
+
+# Wait for subscriber to finish initialization
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Also wait for initial table sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for two-phase to be enabled
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+  or die "Timed out while waiting for subscriber to enable twophase";
+
+# Check that the initial table data was NOT replicated (because we said copy_data=false)
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(1), 'initial data in subscriber table');
+
+# Now do a prepare on publisher and check that it IS replicated
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_copy VALUES (99);
+    PREPARE TRANSACTION 'mygid';");
+
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Check that the transaction has been prepared on the subscriber, there will be 2
+# prepared transactions for the 2 subscriptions.
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(2), 'transaction is prepared on subscriber');
+
+# Now commit the insert and verify that it IS replicated
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'mygid';");
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(6), 'publisher inserted data');
+
+$node_publisher->wait_for_catchup($appname_copy);
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(2), 'replicated data in subscriber table');
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl
new file mode 100644 (file)
index 0000000..e61d28a
--- /dev/null
@@ -0,0 +1,235 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test cascading logical replication of 2PC.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 27;
+
+###############################
+# Setup a cascade of pub/sub nodes.
+# node_A -> node_B -> node_C
+###############################
+
+# Initialize nodes
+# node_A
+my $node_A = get_new_node('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf('postgresql.conf',
+       qq(max_prepared_transactions = 10));
+$node_A->start;
+# node_B
+my $node_B = get_new_node('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf('postgresql.conf',
+       qq(max_prepared_transactions = 10));
+$node_B->start;
+# node_C
+my $node_C = get_new_node('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->append_conf('postgresql.conf',
+       qq(max_prepared_transactions = 10));
+$node_C->start;
+
+# Create some pre-existing content on node_A
+$node_A->safe_psql('postgres',
+       "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_A->safe_psql('postgres', "
+       INSERT INTO tab_full SELECT generate_series(1,10);");
+
+# Create the same tables on node_B amd node_C
+$node_B->safe_psql('postgres',
+       "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_C->safe_psql('postgres',
+       "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+       "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+my $appname_B = 'tap_sub_B';
+$node_B->safe_psql('postgres', "
+       CREATE SUBSCRIPTION tap_sub_B
+       CONNECTION '$node_A_connstr application_name=$appname_B'
+       PUBLICATION tap_pub_A
+       WITH (two_phase = on)");
+
+# node_B (pub) -> node_C (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+       "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+my $appname_C = 'tap_sub_C';
+$node_C->safe_psql('postgres', "
+       CREATE SUBSCRIPTION tap_sub_C
+       CONNECTION '$node_B_connstr application_name=$appname_C'
+       PUBLICATION tap_pub_B
+       WITH (two_phase = on)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# Also wait for two-phase to be enabled
+my $twophase_query = "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_B->poll_query_until('postgres', $twophase_query)
+       or die "Timed out while waiting for subscriber to enable twophase";
+$node_C->poll_query_until('postgres', $twophase_query)
+       or die "Timed out while waiting for subscriber to enable twophase";
+
+is(1,1, "Cascade setup is complete");
+
+my $result;
+
+###############################
+# check that 2PC gets replicated to subscriber(s)
+# then COMMIT PREPARED
+###############################
+
+# 2PC PREPARE
+$node_A->safe_psql('postgres', "
+       BEGIN;
+       INSERT INTO tab_full VALUES (11);
+       PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C');
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber C');
+
+###############################
+# check that 2PC gets replicated to subscriber(s)
+# then ROLLBACK PREPARED
+###############################
+
+# 2PC PREPARE
+$node_A->safe_psql('postgres', "
+       BEGIN;
+       INSERT INTO tab_full VALUES (12);
+       PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC ROLLBACK
+$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction is aborted on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C');
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+###############################
+# Test nested transactions with 2PC
+###############################
+
+# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
+$node_A->safe_psql('postgres', "
+       BEGIN;
+       INSERT INTO tab_full VALUES (21);
+       SAVEPOINT sp_inner;
+       INSERT INTO tab_full VALUES (22);
+       ROLLBACK TO SAVEPOINT sp_inner;
+       PREPARE TRANSACTION 'outer';
+       ");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is ended on subscriber
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+# check inserts are visible at subscriber(s).
+# 22 should be rolled back.
+# 21 should be committed.
+$result = $node_B->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
+is($result, qq(21), 'Rows committed are present on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
+is($result, qq(21), 'Rows committed are present on subscriber C');
+
+###############################
+# check all the cleanup
+###############################
+
+# cleanup the node_B => node_C pub/sub
+$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C");
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber node C');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node B');
+
+# cleanup the node_A => node_B pub/sub
+$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B");
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber node B');
+$result = $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node A');
+
+# shutdown
+$node_C->stop('fast');
+$node_B->stop('fast');
+$node_A->stop('fast');
index b287c29f64671be7d7ae2bf3b980a9a980601f02..37cf4b2f76bd3a1011f1dbfb04e27a4738c970a2 100644 (file)
@@ -1390,12 +1390,15 @@ LogicalOutputPluginWriterUpdateProgress
 LogicalOutputPluginWriterWrite
 LogicalRepBeginData
 LogicalRepCommitData
+LogicalRepCommitPreparedTxnData
 LogicalRepCtxStruct
 LogicalRepMsgType
 LogicalRepPartMapEntry
+LogicalRepPreparedTxnData
 LogicalRepRelId
 LogicalRepRelMapEntry
 LogicalRepRelation
+LogicalRepRollbackPreparedTxnData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker