static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
-typedef struct SlotErrCallbackArg
-{
- LogicalRepRelMapEntry *rel;
- int remote_attnum;
-} SlotErrCallbackArg;
-
typedef struct ApplyExecutionData
{
EState *estate; /* executor state, used to track resources */
PartitionTupleRouting *proute; /* partition routing info */
} ApplyExecutionData;
+/* Struct for saving and restoring apply errcontext information */
+typedef struct ApplyErrorCallbackArg
+{
+ LogicalRepMsgType command; /* 0 if invalid */
+ LogicalRepRelMapEntry *rel;
+
+ /* Remote node information */
+ int remote_attnum; /* -1 if invalid */
+ TransactionId remote_xid;
+ TimestampTz ts; /* commit, rollback, or prepare timestamp */
+} ApplyErrorCallbackArg;
+
+static ApplyErrorCallbackArg apply_error_callback_arg =
+{
+ .command = 0,
+ .rel = NULL,
+ .remote_attnum = -1,
+ .remote_xid = InvalidTransactionId,
+ .ts = 0,
+};
+
/*
* Stream xid hash entry. Whenever we see a new xid we create this entry in the
* xidhash and along with it create the streaming file and store the fileset handle.
/* Common streaming function to apply all the spooled messages */
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
+/* Functions for apply error callback */
+static void apply_error_callback(void *arg);
+static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void reset_apply_error_context_info(void);
+
/*
* Should this worker apply changes for given relation.
*
ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
}
-/*
- * Error callback to give more context info about data conversion failures
- * while reading data from the remote server.
- */
-static void
-slot_store_error_callback(void *arg)
-{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
- LogicalRepRelMapEntry *rel;
-
- /* Nothing to do if remote attribute number is not set */
- if (errarg->remote_attnum < 0)
- return;
-
- rel = errarg->rel;
- errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
- rel->remoterel.nspname, rel->remoterel.relname,
- rel->remoterel.attnames[errarg->remote_attnum]);
-}
-
/*
* Store tuple data into slot.
*
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
- SlotErrCallbackArg errarg;
- ErrorContextCallback errcallback;
ExecClearTuple(slot);
- /* Push callback + info on the error context stack */
- errarg.rel = rel;
- errarg.remote_attnum = -1;
- errcallback.callback = slot_store_error_callback;
- errcallback.arg = (void *) &errarg;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
-
/* Call the "in" function for each non-dropped, non-null attribute */
Assert(natts == rel->attrmap->maplen);
for (i = 0; i < natts; i++)
Assert(remoteattnum < tupleData->ncols);
- errarg.remote_attnum = remoteattnum;
+ /* Set attnum for error callback */
+ apply_error_callback_arg.remote_attnum = remoteattnum;
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
{
slot->tts_isnull[i] = true;
}
- errarg.remote_attnum = -1;
+ /* Reset attnum for error callback */
+ apply_error_callback_arg.remote_attnum = -1;
}
else
{
}
}
- /* Pop the error context stack */
- error_context_stack = errcallback.previous;
-
ExecStoreVirtualTuple(slot);
}
{
int natts = slot->tts_tupleDescriptor->natts;
int i;
- SlotErrCallbackArg errarg;
- ErrorContextCallback errcallback;
/* We'll fill "slot" with a virtual tuple, so we must start with ... */
ExecClearTuple(slot);
memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
- /* For error reporting, push callback + info on the error context stack */
- errarg.rel = rel;
- errarg.remote_attnum = -1;
- errcallback.callback = slot_store_error_callback;
- errcallback.arg = (void *) &errarg;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
-
/* Call the "in" function for each replaced attribute */
Assert(natts == rel->attrmap->maplen);
for (i = 0; i < natts; i++)
{
StringInfo colvalue = &tupleData->colvalues[remoteattnum];
- errarg.remote_attnum = remoteattnum;
+ /* Set attnum for error callback */
+ apply_error_callback_arg.remote_attnum = remoteattnum;
if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
{
slot->tts_isnull[i] = true;
}
- errarg.remote_attnum = -1;
+ /* Reset attnum for error callback */
+ apply_error_callback_arg.remote_attnum = -1;
}
}
- /* Pop the error context stack */
- error_context_stack = errcallback.previous;
-
/* And finally, declare that "slot" contains a valid virtual tuple */
ExecStoreVirtualTuple(slot);
}
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
+ set_apply_error_context_xact(begin_data.xid, begin_data.committime);
remote_final_lsn = begin_data.final_lsn;
process_syncing_tables(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
+ reset_apply_error_context_info();
}
/*
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
remote_final_lsn = begin_data.prepare_lsn;
process_syncing_tables(prepare_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
+ reset_apply_error_context_info();
}
/*
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
process_syncing_tables(prepare_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
+ reset_apply_error_context_info();
}
/*
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
process_syncing_tables(rollback_data.rollback_end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
+ reset_apply_error_context_info();
}
/*
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
process_syncing_tables(prepare_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
+
+ reset_apply_error_context_info();
}
/*
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
+ set_apply_error_context_xact(stream_xid, 0);
+
/*
* Initialize the xidhash table if we haven't yet. This will be used for
* the entire duration of the apply worker so create it in permanent
MemoryContextReset(LogicalStreamingContext);
pgstat_report_activity(STATE_IDLE, NULL);
+ reset_apply_error_context_info();
}
/*
* just delete the files with serialized info.
*/
if (xid == subxid)
+ {
+ set_apply_error_context_xact(xid, 0);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ }
else
{
/*
char path[MAXPGPATH];
StreamXidHash *ent;
+ set_apply_error_context_xact(subxid, 0);
+
subidx = -1;
begin_replication_step();
subxact_info_read(MyLogicalRepWorker->subid, xid);
cleanup_subxact_info();
end_replication_step();
CommitTransactionCommand();
+ reset_apply_error_context_info();
return;
}
end_replication_step();
CommitTransactionCommand();
}
+
+ reset_apply_error_context_info();
}
/*
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
+ set_apply_error_context_xact(xid, commit_data.committime);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
process_syncing_tables(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
+
+ reset_apply_error_context_info();
}
/*
return;
}
+ /* Set relation for error callback */
+ apply_error_callback_arg.rel = rel;
+
/* Initialize the executor state. */
edata = create_edata_for_relation(rel);
estate = edata->estate;
finish_edata(edata);
+ /* Reset relation for error callback */
+ apply_error_callback_arg.rel = NULL;
+
logicalrep_rel_close(rel, NoLock);
end_replication_step();
return;
}
+ /* Set relation for error callback */
+ apply_error_callback_arg.rel = rel;
+
/* Check if we can do the update. */
check_relation_updatable(rel);
finish_edata(edata);
+ /* Reset relation for error callback */
+ apply_error_callback_arg.rel = NULL;
+
logicalrep_rel_close(rel, NoLock);
end_replication_step();
return;
}
+ /* Set relation for error callback */
+ apply_error_callback_arg.rel = rel;
+
/* Check if we can do the delete. */
check_relation_updatable(rel);
finish_edata(edata);
+ /* Reset relation for error callback */
+ apply_error_callback_arg.rel = NULL;
+
logicalrep_rel_close(rel, NoLock);
end_replication_step();
apply_dispatch(StringInfo s)
{
LogicalRepMsgType action = pq_getmsgbyte(s);
+ LogicalRepMsgType saved_command;
+
+ /*
+ * Set the current command being applied. Since this function can be
+ * called recusively when applying spooled changes, save the current
+ * command.
+ */
+ saved_command = apply_error_callback_arg.command;
+ apply_error_callback_arg.command = action;
switch (action)
{
case LOGICAL_REP_MSG_BEGIN:
apply_handle_begin(s);
- return;
+ break;
case LOGICAL_REP_MSG_COMMIT:
apply_handle_commit(s);
- return;
+ break;
case LOGICAL_REP_MSG_INSERT:
apply_handle_insert(s);
- return;
+ break;
case LOGICAL_REP_MSG_UPDATE:
apply_handle_update(s);
- return;
+ break;
case LOGICAL_REP_MSG_DELETE:
apply_handle_delete(s);
- return;
+ break;
case LOGICAL_REP_MSG_TRUNCATE:
apply_handle_truncate(s);
- return;
+ break;
case LOGICAL_REP_MSG_RELATION:
apply_handle_relation(s);
- return;
+ break;
case LOGICAL_REP_MSG_TYPE:
apply_handle_type(s);
- return;
+ break;
case LOGICAL_REP_MSG_ORIGIN:
apply_handle_origin(s);
- return;
+ break;
case LOGICAL_REP_MSG_MESSAGE:
* Although, it could be used by other applications that use this
* output plugin.
*/
- return;
+ break;
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
- return;
+ break;
case LOGICAL_REP_MSG_STREAM_STOP:
apply_handle_stream_stop(s);
- return;
+ break;
case LOGICAL_REP_MSG_STREAM_ABORT:
apply_handle_stream_abort(s);
- return;
+ break;
case LOGICAL_REP_MSG_STREAM_COMMIT:
apply_handle_stream_commit(s);
- return;
+ break;
case LOGICAL_REP_MSG_BEGIN_PREPARE:
apply_handle_begin_prepare(s);
- return;
+ break;
case LOGICAL_REP_MSG_PREPARE:
apply_handle_prepare(s);
- return;
+ break;
case LOGICAL_REP_MSG_COMMIT_PREPARED:
apply_handle_commit_prepared(s);
- return;
+ break;
case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
apply_handle_rollback_prepared(s);
- return;
+ break;
case LOGICAL_REP_MSG_STREAM_PREPARE:
apply_handle_stream_prepare(s);
- return;
+ break;
+
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid logical replication message type \"%c\"", action)));
}
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("invalid logical replication message type \"%c\"",
- action)));
+ /* Reset the current command */
+ apply_error_callback_arg.command = saved_command;
}
/*
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
TimeLineID tli;
+ ErrorContextCallback errcallback;
/*
* Init the ApplyMessageContext which we clean up after each replication
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
+ /*
+ * Push apply error context callback. Fields will be filled during
+ * applying a change.
+ */
+ errcallback.callback = apply_error_callback;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
/* This outer loop iterates once per wait. */
for (;;)
{
}
}
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
/* All done */
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
}
{
return MyLogicalRepWorker != NULL;
}
+
+/* Error callback to give more context info about the change being applied */
+static void
+apply_error_callback(void *arg)
+{
+ StringInfoData buf;
+ ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+
+ if (apply_error_callback_arg.command == 0)
+ return;
+
+ initStringInfo(&buf);
+ appendStringInfo(&buf, _("processing remote data during \"%s\""),
+ logicalrep_message_type(errarg->command));
+
+ /* append relation information */
+ if (errarg->rel)
+ {
+ appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname);
+ if (errarg->remote_attnum >= 0)
+ appendStringInfo(&buf, _(" column \"%s\""),
+ errarg->rel->remoterel.attnames[errarg->remote_attnum]);
+ }
+
+ /* append transaction information */
+ if (TransactionIdIsNormal(errarg->remote_xid))
+ {
+ appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
+ if (errarg->ts != 0)
+ appendStringInfo(&buf, _(" at %s"),
+ timestamptz_to_str(errarg->ts));
+ }
+
+ errcontext("%s", buf.data);
+ pfree(buf.data);
+}
+
+/* Set transaction information of apply error callback */
+static inline void
+set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+{
+ apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.ts = ts;
+}
+
+/* Reset all information of apply error callback */
+static inline void
+reset_apply_error_context_info(void)
+{
+ apply_error_callback_arg.command = 0;
+ apply_error_callback_arg.rel = NULL;
+ apply_error_callback_arg.remote_attnum = -1;
+ set_apply_error_context_xact(InvalidTransactionId, 0);
+}