diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/worker.c | 21 | ||||
-rw-r--r-- | src/test/subscription/t/100_bugs.pl | 77 |
2 files changed, 87 insertions, 11 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 65e22306c48..db09978697f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -416,6 +416,8 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void replorigin_reset(int code, Datum arg); + /* * Form the origin name for the subscription. * @@ -4441,6 +4443,14 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Reset the origin state to prevent the advancement of origin + * progress if we fail to apply. Otherwise, this will result in + * transaction loss as that transaction won't be sent again by the + * server. + */ + replorigin_reset(0, (Datum) 0); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -4928,23 +4938,12 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; - int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); - elevel = geterrlevel(); - - /* - * Reset the origin state to prevent the advancement of origin progress if - * we fail to apply. Otherwise, this will result in transaction loss as - * that transaction won't be sent again by the server. - */ - if (elevel >= ERROR) - replorigin_reset(0, (Datum) 0); - if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index cb36ca7b16b..3498a90e741 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -490,4 +490,81 @@ is( $result, qq(2|f $node_publisher->stop('fast'); $node_subscriber->stop('fast'); +# The bug was that when an ERROR was caught and handled by a (PL/pgSQL) +# function, the apply worker reset the replication origin but continued +# processing subsequent changes. So, we fail to update the replication origin +# during further apply operations. This can lead to the apply worker requesting +# the changes that have been applied again after restarting. + +$node_publisher->rotate_logfile(); +$node_publisher->start(); + +$node_subscriber->rotate_logfile(); +$node_subscriber->start(); + +# Set up a publication with a table +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE t1 (a int); + CREATE PUBLICATION regress_pub FOR TABLE t1; +)); + +# Set up a subscription which subscribes the publication +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE t1 (a int); + CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub; +)); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub'); + +# Create an AFTER INSERT trigger on the table that raises and subsequently +# handles an exception. Subsequent insertions will trigger this exception, +# causing the apply worker to invoke its error callback with an ERROR. However, +# since the error is caught within the trigger, the apply worker will continue +# processing changes. +$node_subscriber->safe_psql( + 'postgres', q{ +CREATE FUNCTION handle_exception_trigger() +RETURNS TRIGGER AS $$ +BEGIN + BEGIN + -- Raise an exception + RAISE EXCEPTION 'This is a test exception'; + EXCEPTION + WHEN OTHERS THEN + RETURN NEW; + END; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER silent_exception_trigger +AFTER INSERT OR UPDATE ON t1 +FOR EACH ROW +EXECUTE FUNCTION handle_exception_trigger(); + +ALTER TABLE t1 ENABLE ALWAYS TRIGGER silent_exception_trigger; +}); + +# Obtain current remote_lsn value to check its advancement later +my $remote_lsn = $node_subscriber->safe_psql('postgres', + "SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'" +); + +# Insert a tuple to replicate changes +$node_publisher->safe_psql('postgres', "INSERT INTO t1 VALUES (1);"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Confirms the origin can be advanced +$result = $node_subscriber->safe_psql('postgres', + "SELECT remote_lsn > '$remote_lsn' FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'" +); +is($result, 't', + 'remote_lsn has advanced for apply worker raising an exception'); + +$node_publisher->stop('fast'); +$node_subscriber->stop('fast'); + done_testing(); |