if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
- timestamptz_to_str(txn->commit_time));
+ timestamptz_to_str(txn->xact_time.commit_time));
OutputPluginWrite(ctx, true);
}
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
- timestamptz_to_str(txn->commit_time));
+ timestamptz_to_str(txn->xact_time.prepare_time));
OutputPluginWrite(ctx, true);
}
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
- timestamptz_to_str(txn->commit_time));
+ timestamptz_to_str(txn->xact_time.commit_time));
OutputPluginWrite(ctx, true);
}
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
- timestamptz_to_str(txn->commit_time));
+ timestamptz_to_str(txn->xact_time.commit_time));
OutputPluginWrite(ctx, true);
}
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
- timestamptz_to_str(txn->commit_time));
+ timestamptz_to_str(txn->xact_time.prepare_time));
OutputPluginWrite(ctx, true);
}
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)",
- timestamptz_to_str(txn->commit_time));
+ timestamptz_to_str(txn->xact_time.commit_time));
OutputPluginWrite(ctx, true);
}
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>subtwophasestate</structfield> <type>char</type>
+ </para>
+ <para>
+ State codes for two-phase mode:
+ <literal>d</literal> = disabled,
+ <literal>p</literal> = pending enablement,
+ <literal>e</literal> = enabled
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>subconninfo</structfield> <type>text</type>
</term>
<listitem>
<para>
- Protocol version. Currently versions <literal>1</literal> and
- <literal>2</literal> are supported. The version <literal>2</literal>
- is supported only for server version 14 and above, and it allows
- streaming of large in-progress transactions.
- </para>
+ Protocol version. Currently versions <literal>1</literal>, <literal>2</literal>,
+ and <literal>3</literal> are supported.
+ </para>
+ <para>
+ Version <literal>2</literal> is supported only for server version 14
+ and above, and it allows streaming of large in-progress transactions.
+ </para>
+ <para>
+ Version <literal>3</literal> is supported only for server version 15
+ and above, and it allows streaming of two-phase transactions.
+ </para>
</listitem>
</varlistentry>
<para>
The logical replication protocol sends individual transactions one by one.
This means that all messages between a pair of Begin and Commit messages
- belong to the same transaction. It also sends changes of large in-progress
- transactions between a pair of Stream Start and Stream Stop messages. The
- last stream of such a transaction contains Stream Commit or Stream Abort
- message.
+ belong to the same transaction. Similarly, all messages between a pair of
+ Begin Prepare and Prepare messages belong to the same transaction.
+ It also sends changes of large in-progress transactions between a pair of
+ Stream Start and Stream Stop messages. The last stream of such a transaction
+ contains a Stream Commit or Stream Abort message.
</para>
<para>
</variablelist>
+<para>
+The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
+are available since protocol version 3.
+</para>
+
+<variablelist>
+
+<varlistentry>
+
+<term>Begin Prepare</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('b')</term>
+<listitem><para>
+ Identifies the message as the beginning of a two-phase transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The LSN of the prepare.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The end LSN of the prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ Prepare timestamp of the transaction. The value is in number
+ of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+ Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+ The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+
+<term>Prepare</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('P')</term>
+<listitem><para>
+ Identifies the message as a two-phase prepared transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int8</term>
+<listitem><para>
+ Flags; currently unused (must be 0).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The LSN of the prepare.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The end LSN of the prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ Prepare timestamp of the transaction. The value is in number
+ of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+ Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+ The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+
+<term>Commit Prepared</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('K')</term>
+<listitem><para>
+ Identifies the message as the commit of a two-phase transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int8</term>
+<listitem><para>
+ Flags; currently unused (must be 0).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The LSN of the commit prepared.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The end LSN of the commit prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ Commit timestamp of the transaction. The value is in number
+ of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+ Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+ The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+
+<term>Rollback Prepared</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('r')</term>
+<listitem><para>
+ Identifies the message as the rollback of a two-phase transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int8</term>
+<listitem><para>
+ Flags; currently unused (must be 0).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The end LSN of the prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The end LSN of the rollback prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ Prepare timestamp of the transaction. The value is in number
+ of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ Rollback timestamp of the transaction. The value is in number
+ of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+ Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+ The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+
<para>
The following message parts are shared by the above messages.
Commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> and
<command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command> with refresh
option as true cannot be executed inside a transaction block.
+
+ These commands also cannot be executed when the subscription has
+ <literal>two_phase</literal> commit enabled, unless <literal>copy_data = false</literal>.
+ See column <literal>subtwophasestate</literal> of
+ <xref linkend="catalog-pg-subscription"/> to know the actual two-phase state.
</para>
</refsect1>
are fully decoded on the publisher, and only then sent to the
subscriber as a whole.
</para>
+
+ <para>
+ The <literal>streaming</literal> option cannot be used with the
+ <literal>two_phase</literal> option.
+ </para>
+
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><literal>two_phase</literal> (<type>boolean</type>)</term>
+ <listitem>
+ <para>
+ Specifies whether two-phase commit is enabled for this subscription.
+ The default is <literal>false</literal>.
+ </para>
+
+ <para>
+ When two-phase commit is enabled then the decoded transactions are sent
+ to the subscriber on the PREPARE TRANSACTION. By default, the transaction
+ prepared on the publisher is decoded as a normal transaction at commit.
+ </para>
+
+ <para>
+ The two-phase commit implementation requires that the replication has
+ successfully passed the initial table synchronization phase. This means
+ even when two_phase is enabled for the subscription, the internal
+ two-phase state remains temporarily "pending" until the initialization
+ phase is completed. See column
+ <literal>subtwophasestate</literal> of <xref linkend="catalog-pg-subscription"/>
+ to know the actual two-phase state.
+ </para>
+
+ <para>
+ The <literal>two_phase</literal> option cannot be used with the
+ <literal>streaming</literal> option.
+ </para>
+
</listitem>
</varlistentry>
</variablelist></para>
servers. It is then up to the user to reactivate the subscriptions in a
suitable way. If the involved hosts have changed, the connection
information might have to be changed. It might also be appropriate to
- truncate the target tables before initiating a new full table copy.
+ truncate the target tables before initiating a new full table copy. If users
+ intend to copy initial data during refresh they must create the slot with
+ <literal>two_phase = false</literal>. After the initial sync, the
+ <literal>two_phase</literal> option will be automatically enabled by the
+ subscriber if the subscription had been originally created with
+ <literal>two_phase = true</literal> option.
</para>
</refsect1>
RemoveTwoPhaseFile(xid, giveWarning);
RemoveGXact(gxact);
}
+
+/*
+ * LookupGXact
+ * Check if the prepared transaction with the given GID, lsn and timestamp
+ * exists.
+ *
+ * Note that we always compare with the LSN where prepare ends because that is
+ * what is stored as origin_lsn in the 2PC file.
+ *
+ * This function is primarily used to check if the prepared transaction
+ * received from the upstream (remote node) already exists. Checking only GID
+ * is not sufficient because a different prepared xact with the same GID can
+ * exist on the same node. So, we are ensuring to match origin_lsn and
+ * origin_timestamp of prepared xact to avoid the possibility of a match of
+ * prepared xact from two different nodes.
+ */
+bool
+LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
+ TimestampTz origin_prepare_timestamp)
+{
+ int i;
+ bool found = false;
+
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+ /* Ignore not-yet-valid GIDs. */
+ if (gxact->valid && strcmp(gxact->gid, gid) == 0)
+ {
+ char *buf;
+ TwoPhaseFileHeader *hdr;
+
+ /*
+ * We are not expecting collisions of GXACTs (same gid) between
+ * publisher and subscribers, so we perform all I/O while holding
+ * TwoPhaseStateLock for simplicity.
+ *
+ * To move the I/O out of the lock, we need to ensure that no
+ * other backend commits the prepared xact in the meantime. We can
+ * do this optimization if we encounter many collisions in GID
+ * between publisher and subscriber.
+ */
+ if (gxact->ondisk)
+ buf = ReadTwoPhaseFile(gxact->xid, false);
+ else
+ {
+ Assert(gxact->prepare_start_lsn);
+ XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
+ }
+
+ hdr = (TwoPhaseFileHeader *) buf;
+
+ if (hdr->origin_lsn == prepare_end_lsn &&
+ hdr->origin_timestamp == origin_prepare_timestamp)
+ {
+ found = true;
+ pfree(buf);
+ break;
+ }
+
+ pfree(buf);
+ }
+ }
+ LWLockRelease(TwoPhaseStateLock);
+ return found;
+}
sub->enabled = subform->subenabled;
sub->binary = subform->subbinary;
sub->stream = subform->substream;
+ sub->twophasestate = subform->subtwophasestate;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
table_close(rel, RowExclusiveLock);
}
+/*
+ * Does the subscription have any relations?
+ *
+ * Use this function only to know true/false, and when you have no need for the
+ * List returned by GetSubscriptionRelations.
+ */
+bool
+HasSubscriptionRelations(Oid subid)
+{
+ Relation rel;
+ ScanKeyData skey[1];
+ SysScanDesc scan;
+ bool has_subrels;
+
+ rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_subscription_rel_srsubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(subid));
+
+ scan = systable_beginscan(rel, InvalidOid, false,
+ NULL, 1, skey);
+
+ /* If even a single tuple exists then the subscription has tables. */
+ has_subrels = HeapTupleIsValid(systable_getnext(scan));
+
+ /* Cleanup */
+ systable_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ return has_subrels;
+}
/*
* Get all relations for subscription.
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
- substream, subslotname, subsynccommit, subpublications)
+ substream, subtwophasestate, subslotname, subsynccommit, subpublications)
ON pg_subscription TO public;
#define SUBOPT_REFRESH 0x00000040
#define SUBOPT_BINARY 0x00000080
#define SUBOPT_STREAMING 0x00000100
+#define SUBOPT_TWOPHASE_COMMIT 0x00000200
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
bool refresh;
bool binary;
bool streaming;
+ bool twophase;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
opts->streaming = false;
+ if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
+ opts->twophase = false;
/* Parse options */
foreach(lc, stmt_options)
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ /*
+ * Do not allow toggling of two_phase option. Doing so could cause
+ * missing of transactions and lead to an inconsistent replica.
+ * See comments atop worker.c
+ *
+ * Note: Unsupported twophase indicates that this call originated
+ * from AlterSubscription.
+ */
+ if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
+
+ if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
+ opts->twophase = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("subscription with %s must also set %s",
"slot_name = NONE", "create_slot = false")));
}
+
+ /*
+ * Do additional checking for the disallowed combination of two_phase and
+ * streaming. While streaming and two_phase can theoretically be
+ * supported, it needs more analysis to allow them together.
+ */
+ if (opts->twophase &&
+ IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
+ IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
+ {
+ if (opts->streaming &&
+ IsSet(supported_opts, SUBOPT_STREAMING) &&
+ IsSet(opts->specified_opts, SUBOPT_STREAMING))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ /*- translator: both %s are strings of the form "option = value" */
+ errmsg("%s and %s are mutually exclusive options",
+ "two_phase = true", "streaming = true")));
+ }
}
/*
supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING);
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
parse_subscription_options(stmt->options, supported_opts, &opts);
/*
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+ values[Anum_pg_subscription_subtwophasestate - 1] =
+ CharGetDatum(opts.twophase ?
+ LOGICALREP_TWOPHASE_STATE_PENDING :
+ LOGICALREP_TWOPHASE_STATE_DISABLED);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
*/
if (opts.create_slot)
{
+ bool twophase_enabled = false;
+
Assert(opts.slot_name);
- walrcv_create_slot(wrconn, opts.slot_name, false,
+ /*
+ * Even if two_phase is set, don't create the slot with
+ * two-phase enabled. Will enable it once all the tables are
+ * synced and ready. This avoids race-conditions like prepared
+ * transactions being skipped due to changes not being applied
+ * due to checks in should_apply_changes_for_rel() when
+ * tablesync for the corresponding tables are in progress. See
+ * comments atop worker.c.
+ *
+ * Note that if tables were specified but copy_data is false
+ * then it is safe to enable two_phase up-front because those
+ * tables are already initially in READY state. When the
+ * subscription has no tables, we leave the twophase state as
+ * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
+ * PUBLICATION to work.
+ */
+ if (opts.twophase && !opts.copy_data && tables != NIL)
+ twophase_enabled = true;
+
+ walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
CRS_NOEXPORT_SNAPSHOT, NULL);
+
+ if (twophase_enabled)
+ UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+
ereport(NOTICE,
(errmsg("created replication slot \"%s\" on publisher",
opts.slot_name)));
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
+ if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && opts.streaming)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot set %s for two-phase enabled subscription",
+ "streaming = true")));
+
values[Anum_pg_subscription_substream - 1] =
BoolGetDatum(opts.streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+ /*
+ * See ALTER_SUBSCRIPTION_REFRESH for details why this is
+ * not allowed.
+ */
+ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
+ errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false"
+ ", or use DROP/CREATE SUBSCRIPTION.")));
+
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
/* Make sure refresh sees the new list of publications. */
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+ /*
+ * See ALTER_SUBSCRIPTION_REFRESH for details why this is
+ * not allowed.
+ */
+ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
+ errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh = false, or with copy_data = false"
+ ", or use DROP/CREATE SUBSCRIPTION.")));
+
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
/* Only refresh the added/dropped list of publications. */
parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts);
+ /*
+ * The subscription option "two_phase" requires that
+ * replication has passed the initial table synchronization
+ * phase before the two_phase becomes properly enabled.
+ *
+ * But, having reached this two-phase commit "enabled" state
+ * we must not allow any subsequent table initialization to
+ * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
+ * when the user had requested two_phase = on mode.
+ *
+ * The exception to this restriction is when copy_data =
+ * false, because when copy_data is false the tablesync will
+ * start already in READY state and will exit directly without
+ * doing anything.
+ *
+ * For more details see comments atop worker.c.
+ */
+ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
+ errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false"
+ ", or use DROP/CREATE SUBSCRIPTION.")));
+
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
AlterSubscription_refresh(sub, opts.copy_data);
static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
+ bool two_phase,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
PQserverVersion(conn->streamConn) >= 140000)
appendStringInfoString(&cmd, ", streaming 'on'");
+ if (options->proto.logical.twophase &&
+ PQserverVersion(conn->streamConn) >= 150000)
+ appendStringInfoString(&cmd, ", two_phase 'on'");
+
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
- bool temporary, CRSSnapshotAction snapshot_action,
+ bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn)
{
PGresult *res;
if (conn->logical)
{
appendStringInfoString(&cmd, " LOGICAL pgoutput");
+ if (two_phase)
+ appendStringInfoString(&cmd, " TWO_PHASE");
+
switch (snapshot_action)
{
case CRS_EXPORT_SNAPSHOT:
*
* XXX Now, this can even lead to a deadlock if the prepare
* transaction is waiting to get it logically replicated for
- * distributed 2PC. Currently, we don't have an in-core
- * implementation of prepares for distributed 2PC but some
- * out-of-core logical replication solution can have such an
- * implementation. They need to inform users to not have locks
- * on catalog tables in such transactions.
+ * distributed 2PC. This can be avoided by disallowing
+ * preparing transactions that have locked [user] catalog
+ * tables exclusively but as of now, we ask users not to do
+ * such an operation.
*/
DecodePrepare(ctx, buf, &parsed);
break;
if (two_phase)
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
- SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
+ SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true);
}
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot, slot->data.initial_consistent_point);
+ need_full_snapshot, slot->data.two_phase_at);
ctx->reorder->private_data = ctx;
MemoryContextSwitchTo(old_context);
/*
- * We allow decoding of prepared transactions iff the two_phase option is
- * enabled at the time of slot creation.
+ * We allow decoding of prepared transactions when the two_phase is
+ * enabled at the time of slot creation, or when the two_phase option is
+ * given at the streaming start, provided the plugin supports all the
+ * callbacks for two-phase.
*/
- ctx->twophase &= MyReplicationSlot->data.two_phase;
+ ctx->twophase &= slot->data.two_phase;
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
MemoryContextSwitchTo(old_context);
/*
- * We allow decoding of prepared transactions iff the two_phase option is
- * enabled at the time of slot creation.
+ * We allow decoding of prepared transactions when the two_phase is
+ * enabled at the time of slot creation, or when the two_phase option is
+ * given at the streaming start, provided the plugin supports all the
+ * callbacks for two-phase.
*/
- ctx->twophase &= MyReplicationSlot->data.two_phase;
+ ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
+
+ /* Mark slot to allow two_phase decoding if not already marked */
+ if (ctx->twophase && !slot->data.two_phase)
+ {
+ slot->data.two_phase = true;
+ slot->data.two_phase_at = start_lsn;
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
+ }
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
- slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
+ if (slot->data.two_phase)
+ slot->data.two_phase_at = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);
}
/*
* Due to - harmless - race conditions during a checkpoint we could see
- * values here that are older than the ones we already have in memory.
- * Don't overwrite those.
+ * values here that are older than the ones we already have in memory. We
+ * could also see older values for prepared transactions when the prepare
+ * is sent at a later point of time along with commit prepared and there
+ * are other transactions commits between prepare and commit prepared. See
+ * ReorderBufferFinishPrepared. Don't overwrite those.
*/
if (go_backward || replication_state->remote_lsn < remote_commit)
replication_state->remote_lsn = remote_commit;
/* fixed fields */
pq_sendint64(out, txn->final_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
pq_sendint32(out, txn->xid);
}
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
}
/*
commit_data->committime = pq_getmsgint64(in);
}
+/*
+ * Write BEGIN PREPARE to the output stream.
+ */
+void
+logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN_PREPARE);
+
+ /* fixed fields */
+ pq_sendint64(out, txn->final_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction BEGIN PREPARE from the stream.
+ */
+void
+logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
+{
+ /* read fields */
+ begin_data->prepare_lsn = pq_getmsgint64(in);
+ if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_lsn not set in begin prepare message");
+ begin_data->end_lsn = pq_getmsgint64(in);
+ if (begin_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn not set in begin prepare message");
+ begin_data->prepare_time = pq_getmsgint64(in);
+ begin_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(begin_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+ Assert(rbtxn_prepared(txn));
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+ /* read fields */
+ prepare_data->prepare_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_lsn is not set in prepare message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in prepare message");
+ prepare_data->prepare_time = pq_getmsgint64(in);
+ prepare_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write COMMIT PREPARED to the output stream.
+ */
+void
+logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT_PREPARED);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, commit_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->xact_time.commit_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction COMMIT PREPARED from the stream.
+ */
+void
+logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
+
+ /* read fields */
+ prepare_data->commit_lsn = pq_getmsgint64(in);
+ if (prepare_data->commit_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "commit_lsn is not set in commit prepared message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in commit prepared message");
+ prepare_data->commit_time = pq_getmsgint64(in);
+ prepare_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
+ * Write ROLLBACK PREPARED to the output stream.
+ */
+void
+logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+ /*
+ * This should only ever happen for two-phase commit transactions, in
+ * which case we expect to have a valid GID.
+ */
+ Assert(txn->gid != NULL);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_end_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, prepare_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
+ pq_sendint32(out, txn->xid);
+
+ /* send gid */
+ pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction ROLLBACK PREPARED from the stream.
+ */
+void
+logicalrep_read_rollback_prepared(StringInfo in,
+ LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in rollback prepared message", flags);
+
+ /* read fields */
+ rollback_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_end_lsn is not set in rollback prepared message");
+ rollback_data->rollback_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "rollback_end_lsn is not set in rollback prepared message");
+ rollback_data->prepare_time = pq_getmsgint64(in);
+ rollback_data->rollback_time = pq_getmsgint64(in);
+ rollback_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(rollback_data->gid, pq_getmsgstring(in));
+}
+
/*
* Write ORIGIN to the output stream.
*/
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
- pq_sendint64(out, txn->commit_time);
+ pq_sendint64(out, txn->xact_time.commit_time);
}
/*
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
- txn->commit_time = commit_time;
+ txn->xact_time.commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
*/
txn->final_lsn = prepare_lsn;
txn->end_lsn = end_lsn;
- txn->commit_time = prepare_time;
+ txn->xact_time.prepare_time = prepare_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
Assert(txn->final_lsn != InvalidXLogRecPtr);
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
- txn->commit_time, txn->origin_id, txn->origin_lsn);
+ txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
/*
* We send the prepare for the concurrently aborted xacts so that later
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- XLogRecPtr initial_consistent_point,
+ XLogRecPtr two_phase_at,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
* be later used for rollback.
*/
prepare_end_lsn = txn->end_lsn;
- prepare_time = txn->commit_time;
+ prepare_time = txn->xact_time.prepare_time;
/* add the gid in the txn */
txn->gid = pstrdup(gid);
/*
* It is possible that this transaction is not decoded at prepare time
- * either because by that time we didn't have a consistent snapshot or it
- * was decoded earlier but we have restarted. We only need to send the
- * prepare if it was not decoded earlier. We don't need to decode the xact
- * for aborts if it is not done already.
+ * either because by that time we didn't have a consistent snapshot, or
+ * two_phase was not enabled, or it was decoded earlier but we have
+ * restarted. We only need to send the prepare if it was not decoded
+ * earlier. We don't need to decode the xact for aborts if it is not done
+ * already.
*/
- if ((txn->final_lsn < initial_consistent_point) && is_commit)
+ if ((txn->final_lsn < two_phase_at) && is_commit)
{
txn->txn_flags |= RBTXN_PREPARE;
* prepared after the restart.
*/
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
- txn->commit_time, txn->origin_id, txn->origin_lsn);
+ txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
}
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
- txn->commit_time = commit_time;
+ txn->xact_time.commit_time = commit_time;
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
XLogRecPtr start_decoding_at;
/*
- * LSN at which we found a consistent point at the time of slot creation.
- * This is also the point where we have exported a snapshot for the
- * initial copy.
+ * LSN at which two-phase decoding was enabled or LSN at which we found a
+ * consistent point at the time of slot creation.
*
- * The prepared transactions that are not covered by initial snapshot
- * needs to be sent later along with commit prepared and they must be
- * before this point.
+ * The prepared transactions, that were skipped because previously
+ * two-phase was not enabled or are not covered by initial snapshot, need
+ * to be sent later along with commit prepared and they must be before
+ * this point.
*/
- XLogRecPtr initial_consistent_point;
+ XLogRecPtr two_phase_at;
/*
* Don't start decoding WAL until the "xl_running_xacts" information
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
bool need_full_snapshot,
- XLogRecPtr initial_consistent_point)
+ XLogRecPtr two_phase_at)
{
MemoryContext context;
MemoryContext oldcontext;
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
- builder->initial_consistent_point = initial_consistent_point;
+ builder->two_phase_at = two_phase_at;
MemoryContextSwitchTo(oldcontext);
}
/*
- * Return the LSN at which the snapshot was exported
+ * Return the LSN at which the two-phase decoding was first enabled.
*/
XLogRecPtr
-SnapBuildInitialConsistentPoint(SnapBuild *builder)
+SnapBuildGetTwoPhaseAt(SnapBuild *builder)
{
- return builder->initial_consistent_point;
+ return builder->two_phase_at;
+}
+
+/*
+ * Set the LSN at which two-phase decoding is enabled.
+ */
+void
+SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
+{
+ builder->two_phase_at = ptr;
}
/*
#include "access/table.h"
#include "access/xact.h"
+#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
+#include "utils/syscache.h"
static bool table_states_valid = false;
+static List *table_states_not_ready = NIL;
+static bool FetchTableStates(bool *started_tx);
StringInfo copybuf = NULL;
Oid relid;
TimestampTz last_start_time;
};
- static List *table_states = NIL;
static HTAB *last_start_times = NULL;
ListCell *lc;
bool started_tx = false;
Assert(!IsTransactionState());
/* We need up-to-date sync state info for subscription tables here. */
- if (!table_states_valid)
- {
- MemoryContext oldctx;
- List *rstates;
- ListCell *lc;
- SubscriptionRelState *rstate;
-
- /* Clean the old list. */
- list_free_deep(table_states);
- table_states = NIL;
-
- StartTransactionCommand();
- started_tx = true;
-
- /* Fetch all non-ready tables. */
- rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
-
- /* Allocate the tracking info in a permanent memory context. */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- foreach(lc, rstates)
- {
- rstate = palloc(sizeof(SubscriptionRelState));
- memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
- table_states = lappend(table_states, rstate);
- }
- MemoryContextSwitchTo(oldctx);
-
- table_states_valid = true;
- }
+ FetchTableStates(&started_tx);
/*
* Prepare a hash table for tracking last start times of workers, to avoid
* immediate restarts. We don't need it if there are no tables that need
* syncing.
*/
- if (table_states && !last_start_times)
+ if (table_states_not_ready && !last_start_times)
{
HASHCTL ctl;
* Clean up the hash table when we're done with all tables (just to
* release the bit of memory).
*/
- else if (!table_states && last_start_times)
+ else if (!table_states_not_ready && last_start_times)
{
hash_destroy(last_start_times);
last_start_times = NULL;
}
+ /*
+ * Even when the two_phase mode is requested by the user, it remains as
+ * 'pending' until all tablesyncs have reached READY state.
+ *
+ * When this happens, we restart the apply worker and (if the conditions
+ * are still ok) then the two_phase tri-state will become 'enabled' at
+ * that time.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
/*
* Process all tables that are being synchronized.
*/
- foreach(lc, table_states)
+ foreach(lc, table_states_not_ready)
{
SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
* slot leading to a dangling slot on the server.
*/
HOLD_INTERRUPTS();
- walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
+ walrcv_create_slot(LogRepWorkerWalRcvConn,
+ slotname, false /* permanent */ , false /* two_phase */ ,
CRS_USE_SNAPSHOT, origin_startpos);
RESUME_INTERRUPTS();
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;
}
+
+/*
+ * Common code to fetch the up-to-date sync state info into the static lists.
+ *
+ * Returns true if subscription has 1 or more tables, else false.
+ *
+ * Note: If this function started the transaction (indicated by the parameter)
+ * then it is the caller's responsibility to commit it.
+ */
+static bool
+FetchTableStates(bool *started_tx)
+{
+ static bool has_subrels = false;
+
+ *started_tx = false;
+
+ if (!table_states_valid)
+ {
+ MemoryContext oldctx;
+ List *rstates;
+ ListCell *lc;
+ SubscriptionRelState *rstate;
+
+ /* Clean the old lists. */
+ list_free_deep(table_states_not_ready);
+ table_states_not_ready = NIL;
+
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ *started_tx = true;
+ }
+
+ /* Fetch all non-ready tables. */
+ rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
+
+ /* Allocate the tracking info in a permanent memory context. */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ foreach(lc, rstates)
+ {
+ rstate = palloc(sizeof(SubscriptionRelState));
+ memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready, rstate);
+ }
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Does the subscription have tables?
+ *
+ * If there were not-READY relations found then we know it does. But
+ * if table_state_not_ready was empty we still need to check again to
+ * see if there are 0 tables.
+ */
+ has_subrels = (list_length(table_states_not_ready) > 0) ||
+ HasSubscriptionRelations(MySubscription->oid);
+
+ table_states_valid = true;
+ }
+
+ return has_subrels;
+}
+
+/*
+ * If the subscription has no tables then return false.
+ *
+ * Otherwise, are all tablesyncs READY?
+ *
+ * Note: This function is not suitable to be called from outside of apply or
+ * tablesync workers because MySubscription needs to be already initialized.
+ */
+bool
+AllTablesyncsReady(void)
+{
+ bool started_tx = false;
+ bool has_subrels = false;
+
+ /* We need up-to-date sync state info for subscription tables here. */
+ has_subrels = FetchTableStates(&started_tx);
+
+ if (started_tx)
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+ }
+
+ /*
+ * Return false when there are no tables in subscription or not all tables
+ * are in ready state; true otherwise.
+ */
+ return has_subrels && list_length(table_states_not_ready) == 0;
+}
+
+/*
+ * Update the two_phase state of the specified subscription in pg_subscription.
+ */
+void
+UpdateTwoPhaseState(Oid suboid, char new_state)
+{
+ Relation rel;
+ HeapTuple tup;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+
+ Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
+ new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
+ new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR,
+ "cache lookup failed for subscription oid %u",
+ suboid);
+
+ /* Form a new tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* And update/set two_phase state */
+ values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
+ replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel),
+ values, nulls, replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ heap_freetuple(tup);
+ table_close(rel, RowExclusiveLock);
+}
* a new way to pass filenames to BufFile APIs so that we are allowed to open
* the file we desired across multiple stream-open calls for the same
* transaction.
+ *
+ * TWO_PHASE TRANSACTIONS
+ * ----------------------
+ * Two phase transactions are replayed at prepare and then committed or
+ * rolled back at commit prepared and rollback prepared respectively. It is
+ * possible to have a prepared transaction that arrives at the apply worker
+ * when the tablesync is busy doing the initial copy. In this case, the apply
+ * worker skips all the prepared operations [e.g. inserts] while the tablesync
+ * is still busy (see the condition of should_apply_changes_for_rel). The
+ * tablesync worker might not get such a prepared transaction because say it
+ * was prior to the initial consistent point but might have got some later
+ * commits. Now, the tablesync worker will exit without doing anything for the
+ * prepared transaction skipped by the apply worker as the sync location for it
+ * will be already ahead of the apply worker's current location. This would lead
+ * to an "empty prepare", because later when the apply worker does the commit
+ * prepare, there is nothing in it (the inserts were skipped earlier).
+ *
+ * To avoid this, and similar prepare confusions the subscription's two_phase
+ * commit is enabled only after the initial sync is over. The two_phase option
+ * has been implemented as a tri-state with values DISABLED, PENDING, and
+ * ENABLED.
+ *
+ * Even if the user specifies they want a subscription with two_phase = on,
+ * internally it will start with a tri-state of PENDING which only becomes
+ * ENABLED after all tablesync initializations are completed - i.e. when all
+ * tablesync workers have reached their READY state. In other words, the value
+ * PENDING is only a temporary state for subscription start-up.
+ *
+ * Until the two_phase is properly available (ENABLED) the subscription will
+ * behave as if two_phase = off. When the apply worker detects that all
+ * tablesyncs have become READY (while the tri-state was PENDING) it will
+ * restart the apply worker process. This happens in
+ * process_syncing_tables_for_apply.
+ *
+ * When the (re-started) apply worker finds that all tablesyncs are READY for a
+ * two_phase tri-state of PENDING it start streaming messages with the
+ * two_phase option which in turn enables the decoding of two-phase commits at
+ * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
+ * Now, it is possible that during the time we have not enabled two_phase, the
+ * publisher (replication server) would have skipped some prepares but we
+ * ensure that such prepares are sent along with commit prepare, see
+ * ReorderBufferFinishPrepared.
+ *
+ * If the subscription has no tables then a two_phase tri-state PENDING is
+ * left unchanged. This lets the user still do an ALTER TABLE REFRESH
+ * PUBLICATION which might otherwise be disallowed (see below).
+ *
+ * If ever a user needs to be aware of the tri-state value, they can fetch it
+ * from the pg_subscription catalog (see column subtwophasestate).
+ *
+ * We don't allow to toggle two_phase option of a subscription because it can
+ * lead to an inconsistent replica. Consider, initially, it was on and we have
+ * received some prepare then we turn it off, now at commit time the server
+ * will send the entire transaction data along with the commit. With some more
+ * analysis, we can allow changing this option from off to on but not sure if
+ * that alone would be useful.
+ *
+ * Finally, to avoid problems mentioned in previous paragraphs from any
+ * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
+ * to 'off' and then again back to 'on') there is a restriction for
+ * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
+ * the two_phase tri-state is ENABLED, except when copy_data = false.
+ *
+ * We can get prepare of the same GID more than once for the genuine cases
+ * where we have defined multiple subscriptions for publications on the same
+ * server and prepared transaction has operations on tables subscribed to those
+ * subscriptions. For such cases, if we use the GID sent by publisher one of
+ * the prepares will be successful and others will fail, in which case the
+ * server will send them again. Now, this can lead to a deadlock if user has
+ * set synchronous_standby_names for all the subscriptions on subscriber. To
+ * avoid such deadlocks, we generate a unique GID (consisting of the
+ * subscription oid and the xid of the prepared transaction) for each prepare
+ * transaction on the subscriber.
*-------------------------------------------------------------------------
*/
#include "access/table.h"
#include "access/tableam.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
LogicalRepTupleData *newtup,
CmdType operation);
+/* Compute GID for two_phase transactions */
+static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
+
+
/*
* Should this worker apply changes for given relation.
*
pgstat_report_activity(STATE_IDLE, NULL);
}
+/*
+ * Handle BEGIN PREPARE message.
+ */
+static void
+apply_handle_begin_prepare(StringInfo s)
+{
+ LogicalRepPreparedTxnData begin_data;
+
+ /* Tablesync should never receive prepare. */
+ if (am_tablesync_worker())
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
+
+ logicalrep_read_begin_prepare(s, &begin_data);
+
+ remote_final_lsn = begin_data.prepare_lsn;
+
+ in_remote_transaction = true;
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+ LogicalRepPreparedTxnData prepare_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_prepare(s, &prepare_data);
+
+ if (prepare_data.prepare_lsn != remote_final_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+ LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
+ LSN_FORMAT_ARGS(remote_final_lsn))));
+
+ /*
+ * Compute unique GID for two_phase transactions. We don't use GID of
+ * prepared transaction sent by server as that can lead to deadlock when
+ * we have multiple subscriptions from same node point to publications on
+ * the same node. See comments atop worker.c
+ */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+ gid, sizeof(gid));
+
+ /*
+ * Unlike commit, here, we always prepare the transaction even though no
+ * change has happened in this transaction. It is done this way because at
+ * commit prepared time, we won't know whether we have skipped preparing a
+ * transaction because of no change.
+ *
+ * XXX, We can optimize such that at commit prepared time, we first check
+ * whether we have prepared the transaction or not but that doesn't seem
+ * worthwhile because such cases shouldn't be common.
+ */
+ begin_replication_step();
+
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding Begin command. */
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+ PrepareTransactionBlock(gid);
+ end_replication_step();
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn);
+
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(prepare_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle a COMMIT PREPARED of a previously PREPARED transaction.
+ */
+static void
+apply_handle_commit_prepared(StringInfo s)
+{
+ LogicalRepCommitPreparedTxnData prepare_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_commit_prepared(s, &prepare_data);
+
+ /* Compute GID for two_phase transactions. */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+ gid, sizeof(gid));
+
+ /* There is no transaction when COMMIT PREPARED is called */
+ begin_replication_step();
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.commit_time;
+
+ FinishPreparedTransaction(gid, true);
+ end_replication_step();
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn);
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(prepare_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
+ */
+static void
+apply_handle_rollback_prepared(StringInfo s)
+{
+ LogicalRepRollbackPreparedTxnData rollback_data;
+ char gid[GIDSIZE];
+
+ logicalrep_read_rollback_prepared(s, &rollback_data);
+
+ /* Compute GID for two_phase transactions. */
+ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
+ gid, sizeof(gid));
+
+ /*
+ * It is possible that we haven't received prepare because it occurred
+ * before walsender reached a consistent point or the two_phase was still
+ * not enabled by that time, so in such cases, we need to skip rollback
+ * prepared.
+ */
+ if (LookupGXact(gid, rollback_data.prepare_end_lsn,
+ rollback_data.prepare_time))
+ {
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
+ replorigin_session_origin_timestamp = rollback_data.rollback_time;
+
+ /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
+ begin_replication_step();
+ FinishPreparedTransaction(gid, false);
+ end_replication_step();
+ CommitTransactionCommand();
+ }
+
+ pgstat_report_stat(false);
+
+ store_flush_position(rollback_data.rollback_end_lsn);
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(rollback_data.rollback_end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
/*
* Handle ORIGIN message.
*
case LOGICAL_REP_MSG_STREAM_COMMIT:
apply_handle_stream_commit(s);
return;
+
+ case LOGICAL_REP_MSG_BEGIN_PREPARE:
+ apply_handle_begin_prepare(s);
+ return;
+
+ case LOGICAL_REP_MSG_PREPARE:
+ apply_handle_prepare(s);
+ return;
+
+ case LOGICAL_REP_MSG_COMMIT_PREPARED:
+ apply_handle_commit_prepared(s);
+ return;
+
+ case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+ apply_handle_rollback_prepared(s);
+ return;
}
ereport(ERROR,
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
+ /* two-phase should not be altered */
+ Assert(newsub->twophasestate == MySubscription->twophasestate);
+
/*
* Exit if any parameter that affects the remote connection was changed.
* The launcher will start a new worker.
subxact_data.nsubxacts_max = 0;
}
+/*
+ * Form the prepared transaction GID for two_phase transactions.
+ *
+ * Return the GID in the supplied buffer.
+ */
+static void
+TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
+{
+ Assert(subid != InvalidRepOriginId);
+
+ if (!TransactionIdIsValid(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid two-phase transaction ID")));
+
+ snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
XLogRecPtr origin_startpos;
char *myslotname;
WalRcvStreamOptions options;
+ int server_version;
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
options.logical = true;
options.startpoint = origin_startpos;
options.slotname = myslotname;
+
+ server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
- walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
- LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
+ server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+ server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+ LOGICALREP_PROTO_VERSION_NUM;
+
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
+ options.proto.logical.twophase = false;
+
+ if (!am_tablesync_worker())
+ {
+ /*
+ * Even when the two_phase mode is requested by the user, it remains
+ * as the tri-state PENDING until all tablesyncs have reached READY
+ * state. Only then, can it become ENABLED.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ {
+ /* Start streaming with two_phase enabled */
+ options.proto.logical.twophase = true;
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
- /* Start normal logical streaming replication. */
- walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ StartTransactionCommand();
+ UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+ MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ CommitTransactionCommand();
+ }
+ else
+ {
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ }
+
+ ereport(DEBUG1,
+ (errmsg("logical replication apply worker for subscription \"%s\" two_phase is %s.",
+ MySubscription->name,
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+ MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+ "?")));
+ }
+ else
+ {
+ /* Start normal logical streaming replication. */
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+ }
/* Run the main loop. */
LogicalRepApplyLoop(origin_startpos);
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx);
+static void send_repl_origin(LogicalDecodingContext *ctx,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ bool send_origin);
/*
* Entry in the map used to remember which relation schemas we sent.
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
cb->commit_cb = pgoutput_commit_txn;
+
+ cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
+ cb->prepare_cb = pgoutput_prepare_txn;
+ cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+ cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
cb->stream_truncate_cb = pgoutput_truncate;
+ /* transaction streaming - two-phase commit */
+ cb->stream_prepare_cb = NULL;
}
static void
bool binary_option_given = false;
bool messages_option_given = false;
bool streaming_given = false;
+ bool two_phase_option_given = false;
data->binary = false;
data->streaming = false;
data->messages = false;
+ data->two_phase = false;
foreach(lc, options)
{
data->streaming = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_option_given = true;
+
+ data->two_phase = defGetBoolean(defel);
+ }
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
+
+ /*
+ * Do additional checking for the disallowed combination of two_phase
+ * and streaming. While streaming and two_phase can theoretically be
+ * supported, it needs more analysis to allow them together.
+ */
+ if (data->two_phase && data->streaming)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("%s and %s are mutually exclusive options",
+ "two_phase", "streaming")));
}
}
/* Also remember we're currently not streaming any transaction. */
in_streaming = false;
+ /*
+ * Here, we just check whether the two-phase option is passed by
+ * plugin and decide whether to enable it at later point of time. It
+ * remains enabled if the previous start-up has done so. But we only
+ * allow the option to be passed in with sufficient version of the
+ * protocol, and when the output plugin supports it.
+ */
+ if (!data->two_phase)
+ ctx->twophase_opt_given = false;
+ else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
+ else if (!ctx->twophase)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("two-phase commit requested, but not supported by output plugin")));
+ else
+ ctx->twophase_opt_given = true;
+
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
}
else
{
- /* Disable the streaming during the slot initialization mode. */
+ /*
+ * Disable the streaming and prepared transactions during the slot
+ * initialization mode.
+ */
ctx->streaming = false;
+ ctx->twophase = false;
}
}
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin(ctx->out, txn);
- if (send_replication_origin)
- {
- char *origin;
-
- /*----------
- * XXX: which behaviour do we want here?
- *
- * Alternatives:
- * - don't send origin message if origin name not found
- * (that's what we do now)
- * - throw error - that will break replication, not good
- * - send some special "unknown" origin
- *----------
- */
- if (replorigin_by_oid(txn->origin_id, true, &origin))
- {
- /* Message boundary */
- OutputPluginWrite(ctx, false);
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
- }
-
- }
+ send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
+ send_replication_origin);
OutputPluginWrite(ctx, true);
}
OutputPluginWrite(ctx, true);
}
+/*
+ * BEGIN PREPARE callback
+ */
+static void
+pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+ OutputPluginPrepareWrite(ctx, !send_replication_origin);
+ logicalrep_write_begin_prepare(ctx->out, txn);
+
+ send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
+ send_replication_origin);
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * ROLLBACK PREPARED callback
+ */
+static void
+pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
+ prepare_time);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Write the current schema of the relation and its ancestor (if any) if not
* done yet.
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
- if (send_replication_origin)
- {
- char *origin;
-
- if (replorigin_by_oid(txn->origin_id, true, &origin))
- {
- /* Message boundary */
- OutputPluginWrite(ctx, false);
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
- }
- }
+ send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
+ send_replication_origin);
OutputPluginWrite(ctx, true);
entry->pubactions.pubtruncate = false;
}
}
+
+/* Send Replication origin */
+static void
+send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
+ XLogRecPtr origin_lsn, bool send_origin)
+{
+ if (send_origin)
+ {
+ char *origin;
+
+ /*----------
+ * XXX: which behaviour do we want here?
+ *
+ * Alternatives:
+ * - don't send origin message if origin name not found
+ * (that's what we do now)
+ * - throw error - that will break replication, not good
+ * - send some special "unknown" origin
+ *----------
+ */
+ if (replorigin_by_oid(origin_id, true, &origin))
+ {
+ /* Message boundary */
+ OutputPluginWrite(ctx, false);
+ OutputPluginPrepareWrite(ctx, true);
+
+ logicalrep_write_origin(ctx->out, origin, origin_lsn);
+ }
+ }
+}
slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
slot->data.persistency = persistency;
slot->data.two_phase = two_phase;
+ slot->data.two_phase_at = InvalidXLogRecPtr;
/* and then data only present in shared memory */
slot->just_dirtied = false;
"pg_walreceiver_%lld",
(long long int) walrcv_get_backend_pid(wrconn));
- walrcv_create_slot(wrconn, slotname, true, 0, NULL);
+ walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
SpinLockAcquire(&walrcv->mutex);
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
#include "catalog/pg_largeobject_d.h"
#include "catalog/pg_largeobject_metadata_d.h"
#include "catalog/pg_proc_d.h"
+#include "catalog/pg_subscription.h"
#include "catalog/pg_trigger_d.h"
#include "catalog/pg_type_d.h"
#include "common/connect.h"
int i_subname;
int i_rolname;
int i_substream;
+ int i_subtwophasestate;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
appendPQExpBufferStr(query, " false AS subbinary,\n");
if (fout->remoteVersion >= 140000)
- appendPQExpBufferStr(query, " s.substream\n");
+ appendPQExpBufferStr(query, " s.substream,\n");
else
- appendPQExpBufferStr(query, " false AS substream\n");
+ appendPQExpBufferStr(query, " false AS substream,\n");
+
+ if (fout->remoteVersion >= 150000)
+ appendPQExpBufferStr(query, " s.subtwophasestate\n");
+ else
+ appendPQExpBuffer(query,
+ " '%c' AS subtwophasestate\n",
+ LOGICALREP_TWOPHASE_STATE_DISABLED);
appendPQExpBufferStr(query,
"FROM pg_subscription s\n"
i_subpublications = PQfnumber(res, "subpublications");
i_subbinary = PQfnumber(res, "subbinary");
i_substream = PQfnumber(res, "substream");
+ i_subtwophasestate = PQfnumber(res, "subtwophasestate");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
pg_strdup(PQgetvalue(res, i, i_subbinary));
subinfo[i].substream =
pg_strdup(PQgetvalue(res, i, i_substream));
+ subinfo[i].subtwophasestate =
+ pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
if (strlen(subinfo[i].rolname) == 0)
pg_log_warning("owner of subscription \"%s\" appears to be invalid",
char **pubnames = NULL;
int npubnames = 0;
int i;
+ char two_phase_disabled[] = {LOGICALREP_TWOPHASE_STATE_DISABLED, '\0'};
if (!(subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
return;
if (strcmp(subinfo->substream, "f") != 0)
appendPQExpBufferStr(query, ", streaming = on");
+ if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
+ appendPQExpBufferStr(query, ", two_phase = on");
+
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
char *subslotname;
char *subbinary;
char *substream;
+ char *subtwophasestate;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
- false, false, false, false};
+ false, false, false, false, false};
if (pset.sversion < 100000)
{
gettext_noop("Binary"),
gettext_noop("Streaming"));
+ /* Two_phase is only supported in v15 and higher */
+ if (pset.sversion >= 150000)
+ appendPQExpBuffer(&buf,
+ ", subtwophasestate AS \"%s\"\n",
+ gettext_noop("Two phase commit"));
+
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
", subconninfo AS \"%s\"\n",
else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
"enabled", "slot_name", "streaming",
- "synchronous_commit");
+ "synchronous_commit", "two_phase");
/* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
XLogRecPtr end_lsn, RepOriginId origin_id);
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
extern void restoreTwoPhaseData(void);
+extern bool LookupGXact(const char *gid, XLogRecPtr prepare_at_lsn,
+ TimestampTz origin_prepare_timestamp);
#endif /* TWOPHASE_H */
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202107071
+#define CATALOG_VERSION_NO 202107141
#endif
#include "nodes/pg_list.h"
+/*
+ * two_phase tri-state values. See comments atop worker.c to know more about
+ * these states.
+ */
+#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
+#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
+#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'
+
/* ----------------
* pg_subscription definition. cpp turns this into
* typedef struct FormData_pg_subscription
bool substream; /* Stream in-progress transactions. */
+ char subtwophasestate; /* Stream two-phase transactions */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
bool binary; /* Indicates if the subscription wants data in
* binary format */
bool stream; /* Allow streaming in-progress transactions. */
+ char twophasestate; /* Allow streaming two-phase transactions */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
+extern bool HasSubscriptionRelations(Oid subid);
extern List *GetSubscriptionRelations(Oid subid);
extern List *GetSubscriptionNotReadyRelations(Oid subid);
*/
bool twophase;
+ /*
+ * Is two-phase option given by output plugin?
+ *
+ * This flag indicates that the plugin passed in the two-phase option as
+ * part of the START_STREAMING command. We can't rely solely on the
+ * twophase flag which only tells whether the plugin provided all the
+ * necessary two-phase callbacks.
+ */
+ bool twophase_opt_given;
+
/*
* State for writing output.
*/
#ifndef LOGICAL_PROTO_H
#define LOGICAL_PROTO_H
+#include "access/xact.h"
#include "replication/reorderbuffer.h"
#include "utils/rel.h"
* connect time.
*
* LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
- * support for streaming large transactions.
+ * support for streaming large transactions. Introduced in PG14.
+ *
+ * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
+ * support for two-phase commit decoding (at prepare time). Introduced in PG15.
*/
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
+#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
/*
* Logical message types
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
+ LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
+ LOGICAL_REP_MSG_PREPARE = 'P',
+ LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
+ LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r',
LOGICAL_REP_MSG_STREAM_START = 'S',
LOGICAL_REP_MSG_STREAM_END = 'E',
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
TimestampTz committime;
} LogicalRepCommitData;
+/*
+ * Prepared transaction protocol information for begin_prepare, and prepare.
+ */
+typedef struct LogicalRepPreparedTxnData
+{
+ XLogRecPtr prepare_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz prepare_time;
+ TransactionId xid;
+ char gid[GIDSIZE];
+} LogicalRepPreparedTxnData;
+
+/*
+ * Prepared transaction protocol information for commit prepared.
+ */
+typedef struct LogicalRepCommitPreparedTxnData
+{
+ XLogRecPtr commit_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz commit_time;
+ TransactionId xid;
+ char gid[GIDSIZE];
+} LogicalRepCommitPreparedTxnData;
+
+/*
+ * Rollback Prepared transaction protocol information. The prepare information
+ * prepare_end_lsn and prepare_time are used to check if the downstream has
+ * received this prepared transaction in which case it can apply the rollback,
+ * otherwise, it can skip the rollback operation. The gid alone is not
+ * sufficient because the downstream node can have a prepared transaction with
+ * same identifier.
+ */
+typedef struct LogicalRepRollbackPreparedTxnData
+{
+ XLogRecPtr prepare_end_lsn;
+ XLogRecPtr rollback_end_lsn;
+ TimestampTz prepare_time;
+ TimestampTz rollback_time;
+ TransactionId xid;
+ char gid[GIDSIZE];
+} LogicalRepRollbackPreparedTxnData;
+
extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
extern void logicalrep_read_begin(StringInfo in,
LogicalRepBeginData *begin_data);
XLogRecPtr commit_lsn);
extern void logicalrep_read_commit(StringInfo in,
LogicalRepCommitData *commit_data);
+extern void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn);
+extern void logicalrep_read_begin_prepare(StringInfo in,
+ LogicalRepPreparedTxnData *begin_data);
+extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+extern void logicalrep_read_prepare(StringInfo in,
+ LogicalRepPreparedTxnData *prepare_data);
+extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+extern void logicalrep_read_commit_prepared(StringInfo in,
+ LogicalRepCommitPreparedTxnData *prepare_data);
+extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+extern void logicalrep_read_rollback_prepared(StringInfo in,
+ LogicalRepRollbackPreparedTxnData *rollback_data);
+
+
extern void logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn);
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
bool binary;
bool streaming;
bool messages;
+ bool two_phase;
} PGOutputData;
#endif /* PGOUTPUT_H */
* Commit or Prepare time, only known when we read the actual commit or
* prepare record.
*/
- TimestampTz commit_time;
+ union
+ {
+ TimestampTz commit_time;
+ TimestampTz prepare_time;
+ } xact_time;
/*
* The base snapshot is used to decode all changes until either this
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- XLogRecPtr initial_consistent_point,
+ XLogRecPtr two_phase_at,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit);
XLogRecPtr confirmed_flush;
/*
- * LSN at which we found a consistent point at the time of slot creation.
- * This is also the point where we have exported a snapshot for the
- * initial copy.
+ * LSN at which we enabled two_phase commit for this slot or LSN at which
+ * we found a consistent point at the time of slot creation.
*/
- XLogRecPtr initial_consistent_point;
+ XLogRecPtr two_phase_at;
/*
* Allow decoding of prepared transactions?
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
TransactionId xmin_horizon, XLogRecPtr start_lsn,
bool need_full_snapshot,
- XLogRecPtr initial_consistent_point);
+ XLogRecPtr two_phase_at);
extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap);
TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
-extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
+extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder);
+extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,
List *publication_names; /* String list of publications */
bool binary; /* Ask publisher to use binary */
bool streaming; /* Streaming of large transactions */
+ bool twophase; /* Streaming of two-phase transactions at
+ * prepare time */
} logical;
} proto;
} WalRcvStreamOptions;
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname,
bool temporary,
+ bool two_phase,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \
- WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
#define walrcv_get_backend_pid(conn) \
WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
char *originname, int szorgname);
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+extern bool AllTablesyncsReady(void);
+extern void UpdateTwoPhaseState(Oid suboid, char new_state);
+
void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid,
uint32 hashvalue);
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
----------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
(1 row)
-- fail - publication already exists
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
------------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | off | dbname=regress_doesnotexist
(1 row)
-- fail - publication used more then once
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
DROP FUNCTION func;
+-- fail - two_phase must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo);
+ERROR: two_phase requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
+WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist
+(1 row)
+
+--fail - alter of two_phase option not supported.
+ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
+ERROR: unrecognized subscription parameter: "two_phase"
+--fail - cannot set streaming when two_phase enabled
+ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
+ERROR: cannot set streaming = true for two-phase enabled subscription
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist
+(1 row)
+
+DROP SUBSCRIPTION regress_testsub;
+-- fail - two_phase and streaming are mutually exclusive.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
+ERROR: two_phase = true and streaming = true are mutually exclusive options
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
+------+-------+---------+-------------+--------+-----------+------------------+--------------------+----------
+(0 rows)
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
DROP SUBSCRIPTION regress_testsub;
DROP FUNCTION func;
+-- fail - two_phase must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
+
+\dRs+
+--fail - alter of two_phase option not supported.
+ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
+
+--fail - cannot set streaming when two_phase enabled
+ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+\dRs+
+
+DROP SUBSCRIPTION regress_testsub;
+
+-- fail - two_phase and streaming are mutually exclusive.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
+
+\dRs+
+
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
--- /dev/null
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 24;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full SELECT generate_series(1,10);
+ PREPARE TRANSACTION 'some_initial_data';
+ COMMIT PREPARED 'some_initial_data';");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE tab_full");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres', "
+ CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub
+ WITH (two_phase = on)");
+
+# Wait for subscriber to finish initialization
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+ "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+ or die "Timed out while waiting for subscriber to enable twophase";
+
+###############################
+# check that 2PC gets replicated to subscriber
+# then COMMIT PREPARED
+###############################
+
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (11);
+ PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# check that 2PC gets replicated to subscriber
+# then ROLLBACK PREPARED
+###############################
+
+$node_publisher->safe_psql('postgres',"
+ BEGIN;
+ INSERT INTO tab_full VALUES (12);
+ PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that ROLLBACK PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (12);
+ INSERT INTO tab_full VALUES (13);
+ PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# rollback post the restart
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are rolled back
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(0), 'Rows rolled back are not on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher and subscriber crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (12);
+ INSERT INTO tab_full VALUES (13);
+ PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (12,13);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (subscriber only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (14);
+ INSERT INTO tab_full VALUES (15);
+ PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_subscriber->stop('immediate');
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (14,15);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Check that COMMIT PREPARED is decoded properly on crash restart
+# (publisher only crash)
+###############################
+
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (16);
+ INSERT INTO tab_full VALUES (17);
+ PREPARE TRANSACTION 'test_prepared_tab';");
+
+$node_publisher->stop('immediate');
+$node_publisher->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (16,17);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+###############################
+# Test nested transaction with 2PC
+###############################
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (21);
+ SAVEPOINT sp_inner;
+ INSERT INTO tab_full VALUES (22);
+ ROLLBACK TO SAVEPOINT sp_inner;
+ PREPARE TRANSACTION 'outer';
+ ");
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# COMMIT
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check the transaction state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber');
+
+# check inserts are visible. 22 should be rolled back. 21 should be committed.
+$result = $node_subscriber->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
+is($result, qq(21), 'Rows committed are on the subscriber');
+
+###############################
+# Test using empty GID
+###############################
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (51);
+ PREPARE TRANSACTION '';");
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# ROLLBACK
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED '';");
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# copy_data=false and two_phase
+###############################
+
+#create some test tables for copy tests
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_copy (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_copy SELECT generate_series(1,5);");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_copy (a int PRIMARY KEY)");
+$node_subscriber->safe_psql('postgres', "INSERT INTO tab_copy VALUES (88);");
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(1), 'initial data in subscriber table');
+
+# Setup logical replication
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_copy FOR TABLE tab_copy;");
+
+my $appname_copy = 'appname_copy';
+$node_subscriber->safe_psql('postgres', "
+ CREATE SUBSCRIPTION tap_sub_copy
+ CONNECTION '$publisher_connstr application_name=$appname_copy'
+ PUBLICATION tap_pub_copy
+ WITH (two_phase=on, copy_data=false);");
+
+# Wait for subscriber to finish initialization
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Also wait for initial table sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for two-phase to be enabled
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+ or die "Timed out while waiting for subscriber to enable twophase";
+
+# Check that the initial table data was NOT replicated (because we said copy_data=false)
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(1), 'initial data in subscriber table');
+
+# Now do a prepare on publisher and check that it IS replicated
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_copy VALUES (99);
+ PREPARE TRANSACTION 'mygid';");
+
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Check that the transaction has been prepared on the subscriber, there will be 2
+# prepared transactions for the 2 subscriptions.
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(2), 'transaction is prepared on subscriber');
+
+# Now commit the insert and verify that it IS replicated
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'mygid';");
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(6), 'publisher inserted data');
+
+$node_publisher->wait_for_catchup($appname_copy);
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(2), 'replicated data in subscriber table');
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
--- /dev/null
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test cascading logical replication of 2PC.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 27;
+
+###############################
+# Setup a cascade of pub/sub nodes.
+# node_A -> node_B -> node_C
+###############################
+
+# Initialize nodes
+# node_A
+my $node_A = get_new_node('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_A->start;
+# node_B
+my $node_B = get_new_node('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_B->start;
+# node_C
+my $node_C = get_new_node('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_C->start;
+
+# Create some pre-existing content on node_A
+$node_A->safe_psql('postgres',
+ "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_A->safe_psql('postgres', "
+ INSERT INTO tab_full SELECT generate_series(1,10);");
+
+# Create the same tables on node_B amd node_C
+$node_B->safe_psql('postgres',
+ "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_C->safe_psql('postgres',
+ "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+my $appname_B = 'tap_sub_B';
+$node_B->safe_psql('postgres', "
+ CREATE SUBSCRIPTION tap_sub_B
+ CONNECTION '$node_A_connstr application_name=$appname_B'
+ PUBLICATION tap_pub_A
+ WITH (two_phase = on)");
+
+# node_B (pub) -> node_C (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+my $appname_C = 'tap_sub_C';
+$node_C->safe_psql('postgres', "
+ CREATE SUBSCRIPTION tap_sub_C
+ CONNECTION '$node_B_connstr application_name=$appname_C'
+ PUBLICATION tap_pub_B
+ WITH (two_phase = on)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# Also wait for two-phase to be enabled
+my $twophase_query = "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_B->poll_query_until('postgres', $twophase_query)
+ or die "Timed out while waiting for subscriber to enable twophase";
+$node_C->poll_query_until('postgres', $twophase_query)
+ or die "Timed out while waiting for subscriber to enable twophase";
+
+is(1,1, "Cascade setup is complete");
+
+my $result;
+
+###############################
+# check that 2PC gets replicated to subscriber(s)
+# then COMMIT PREPARED
+###############################
+
+# 2PC PREPARE
+$node_A->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (11);
+ PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C');
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber C');
+
+###############################
+# check that 2PC gets replicated to subscriber(s)
+# then ROLLBACK PREPARED
+###############################
+
+# 2PC PREPARE
+$node_A->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (12);
+ PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC ROLLBACK
+$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction is aborted on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C');
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+###############################
+# Test nested transactions with 2PC
+###############################
+
+# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
+$node_A->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (21);
+ SAVEPOINT sp_inner;
+ INSERT INTO tab_full VALUES (22);
+ ROLLBACK TO SAVEPOINT sp_inner;
+ PREPARE TRANSACTION 'outer';
+ ");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is ended on subscriber
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+# check inserts are visible at subscriber(s).
+# 22 should be rolled back.
+# 21 should be committed.
+$result = $node_B->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
+is($result, qq(21), 'Rows committed are present on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);");
+is($result, qq(21), 'Rows committed are present on subscriber C');
+
+###############################
+# check all the cleanup
+###############################
+
+# cleanup the node_B => node_C pub/sub
+$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C");
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber node C');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node B');
+
+# cleanup the node_A => node_B pub/sub
+$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B");
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber node B');
+$result = $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node A');
+
+# shutdown
+$node_C->stop('fast');
+$node_B->stop('fast');
+$node_A->stop('fast');
LogicalOutputPluginWriterWrite
LogicalRepBeginData
LogicalRepCommitData
+LogicalRepCommitPreparedTxnData
LogicalRepCtxStruct
LogicalRepMsgType
LogicalRepPartMapEntry
+LogicalRepPreparedTxnData
LogicalRepRelId
LogicalRepRelMapEntry
LogicalRepRelation
+LogicalRepRollbackPreparedTxnData
LogicalRepTupleData
LogicalRepTyp
LogicalRepWorker