<para>
The resolution can be done either by changing data or permissions on the subscriber so
that it does not conflict with the incoming change or by skipping the
- transaction that conflicts with the existing data. The transaction can be
- skipped by calling the <link linkend="pg-replication-origin-advance">
+ transaction that conflicts with the existing data. When a conflict produces
+ an error, the replication won't proceed, and the logical replication worker will
+ emit the following kind of message to the subscriber's server log:
+<screen>
+ERROR: duplicate key value violates unique constraint "test_pkey"
+DETAIL: Key (c)=(1) already exists.
+CONTEXT: processing remote data for replication origin "pg_16395" during "INSERT" for replication target relation "public.test" in transaction 725 finished at 0/14C0378
+</screen>
+ The LSN of the transaction that contains the change violating the constraint and
+ the replication origin name can be found from the server log (LSN 0/14C0378 and
+ replication origin <literal>pg_16395</literal> in the above case). To skip the
+ transaction, the subscription needs to be disabled temporarily by
+ <command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
+ can be skipped by calling the
+ <link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
- a <parameter>node_name</parameter> corresponding to the subscription name,
- and a position. The current position of origins can be seen in the
+ the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
+ next LSN of the transaction's LSN (i.e., LSN 0/14C0379). After that the replication
+ can be resumed by <command>ALTER SUBSCRIPTION ... ENABLE</command>. The current
+ position of origins can be seen in the
<link linkend="view-pg-replication-origin-status">
<structname>pg_replication_origin_status</structname></link> system view.
</para>
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr finish_lsn;
+ char *origin_name;
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .finish_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
};
static MemoryContext ApplyMessageContext = NULL;
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
/*
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1;
begin_replication_step();
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
if (apply_error_callback_arg.command == 0)
return;
+ Assert(errarg->origin_name);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
- errcontext("processing remote data during \"%s\"",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+ errarg->origin_name,
logicalrep_message_type(errarg->command));
- else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.finish_lsn = lsn;
}
/* Reset all information of apply error callback */
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}