Wake up a subscription's replication worker processes after DDL.
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 6 Jan 2023 21:08:20 +0000 (16:08 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 6 Jan 2023 22:27:58 +0000 (17:27 -0500)
Waken related worker processes immediately at commit of a transaction
that has performed ALTER SUBSCRIPTION (including the RENAME and
OWNER variants).  This reduces the response time for such operations.
In the real world that might not be worth much, but it shaves several
seconds off the runtime for the subscription test suite.

In the case of PREPARE, we just throw away this notification state;
it doesn't seem worth the work to preserve it.  The workers will
still react after the eventual COMMIT PREPARED, but not as quickly.

Nathan Bossart

Discussion: https://postgr.es/m/20221122004119.GA132961@nathanxps13

src/backend/access/transam/xact.c
src/backend/commands/alter.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/worker.c
src/include/replication/logicalworker.h

index 24221542e7fa01cb6d2dfe532808d0a537ad00a9..8daa7f7d446fee701f54ea0e26d1c4544b03dd1a 100644 (file)
@@ -47,6 +47,7 @@
 #include "pgstat.h"
 #include "replication/logical.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
    AtEOXact_PgStat(true, is_parallel_worker);
    AtEOXact_Snapshot(true, false);
    AtEOXact_ApplyLauncher(true);
+   AtEOXact_LogicalRepWorkers(true);
    pgstat_report_xact_timestamp(0);
 
    CurrentResourceOwner = NULL;
@@ -2647,6 +2649,9 @@ PrepareTransaction(void)
    AtEOXact_HashTables(true);
    /* don't call AtEOXact_PgStat here; we fixed pgstat state above */
    AtEOXact_Snapshot(true, true);
+   /* we treat PREPARE as ROLLBACK so far as waking workers goes */
+   AtEOXact_ApplyLauncher(false);
+   AtEOXact_LogicalRepWorkers(false);
    pgstat_report_xact_timestamp(0);
 
    CurrentResourceOwner = NULL;
@@ -2860,6 +2865,7 @@ AbortTransaction(void)
        AtEOXact_HashTables(false);
        AtEOXact_PgStat(false, is_parallel_worker);
        AtEOXact_ApplyLauncher(false);
+       AtEOXact_LogicalRepWorkers(false);
        pgstat_report_xact_timestamp(0);
    }
 
index 70d359eb6a757a183c06eecade3179555621dc12..bea51b3af1f2f3b9b03ef7345b4b1938954aea6b 100644 (file)
@@ -59,6 +59,7 @@
 #include "commands/user.h"
 #include "miscadmin.h"
 #include "parser/parse_func.h"
+#include "replication/logicalworker.h"
 #include "rewrite/rewriteDefine.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -279,6 +280,9 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
        if (strncmp(new_name, "regress_", 8) != 0)
            elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
 #endif
+
+       /* Wake up related replication workers to handle this change quickly */
+       LogicalRepWorkersWakeupAtCommit(objectId);
    }
    else if (nameCacheId >= 0)
    {
index b9c5df796fcac71f0dd854d3ba4b106c091849b0..f15a332bae375e028ed0990d3d0d45a019f9d5bf 100644 (file)
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
    InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
+   /* Wake up related replication workers to handle this change quickly. */
+   LogicalRepWorkersWakeupAtCommit(subid);
+
    return myself;
 }
 
@@ -1732,7 +1736,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
    InvokeObjectPostAlterHook(SubscriptionRelationId,
                              form->oid, 0);
 
+   /* Wake up related background processes to handle this change quickly. */
    ApplyLauncherWakeupAtCommit();
+   LogicalRepWorkersWakeupAtCommit(form->oid);
 }
 
 /*
index f8e8cf71eb86dd847a4e12c2e9da1f26f8bd3ead..f8649e142c35aca4c7d84f56663eac0a579dbebe 100644 (file)
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+static List *on_commit_wakeup_workers_subids = NIL;
+
 bool       in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
@@ -4092,3 +4094,53 @@ reset_apply_error_context_info(void)
    apply_error_callback_arg.remote_attnum = -1;
    set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
 }
+
+/*
+ * Request wakeup of the workers for the given subscription OID
+ * at commit of the current transaction.
+ *
+ * This is used to ensure that the workers process assorted changes
+ * as soon as possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+   MemoryContext oldcxt;
+
+   oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+   on_commit_wakeup_workers_subids =
+       list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
+   MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Wake up the workers of any subscriptions that were changed in this xact.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+   if (isCommit && on_commit_wakeup_workers_subids != NIL)
+   {
+       ListCell   *lc;
+
+       LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+       foreach(lc, on_commit_wakeup_workers_subids)
+       {
+           Oid         subid = lfirst_oid(lc);
+           List       *workers;
+           ListCell   *lc2;
+
+           workers = logicalrep_workers_find(subid, true);
+           foreach(lc2, workers)
+           {
+               LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
+
+               logicalrep_worker_wakeup_ptr(worker);
+           }
+       }
+       LWLockRelease(LogicalRepWorkerLock);
+   }
+
+   /* The List storage will be reclaimed automatically in xact cleanup. */
+   on_commit_wakeup_workers_subids = NIL;
+}
index f1e7e8a34844e0ad67c0a7dce1c8ce35914e625b..e484662b7232eb8a673283d6d33b444ae7df558c 100644 (file)
@@ -16,4 +16,8 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
 #endif                         /* LOGICALWORKER_H */