Add ALTER SUBSCRIPTION ... SKIP.
authorAmit Kapila <akapila@postgresql.org>
Tue, 22 Mar 2022 01:41:19 +0000 (07:11 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 22 Mar 2022 01:41:19 +0000 (07:11 +0530)
This feature allows skipping the transaction on subscriber nodes.

If incoming change violates any constraint, logical replication stops
until it's resolved. Currently, users need to either manually resolve the
conflict by updating a subscriber-side database or by using function
pg_replication_origin_advance() to skip the conflicting transaction. This
commit introduces a simpler way to skip the conflicting transactions.

The user can specify LSN by ALTER SUBSCRIPTION ... SKIP (lsn = XXX),
which allows the apply worker to skip the transaction finished at
specified LSN. The apply worker skips all data modification changes within
the transaction.

Author: Masahiko Sawada
Reviewed-by: Takamichi Osumi, Hou Zhijie, Peter Eisentraut, Amit Kapila, Shi Yu, Vignesh C, Greg Nancarrow, Haiying Tang, Euler Taveira
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com

18 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/logical-replication.sgml
doc/src/sgml/ref/alter_subscription.sgml
src/backend/catalog/pg_subscription.c
src/backend/catalog/system_views.sql
src/backend/commands/subscriptioncmds.c
src/backend/parser/gram.y
src/backend/replication/logical/worker.c
src/bin/pg_dump/pg_dump.c
src/bin/psql/describe.c
src/bin/psql/tab-complete.c
src/include/catalog/catversion.h
src/include/catalog/pg_subscription.h
src/include/nodes/parsenodes.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/029_disable_on_error.pl [deleted file]
src/test/subscription/t/029_on_error.pl [new file with mode: 0644]

index 4dc5b34d21c51e819f810e661cf8497c96779346..2a8cd0266491b84bba9e19db174e290dbee1bb61 100644 (file)
@@ -7797,6 +7797,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subskiplsn</structfield> <type>pg_lsn</type>
+      </para>
+      <para>
+       Finish LSN of the transaction whose changes are to be skipped, if a valid
+       LSN; otherwise <literal>0/0</literal>.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
index 6431d4796db3af5f71d6b0a13aff75ae360d2acc..555fbd749cc951fe16c6738570927a6443027470 100644 (file)
@@ -362,19 +362,24 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
 </screen>
    The LSN of the transaction that contains the change violating the constraint and
    the replication origin name can be found from the server log (LSN 0/14C0378 and
-   replication origin <literal>pg_16395</literal> in the above case).  To skip the
-   transaction, the subscription needs to be disabled temporarily by
-   <command>ALTER SUBSCRIPTION ... DISABLE</command> first or alternatively, the
+   replication origin <literal>pg_16395</literal> in the above case).  The
+   transaction that produces conflict can be skipped by using
+   <command>ALTER SUBSCRIPTION ... SKIP</command> with the finish LSN
+   (i.e., LSN 0/14C0378).  The finish LSN could be an LSN at which the transaction
+   is committed or prepared on the publisher.  Alternatively, the transaction can
+   also be skipped by calling the <link linkend="pg-replication-origin-advance">
+   <function>pg_replication_origin_advance()</function></link> function
+   transaction.  Before using this function, the subscription needs to be disabled
+   temporarily either by <command>ALTER SUBSCRIPTION ... DISABLE</command> or, the
    subscription can be used with the <literal>disable_on_error</literal> option.
-   Then, the transaction can be skipped by calling the
-   <link linkend="pg-replication-origin-advance">
-   <function>pg_replication_origin_advance()</function></link> function with
-   the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
-   next LSN of the transaction's LSN (i.e., LSN 0/14C0379).  After that the replication
-   can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>.  The current
-   position of origins can be seen in the
-   <link linkend="view-pg-replication-origin-status">
+   Then, you can use <function>pg_replication_origin_advance()</function> function
+   with the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>)
+   and the next LSN of the finish LSN (i.e., 0/14C0379).  The current position of
+   origins can be seen in the <link linkend="view-pg-replication-origin-status">
    <structname>pg_replication_origin_status</structname></link> system view.
+   Please note that skipping the whole transaction include skipping changes that
+   might not violate any constraint.  This can easily make the subscriber
+   inconsistent.
   </para>
  </sect1>
 
index 58b78a94eabe29eae586938a50ed2e2fe7e75ce7..ac2db249cbbc027802b1890b42435bf93270cb1b 100644 (file)
@@ -29,6 +29,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUB
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> )
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
 </synopsis>
@@ -210,6 +211,47 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>SKIP ( <replaceable class="parameter">skip_option</replaceable> = <replaceable class="parameter">value</replaceable> )</literal></term>
+    <listitem>
+     <para>
+      Skips applying all changes of the remote transaction.  If incoming data
+      violates any constraints, logical replication will stop until it is
+      resolved.  By using <command>ALTER SUBSCRIPTION ... SKIP</command> command,
+      the logical replication worker skips all data modification changes within
+      the transaction.  This option has no effect on the transactions that are
+      already prepared by enabling <literal>two_phase</literal> on
+      subscriber.
+      After logical replication worker successfully skips the transaction or
+      finishes a transaction, LSN (stored in
+      <structname>pg_subscription</structname>.<structfield>subskiplsn</structfield>)
+      is cleared.  See <xref linkend="logical-replication-conflicts"/> for
+      the details of logical replication conflicts.  Using this command requires
+      superuser privilege.
+     </para>
+
+     <para>
+      <replaceable>skip_option</replaceable> specifies options for this operation.
+      The supported option is:
+
+      <variablelist>
+       <varlistentry>
+        <term><literal>lsn</literal> (<type>pg_lsn</type>)</term>
+        <listitem>
+         <para>
+          Specifies the finish LSN of the remote transaction whose changes
+          are to be skipped by the logical replication worker.  The finish LSN
+          is the LSN at which the transaction is either committed or prepared.
+          Skipping individual subtransaction is not supported.  Setting
+          <literal>NONE</literal> resets the LSN.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="parameter">new_owner</replaceable></term>
     <listitem>
index a6304f5f81a22c59f451769bfc09e2167eba4210..0ff0982f7b2ee9d6c195934eba17fed404848bca 100644 (file)
@@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok)
        sub->stream = subform->substream;
        sub->twophasestate = subform->subtwophasestate;
        sub->disableonerr = subform->subdisableonerr;
+       sub->skiplsn = subform->subskiplsn;
 
        /* Get conninfo */
        datum = SysCacheGetAttr(SUBSCRIPTIONOID,
index bb1ac30cd192623590bc01d292c5f0693421186e..bd48ee7bd255e649ddaa2696c8dcd6fd844776f8 100644 (file)
@@ -1261,7 +1261,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subdisableonerr, subslotname,
+              substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname,
               subsynccommit, subpublications)
     ON pg_subscription TO public;
 
index 3922658bbcae1730797164319345a2261a5c3c68..e16f04626dec60b2cb2c3156ef5d9625b6893599 100644 (file)
@@ -45,6 +45,7 @@
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/syscache.h"
 
 /*
@@ -62,6 +63,7 @@
 #define SUBOPT_STREAMING                       0x00000100
 #define SUBOPT_TWOPHASE_COMMIT         0x00000200
 #define SUBOPT_DISABLE_ON_ERR          0x00000400
+#define SUBOPT_LSN                                     0x00000800
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -84,6 +86,7 @@ typedef struct SubOpts
        bool            streaming;
        bool            twophase;
        bool            disableonerr;
+       XLogRecPtr      lsn;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
                        opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
                        opts->disableonerr = defGetBoolean(defel);
                }
+               else if (IsSet(supported_opts, SUBOPT_LSN) &&
+                                strcmp(defel->defname, "lsn") == 0)
+               {
+                       char       *lsn_str = defGetString(defel);
+                       XLogRecPtr      lsn;
+
+                       if (IsSet(opts->specified_opts, SUBOPT_LSN))
+                               errorConflictingDefElem(defel, pstate);
+
+                       /* Setting lsn = NONE is treated as resetting LSN */
+                       if (strcmp(lsn_str, "none") == 0)
+                               lsn = InvalidXLogRecPtr;
+                       else
+                       {
+                               /* Parse the argument as LSN */
+                               lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+                                                                                                         CStringGetDatum(lsn_str)));
+
+                               if (XLogRecPtrIsInvalid(lsn))
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                        errmsg("invalid WAL location (LSN): %s", lsn_str)));
+                       }
+
+                       opts->specified_opts |= SUBOPT_LSN;
+                       opts->lsn = lsn;
+               }
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                                         LOGICALREP_TWOPHASE_STATE_PENDING :
                                         LOGICALREP_TWOPHASE_STATE_DISABLED);
        values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+       values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
        values[Anum_pg_subscription_subconninfo - 1] =
                CStringGetTextDatum(conninfo);
        if (opts.slot_name)
@@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                break;
                        }
 
+               case ALTER_SUBSCRIPTION_SKIP:
+                       {
+                               parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
+
+                               /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
+                               Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
+
+                               if (!superuser())
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                                        errmsg("must be superuser to skip transaction")));
+
+                               /*
+                                * If the user sets subskiplsn, we do a sanity check to make
+                                * sure that the specified LSN is a probable value.
+                                */
+                               if (!XLogRecPtrIsInvalid(opts.lsn))
+                               {
+                                       RepOriginId originid;
+                                       char            originname[NAMEDATALEN];
+                                       XLogRecPtr      remote_lsn;
+
+                                       snprintf(originname, sizeof(originname), "pg_%u", subid);
+                                       originid = replorigin_by_name(originname, false);
+                                       remote_lsn = replorigin_get_progress(originid, false);
+
+                                       /* Check the given LSN is at least a future LSN */
+                                       if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                                errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
+                                                                               LSN_FORMAT_ARGS(opts.lsn),
+                                                                               LSN_FORMAT_ARGS(remote_lsn))));
+                               }
+
+                               values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
+                               replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+                               update_tuple = true;
+                               break;
+                       }
+
                default:
                        elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
                                 stmt->kind);
index a03b33b53bdcefe5bd7a2d5c92888e7b95450fd5..0036c2f9e2d6ade3040ddf4844784bec362ade96 100644 (file)
@@ -9983,6 +9983,15 @@ AlterSubscriptionStmt:
                                                                                        (Node *)makeBoolean(false), @1));
                                        $$ = (Node *)n;
                                }
+                       | ALTER SUBSCRIPTION name SKIP definition
+                               {
+                                       AlterSubscriptionStmt *n =
+                                               makeNode(AlterSubscriptionStmt);
+                                       n->kind = ALTER_SUBSCRIPTION_SKIP;
+                                       n->subname = $3;
+                                       n->options = $5;
+                                       $$ = (Node *)n;
+                               }
                ;
 
 /*****************************************************************************
index 03e069c7cdd5744b3cced6234376f9b1e7137056..82dcffc2db8b640565d3abd522dcc3c38e3d3e64 100644 (file)
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/syscache.h"
@@ -259,6 +261,21 @@ static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
+/*
+ * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
+ * the subscription if the remote transaction's finish LSN matches the subskiplsn.
+ * Once we start skipping changes, we don't stop it until we skip all changes of
+ * the transaction even if pg_subscription is updated and MySubscription->skiplsn
+ * gets changed or reset during that. Also, in streaming transaction cases, we
+ * don't skip receiving and spooling the changes since we decide whether or not
+ * to skip applying the changes when starting to apply changes. The subskiplsn is
+ * cleared after successfully skipping the transaction or applying non-empty
+ * transaction. The latter prevents the mistakenly specified subskiplsn from
+ * being left.
+ */
+static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
+#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int
 /* Common streaming function to apply all the spooled messages */
 static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
+/* Functions for skipping changes */
+static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
+static void stop_skipping_changes(void);
+static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
+
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
@@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s)
 
        remote_final_lsn = begin_data.final_lsn;
 
+       maybe_start_skipping_changes(begin_data.final_lsn);
+
        in_remote_transaction = true;
 
        pgstat_report_activity(STATE_RUNNING, NULL);
@@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s)
 
        remote_final_lsn = begin_data.prepare_lsn;
 
+       maybe_start_skipping_changes(begin_data.prepare_lsn);
+
        in_remote_transaction = true;
 
        pgstat_report_activity(STATE_RUNNING, NULL);
@@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s)
 
        /*
         * Unlike commit, here, we always prepare the transaction even though no
-        * change has happened in this transaction. It is done this way because at
-        * commit prepared time, we won't know whether we have skipped preparing a
-        * transaction because of no change.
+        * change has happened in this transaction or all changes are skipped. It
+        * is done this way because at commit prepared time, we won't know whether
+        * we have skipped preparing a transaction because of those reasons.
         *
         * XXX, We can optimize such that at commit prepared time, we first check
         * whether we have prepared the transaction or not but that doesn't seem
@@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s)
        /* Process any tables that are being synchronized in parallel. */
        process_syncing_tables(prepare_data.end_lsn);
 
+       /*
+        * Since we have already prepared the transaction, in a case where the
+        * server crashes before clearing the subskiplsn, it will be left but the
+        * transaction won't be resent. But that's okay because it's a rare case
+        * and the subskiplsn will be cleared when finishing the next transaction.
+        */
+       stop_skipping_changes();
+       clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
        pgstat_report_activity(STATE_IDLE, NULL);
        reset_apply_error_context_info();
 }
@@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s)
        /* Process any tables that are being synchronized in parallel. */
        process_syncing_tables(prepare_data.end_lsn);
 
+       clear_subscription_skip_lsn(prepare_data.end_lsn);
+
        pgstat_report_activity(STATE_IDLE, NULL);
        reset_apply_error_context_info();
 }
@@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s)
                FinishPreparedTransaction(gid, false);
                end_replication_step();
                CommitTransactionCommand();
+
+               clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
        }
 
        pgstat_report_stat(false);
@@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s)
        /* Process any tables that are being synchronized in parallel. */
        process_syncing_tables(prepare_data.end_lsn);
 
+       /*
+        * Similar to prepare case, the subskiplsn could be left in a case of
+        * server crash but it's okay. See the comments in apply_handle_prepare().
+        */
+       stop_skipping_changes();
+       clear_subscription_skip_lsn(prepare_data.prepare_lsn);
+
        pgstat_report_activity(STATE_IDLE, NULL);
 
        reset_apply_error_context_info();
@@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
        MemoryContext oldcxt;
        BufFile    *fd;
 
+       maybe_start_skipping_changes(lsn);
+
        /* Make sure we have an open transaction */
        begin_replication_step();
 
@@ -1455,8 +1503,26 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 {
+       if (is_skipping_changes())
+       {
+               stop_skipping_changes();
+
+               /*
+                * Start a new transaction to clear the subskiplsn, if not started
+                * yet.
+                */
+               if (!IsTransactionState())
+                       StartTransactionCommand();
+       }
+
        if (IsTransactionState())
        {
+               /*
+                * The transaction is either non-empty or skipped, so we clear the
+                * subskiplsn.
+                */
+               clear_subscription_skip_lsn(commit_data->commit_lsn);
+
                /*
                 * Update origin state so we can restart streaming from correct
                 * position in case of crash.
@@ -1583,7 +1649,12 @@ apply_handle_insert(StringInfo s)
        TupleTableSlot *remoteslot;
        MemoryContext oldctx;
 
-       if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+       /*
+        * Quick return if we are skipping data modification changes or handling
+        * streamed transactions.
+        */
+       if (is_skipping_changes() ||
+               handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
                return;
 
        begin_replication_step();
@@ -1710,7 +1781,12 @@ apply_handle_update(StringInfo s)
        RangeTblEntry *target_rte;
        MemoryContext oldctx;
 
-       if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
+       /*
+        * Quick return if we are skipping data modification changes or handling
+        * streamed transactions.
+        */
+       if (is_skipping_changes() ||
+               handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
                return;
 
        begin_replication_step();
@@ -1874,7 +1950,12 @@ apply_handle_delete(StringInfo s)
        TupleTableSlot *remoteslot;
        MemoryContext oldctx;
 
-       if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
+       /*
+        * Quick return if we are skipping data modification changes or handling
+        * streamed transactions.
+        */
+       if (is_skipping_changes() ||
+               handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
                return;
 
        begin_replication_step();
@@ -2261,7 +2342,12 @@ apply_handle_truncate(StringInfo s)
        ListCell   *lc;
        LOCKMODE        lockmode = AccessExclusiveLock;
 
-       if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
+       /*
+        * Quick return if we are skipping data modification changes or handling
+        * streamed transactions.
+        */
+       if (is_skipping_changes() ||
+               handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
                return;
 
        begin_replication_step();
@@ -3738,6 +3824,139 @@ IsLogicalWorker(void)
        return MyLogicalRepWorker != NULL;
 }
 
+/*
+ * Start skipping changes of the transaction if the given LSN matches the
+ * LSN specified by subscription's skiplsn.
+ */
+static void
+maybe_start_skipping_changes(XLogRecPtr finish_lsn)
+{
+       Assert(!is_skipping_changes());
+       Assert(!in_remote_transaction);
+       Assert(!in_streamed_transaction);
+
+       /*
+        * Quick return if it's not requested to skip this transaction. This
+        * function is called for every remote transaction and we assume that
+        * skipping the transaction is not used often.
+        */
+       if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
+                          MySubscription->skiplsn != finish_lsn))
+               return;
+
+       /* Start skipping all changes of this transaction */
+       skip_xact_finish_lsn = finish_lsn;
+
+       ereport(LOG,
+                       errmsg("start skipping logical replication transaction finished at %X/%X",
+                                  LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
+}
+
+/*
+ * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
+ */
+static void
+stop_skipping_changes(void)
+{
+       if (!is_skipping_changes())
+               return;
+
+       ereport(LOG,
+                       (errmsg("done skipping logical replication transaction finished at %X/%X",
+                                       LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+
+       /* Stop skipping changes */
+       skip_xact_finish_lsn = InvalidXLogRecPtr;
+}
+
+/*
+ * Clear subskiplsn of pg_subscription catalog.
+ *
+ * finish_lsn is the transaction's finish LSN that is used to check if the
+ * subskiplsn matches it. If not matched, we raise a warning when clearing the
+ * subskiplsn in order to inform users for cases e.g., where the user mistakenly
+ * specified the wrong subskiplsn.
+ */
+static void
+clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
+{
+       Relation        rel;
+       Form_pg_subscription subform;
+       HeapTuple       tup;
+       XLogRecPtr      myskiplsn = MySubscription->skiplsn;
+       bool            started_tx = false;
+
+       if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+               return;
+
+       if (!IsTransactionState())
+       {
+               StartTransactionCommand();
+               started_tx = true;
+       }
+
+       /*
+        * Protect subskiplsn of pg_subscription from being concurrently updated
+        * while clearing it.
+        */
+       LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+                                        AccessShareLock);
+
+       rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+       /* Fetch the existing tuple. */
+       tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
+                                                         ObjectIdGetDatum(MySubscription->oid));
+
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
+
+       subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+       /*
+        * Clear the subskiplsn. If the user has already changed subskiplsn before
+        * clearing it we don't update the catalog and the replication origin
+        * state won't get advanced. So in the worst case, if the server crashes
+        * before sending an acknowledgment of the flush position the transaction
+        * will be sent again and the user needs to set subskiplsn again. We can
+        * reduce the possibility by logging a replication origin WAL record to
+        * advance the origin LSN instead but there is no way to advance the
+        * origin timestamp and it doesn't seem to be worth doing anything about
+        * it since it's a very rare case.
+        */
+       if (subform->subskiplsn == myskiplsn)
+       {
+               bool            nulls[Natts_pg_subscription];
+               bool            replaces[Natts_pg_subscription];
+               Datum           values[Natts_pg_subscription];
+
+               memset(values, 0, sizeof(values));
+               memset(nulls, false, sizeof(nulls));
+               memset(replaces, false, sizeof(replaces));
+
+               /* reset subskiplsn */
+               values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
+               replaces[Anum_pg_subscription_subskiplsn - 1] = true;
+
+               tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+                                                               replaces);
+               CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+               if (myskiplsn != finish_lsn)
+                       ereport(WARNING,
+                                       errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name),
+                                       errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X",
+                                                         LSN_FORMAT_ARGS(finish_lsn),
+                                                         LSN_FORMAT_ARGS(myskiplsn)));
+       }
+
+       heap_freetuple(tup);
+       table_close(rel, NoLock);
+
+       if (started_tx)
+               CommitTransactionCommand();
+}
+
 /* Error callback to give more context info about the change being applied */
 static void
 apply_error_callback(void *arg)
index 725cd2e4ebcafc68bdb9b0f601622f03e7050f40..e5816c4ccea61083df2b0a7e165b601f27199b3d 100644 (file)
@@ -4385,6 +4385,10 @@ getSubscriptions(Archive *fout)
 
        ntups = PQntuples(res);
 
+       /*
+        * Get subscription fields. We don't include subskiplsn in the dump as
+        * after restoring the dump this value may no longer be relevant.
+        */
        i_tableoid = PQfnumber(res, "tableoid");
        i_oid = PQfnumber(res, "oid");
        i_subname = PQfnumber(res, "subname");
index 991bfc1546b063f2fae3d29c0bcade9d0cf06124..714097cad1bcbb57bedb570c86589cb8d029b36b 100644 (file)
@@ -6105,7 +6105,7 @@ describeSubscriptions(const char *pattern, bool verbose)
        PGresult   *res;
        printQueryOpt myopt = pset.popt;
        static const bool translate_columns[] = {false, false, false, false,
-       false, false, false, false, false, false};
+       false, false, false, false, false, false, false};
 
        if (pset.sversion < 100000)
        {
@@ -6152,6 +6152,12 @@ describeSubscriptions(const char *pattern, bool verbose)
                                                  ",  subconninfo AS \"%s\"\n",
                                                  gettext_noop("Synchronous commit"),
                                                  gettext_noop("Conninfo"));
+
+               /* Skip LSN is only supported in v15 and higher */
+               if (pset.sversion >= 150000)
+                       appendPQExpBuffer(&buf,
+                                                         ", subskiplsn AS \"%s\"\n",
+                                                         gettext_noop("Skip LSN"));
        }
 
        /* Only display subscriptions in current database. */
index 183abcc2753108224c735cac2405454508de9a89..5c064595a97d4a5dd39ed0c894579c808a488fd1 100644 (file)
@@ -1852,7 +1852,7 @@ psql_completion(const char *text, int start, int end)
        /* ALTER SUBSCRIPTION <name> */
        else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
                COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
-                                         "RENAME TO", "REFRESH PUBLICATION", "SET",
+                                         "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP (",
                                          "ADD PUBLICATION", "DROP PUBLICATION");
        /* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
        else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
@@ -1868,6 +1868,9 @@ psql_completion(const char *text, int start, int end)
        /* ALTER SUBSCRIPTION <name> SET ( */
        else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
                COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
+       /* ALTER SUBSCRIPTION <name> SKIP ( */
+       else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
+               COMPLETE_WITH("lsn");
        /* ALTER SUBSCRIPTION <name> SET PUBLICATION */
        else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
        {
index 1383761c1f8ce1069ed5b8d0a74977900bcd3335..db9963db72764c292de6a627759cdb28df00aafe 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202203211
+#define CATALOG_VERSION_NO     202203221
 
 #endif
index e2befaf351232d1c21f19b21adeacbee3ff377a2..69969a0617e2d04e08706b84c3cca4f2507866f8 100644 (file)
@@ -70,6 +70,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
        bool            subdisableonerr;        /* True if a worker error should cause the
                                                                         * subscription to be disabled */
 
+       XLogRecPtr      subskiplsn;             /* All changes finished at this LSN are
+                                                                * skipped */
+
 #ifdef CATALOG_VARLEN                  /* variable-length fields start here */
        /* Connection string to the publisher */
        text            subconninfo BKI_FORCE_NOT_NULL;
@@ -109,6 +112,8 @@ typedef struct Subscription
        bool            disableonerr;   /* Indicates if the subscription should be
                                                                 * automatically disabled if a worker error
                                                                 * occurs */
+       XLogRecPtr      skiplsn;                /* All changes finished at this LSN are
+                                                                * skipped */
        char       *conninfo;           /* Connection string to the publisher */
        char       *slotname;           /* Name of the replication slot */
        char       *synccommit;         /* Synchronous commit setting for worker */
index 1617702d9d681120328791411f4112efadfdc775..6f83a79a96c3d4ca7c2ba93206334c307cd8bf08 100644 (file)
@@ -3726,7 +3726,8 @@ typedef enum AlterSubscriptionType
        ALTER_SUBSCRIPTION_ADD_PUBLICATION,
        ALTER_SUBSCRIPTION_DROP_PUBLICATION,
        ALTER_SUBSCRIPTION_REFRESH,
-       ALTER_SUBSCRIPTION_ENABLED
+       ALTER_SUBSCRIPTION_ENABLED,
+       ALTER_SUBSCRIPTION_SKIP
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
index ad8003fae12e3c038b41d246d41e23324c4d4abf..7fcfad15916330b862ce33f9e74f9bc89962d370 100644 (file)
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -93,11 +93,25 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2
 ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
+-- ok
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
+\dRs+
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/12345
+(1 row)
+
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                   List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2
+                                                                                         List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -129,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                     List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2
+                                                                                           List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +179,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +202,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +229,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +247,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +284,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +296,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +308,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -309,18 +323,18 @@ ERROR:  disable_on_error requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist
+                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
index a7c15b1dafcc2ed77b6e31dc225e557adcc921db..74c38ead5d6d729a2ec3814631f2b44d870d6ef6 100644 (file)
@@ -72,6 +72,17 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
 ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2';
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 
+-- ok
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
+
+\dRs+
+
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+
 \dRs+
 
 BEGIN;
diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl
deleted file mode 100644 (file)
index 5eca804..0000000
+++ /dev/null
@@ -1,94 +0,0 @@
-
-# Copyright (c) 2021-2022, PostgreSQL Global Development Group
-
-# Test of logical replication subscription self-disabling feature.
-use strict;
-use warnings;
-use PostgreSQL::Test::Cluster;
-use PostgreSQL::Test::Utils;
-use Test::More;
-
-# create publisher node
-my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
-$node_publisher->init(allows_streaming => 'logical');
-$node_publisher->start;
-
-# create subscriber node
-my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
-$node_subscriber->init;
-$node_subscriber->start;
-
-# Create identical table on both nodes.
-$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
-$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
-
-# Insert duplicate values on the publisher.
-$node_publisher->safe_psql('postgres',
-       "INSERT INTO tbl (i) VALUES (1), (1), (1)");
-
-# Create an additional unique index on the subscriber.
-$node_subscriber->safe_psql('postgres',
-       "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
-
-# Create a pub/sub to set up logical replication. This tests that the
-# uniqueness violation will cause the subscription to fail during initial
-# synchronization and make it disabled.
-my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
-$node_publisher->safe_psql('postgres',
-       "CREATE PUBLICATION pub FOR TABLE tbl");
-$node_subscriber->safe_psql('postgres',
-       "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)"
-);
-
-# Initial synchronization failure causes the subscription to be disabled.
-$node_subscriber->poll_query_until('postgres',
-       "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
-) or die "Timed out while waiting for subscriber to be disabled";
-
-# Drop the unique index on the subscriber which caused the subscription to be
-# disabled.
-$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
-
-# Re-enable the subscription "sub".
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-
-# Wait for the data to replicate.
-$node_publisher->wait_for_catchup('sub');
-$node_subscriber->poll_query_until('postgres',
-       "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
-);
-
-# Confirm that we have finished the table sync.
-my $result =
-  $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl");
-is($result, qq(1|3), "subscription sub replicated data");
-
-# Delete the data from the subscriber and recreate the unique index.
-$node_subscriber->safe_psql('postgres', "DELETE FROM tbl");
-$node_subscriber->safe_psql('postgres',
-       "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
-
-# Add more non-unique data to the publisher.
-$node_publisher->safe_psql('postgres',
-       "INSERT INTO tbl (i) VALUES (3), (3), (3)");
-
-# Apply failure causes the subscription to be disabled.
-$node_subscriber->poll_query_until('postgres',
-       "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
-) or die "Timed out while waiting for subscription sub to be disabled";
-
-# Drop the unique index on the subscriber and re-enabled the subscription. Then
-# confirm that the previously failing insert was applied OK.
-$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-
-$node_publisher->wait_for_catchup('sub');
-
-$result = $node_subscriber->safe_psql('postgres',
-       "SELECT COUNT(*) FROM tbl WHERE i = 3");
-is($result, qq(3), 'check the result of apply');
-
-$node_subscriber->stop;
-$node_publisher->stop;
-
-done_testing();
diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl
new file mode 100644 (file)
index 0000000..e8b904b
--- /dev/null
@@ -0,0 +1,183 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Tests for disable_on_error and SKIP transaction features.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $offset = 0;
+
+# Test skipping the transaction. This function must be called after the caller
+# has inserted data that conflicts with the subscriber.  The finish LSN of the
+# error transaction that is used to specify to ALTER SUBSCRIPTION ... SKIP is
+# fetched from the server logs. After executing ALTER SUBSCRITPION ... SKIP, we
+# check if logical replication can continue working by inserting $nonconflict_data
+# on the publisher.
+sub test_skip_lsn
+{
+       my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg)
+         = @_;
+
+       # Wait until a conflict occurs on the subscriber.
+       $node_subscriber->poll_query_until('postgres',
+               "SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'"
+       );
+
+       # Get the finish LSN of the error transaction.
+       my $contents = slurp_file($node_subscriber->logfile, $offset);
+       $contents =~
+         qr/processing remote data for replication origin \"pg_\d+\" during "INSERT" for replication target relation "public.tbl" in transaction \d+ finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/
+         or die "could not get error-LSN";
+       my $lsn = $1;
+
+       # Set skip lsn.
+       $node_subscriber->safe_psql('postgres',
+               "ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn')");
+
+       # Re-enable the subscription.
+       $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+       # Wait for the failed transaction to be skipped
+       $node_subscriber->poll_query_until('postgres',
+               "SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'"
+       );
+
+       # Check the log to ensure that the transaction is skipped, and advance the
+       # offset of the log file for the next test.
+       $offset = $node_subscriber->wait_for_log(
+               qr/LOG:  done skipping logical replication transaction finished at $lsn/,
+               $offset);
+
+       # Insert non-conflict data
+       $node_publisher->safe_psql('postgres',
+               "INSERT INTO tbl VALUES $nonconflict_data");
+
+       $node_publisher->wait_for_catchup('sub');
+
+       # Check replicated data
+       my $res =
+         $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+       is($res, $expected, $msg);
+}
+
+# Create publisher node. Set a low value of logical_decoding_work_mem to test
+# streaming cases.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+       'postgresql.conf',
+       qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf(
+       'postgresql.conf',
+       qq[
+max_prepared_transactions = 10
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On the subscriber, we
+# create the same tables but with a primary key. Also, insert some data that
+# will conflict with the data replicated from publisher later.
+$node_publisher->safe_psql(
+       'postgres',
+       qq[
+CREATE TABLE tbl (i INT, t TEXT);
+INSERT INTO tbl VALUES (1, NULL);
+]);
+$node_subscriber->safe_psql(
+       'postgres',
+       qq[
+CREATE TABLE tbl (i INT PRIMARY KEY, t TEXT);
+INSERT INTO tbl VALUES (1, NULL);
+]);
+
+# Create a pub/sub to set up logical replication. This tests that the
+# uniqueness violation will cause the subscription to fail during initial
+# synchronization and make it disabled.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub FOR TABLE tbl");
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)"
+);
+
+# Initial synchronization failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+       "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscriber to be disabled";
+
+# Truncate the table on the subscriber which caused the subscription to be
+# disabled.
+$node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
+
+# Re-enable the subscription "sub".
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the data to replicate.
+$node_publisher->wait_for_catchup('sub');
+$node_subscriber->poll_query_until('postgres',
+       "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
+);
+
+# Confirm that we have finished the table sync.
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM tbl");
+is($result, qq(1), "subscription sub replicated data");
+
+# Insert data to tbl, raising an error on the subscriber due to violation
+# of the unique constraint on tbl. Then skip the transaction.
+$node_publisher->safe_psql(
+       'postgres',
+       qq[
+BEGIN;
+INSERT INTO tbl VALUES (1, NULL);
+COMMIT;
+]);
+test_skip_lsn($node_publisher, $node_subscriber,
+       "(2, NULL)", "2", "test skipping transaction");
+
+# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and
+# PREPARE the transaction, raising an error. Then skip the transaction.
+$node_publisher->safe_psql(
+       'postgres',
+       qq[
+BEGIN;
+INSERT INTO tbl VALUES (1, NULL);
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_lsn($node_publisher, $node_subscriber,
+       "(3, NULL)", "3", "test skipping prepare and commit prepared ");
+
+# Test for STREAM COMMIT. Insert enough rows to tbl to exceed the 64kB
+# limit, also raising an error on the subscriber during applying spooled
+# changes for the same reason. Then skip the transaction.
+$node_publisher->safe_psql(
+       'postgres',
+       qq[
+BEGIN;
+INSERT INTO tbl SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+COMMIT;
+]);
+test_skip_lsn($node_publisher, $node_subscriber, "(4, md5(4::text))",
+       "4", "test skipping stream-commit");
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT COUNT(*) FROM pg_prepared_xacts");
+is($result, "0",
+       "check all prepared transactions are resolved on the subscriber");
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();