Add the additional information to the logical replication worker errcontext.
authorAmit Kapila <akapila@postgresql.org>
Tue, 8 Mar 2022 02:38:32 +0000 (08:08 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 8 Mar 2022 02:38:32 +0000 (08:08 +0530)
This commits adds both the finish LSN (commit_lsn in case transaction got
committed, prepare_lsn in case of a prepared transaction, etc.) and
replication origin name to the existing error context message.

This will help users in specifying the origin name and transaction finish
LSN to pg_replication_origin_advance() SQL function to skip a particular
transaction.

Author: Masahiko Sawada
Reviewed-by: Takamichi Osumi, Euler Taveira, and Amit Kapila
Discussion: https://postgr.es/m/CAD21AoBarBf2oTF71ig2g_o=3Z_Dt6_sOpMQma1kFgbnA5OZ_w@mail.gmail.com

doc/src/sgml/logical-replication.sgml
src/backend/replication/logical/worker.c

index fb4472356d5b329177d648383347075f9288c1d3..82326c3901934d6d3f3f4aecddf87dfbb7da7f9d 100644 (file)
   <para>
    The resolution can be done either by changing data or permissions on the subscriber so
    that it does not conflict with the incoming change or by skipping the
-   transaction that conflicts with the existing data.  The transaction can be
-   skipped by calling the <link linkend="pg-replication-origin-advance">
+   transaction that conflicts with the existing data.  When a conflict produces
+   an error, the replication won't proceed, and the logical replication worker will
+   emit the following kind of message to the subscriber's server log:
+<screen>
+ERROR:  duplicate key value violates unique constraint "test_pkey"
+DETAIL:  Key (c)=(1) already exists.
+CONTEXT:  processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
+</screen>
+   The LSN of the transaction that contains the change violating the constraint and
+   the replication origin name can be found from the server log (LSN 0/14C0378 and
+   replication origin <literal>pg_16395</literal> in the above case).  To skip the
+   transaction, the subscription needs to be disabled temporarily by
+   <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+   can be skipped by calling the
+   <link linkend="pg-replication-origin-advance">
    <function>pg_replication_origin_advance()</function></link> function with
-   a <parameter>node_name</parameter> corresponding to the subscription name,
-   and a position.  The current position of origins can be seen in the
+   the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
+   next LSN of the transaction's LSN (i.e., LSN 0/14C0379).  After that the replication
+   can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>.  The current
+   position of origins can be seen in the
    <link linkend="view-pg-replication-origin-status">
    <structname>pg_replication_origin_status</structname></link> system view.
   </para>
index 92aa794706dfc611d9718af0c1ad830b7f0b2fc2..8653e1d8402a7f918d74daf154761ab575794281 100644 (file)
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
    /* Remote node information */
    int         remote_attnum;  /* -1 if invalid */
    TransactionId remote_xid;
+   XLogRecPtr  finish_lsn;
+   char       *origin_name;
 } ApplyErrorCallbackArg;
 
 static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
    .rel = NULL,
    .remote_attnum = -1,
    .remote_xid = InvalidTransactionId,
+   .finish_lsn = InvalidXLogRecPtr,
+   .origin_name = NULL,
 };
 
 static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
 /* Functions for apply error callback */
 static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
 static inline void reset_apply_error_context_info(void);
 
 /*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
    LogicalRepBeginData begin_data;
 
    logicalrep_read_begin(s, &begin_data);
-   set_apply_error_context_xact(begin_data.xid);
+   set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
    remote_final_lsn = begin_data.final_lsn;
 
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
                 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
 
    logicalrep_read_begin_prepare(s, &begin_data);
-   set_apply_error_context_xact(begin_data.xid);
+   set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
    remote_final_lsn = begin_data.prepare_lsn;
 
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
    char        gid[GIDSIZE];
 
    logicalrep_read_commit_prepared(s, &prepare_data);
-   set_apply_error_context_xact(prepare_data.xid);
+   set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
 
    /* Compute GID for two_phase transactions. */
    TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
    char        gid[GIDSIZE];
 
    logicalrep_read_rollback_prepared(s, &rollback_data);
-   set_apply_error_context_xact(rollback_data.xid);
+   set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
 
    /* Compute GID for two_phase transactions. */
    TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
                 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
 
    logicalrep_read_stream_prepare(s, &prepare_data);
-   set_apply_error_context_xact(prepare_data.xid);
+   set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
 
    elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
 
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
-   set_apply_error_context_xact(stream_xid);
+   set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
 
    /*
     * Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
     */
    if (xid == subxid)
    {
-       set_apply_error_context_xact(xid);
+       set_apply_error_context_xact(xid, InvalidXLogRecPtr);
        stream_cleanup_files(MyLogicalRepWorker->subid, xid);
    }
    else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
        bool        found = false;
        char        path[MAXPGPATH];
 
-       set_apply_error_context_xact(subxid);
+       set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
 
        subidx = -1;
        begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
                 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
 
    xid = logicalrep_read_stream_commit(s, &commit_data);
-   set_apply_error_context_xact(xid);
+   set_apply_error_context_xact(xid, commit_data.commit_lsn);
 
    elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
        myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
        pfree(syncslotname);
+
+       /*
+        * Allocate the origin name in long-lived context for error context
+        * message.
+        */
+       ReplicationOriginNameForTablesync(MySubscription->oid,
+                                         MyLogicalRepWorker->relid,
+                                         originname,
+                                         sizeof(originname));
+       apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+                                                                  originname);
    }
    else
    {
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
         * does some initializations on the upstream so let's still call it.
         */
        (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+       /*
+        * Allocate the origin name in long-lived context for error context
+        * message.
+        */
+       apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+                                                                  originname);
    }
 
    /*
@@ -3651,36 +3673,51 @@ apply_error_callback(void *arg)
    if (apply_error_callback_arg.command == 0)
        return;
 
+   Assert(errarg->origin_name);
+
    if (errarg->rel == NULL)
    {
        if (!TransactionIdIsValid(errarg->remote_xid))
-           errcontext("processing remote data during \"%s\"",
+           errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+                      errarg->origin_name,
                       logicalrep_message_type(errarg->command));
-       else
-           errcontext("processing remote data during \"%s\" in transaction %u",
+       else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+           errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+                      errarg->origin_name,
                       logicalrep_message_type(errarg->command),
                       errarg->remote_xid);
+       else
+           errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+                      errarg->origin_name,
+                      logicalrep_message_type(errarg->command),
+                      errarg->remote_xid,
+                      LSN_FORMAT_ARGS(errarg->finish_lsn));
    }
    else if (errarg->remote_attnum < 0)
-       errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+       errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+                  errarg->origin_name,
                   logicalrep_message_type(errarg->command),
                   errarg->rel->remoterel.nspname,
                   errarg->rel->remoterel.relname,
-                  errarg->remote_xid);
+                  errarg->remote_xid,
+                  LSN_FORMAT_ARGS(errarg->finish_lsn));
    else
-       errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+       errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+                  errarg->origin_name,
                   logicalrep_message_type(errarg->command),
                   errarg->rel->remoterel.nspname,
                   errarg->rel->remoterel.relname,
                   errarg->rel->remoterel.attnames[errarg->remote_attnum],
-                  errarg->remote_xid);
+                  errarg->remote_xid,
+                  LSN_FORMAT_ARGS(errarg->finish_lsn));
 }
 
 /* Set transaction information of apply error callback */
 static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
 {
    apply_error_callback_arg.remote_xid = xid;
+   apply_error_callback_arg.finish_lsn = lsn;
 }
 
 /* Reset all information of apply error callback */
@@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void)
    apply_error_callback_arg.command = 0;
    apply_error_callback_arg.rel = NULL;
    apply_error_callback_arg.remote_attnum = -1;
-   set_apply_error_context_xact(InvalidTransactionId);
+   set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }