Only kill sync workers at commit time in subscription DDL
authorPeter Eisentraut <peter_e@gmx.net>
Sat, 5 Aug 2017 01:14:35 +0000 (21:14 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Sat, 5 Aug 2017 01:17:47 +0000 (21:17 -0400)
This allows a transaction abort to avoid killing those workers.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>

src/backend/access/transam/xact.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/launcher.c
src/include/replication/logicallauncher.h
src/include/replication/worker_internal.h

index b0aa69fe4b43de546f0b4dedae97c313f2f4812f..50c3c3b5e5e3763d795294cf545c1f1bdaa6f0ed 100644 (file)
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("cannot PREPARE a transaction that has exported snapshots")));
 
+   /*
+    * Don't allow PREPARE but for transaction that has/might kill logical
+    * replication workers.
+    */
+   if (XactManipulatesLogicalReplicationWorkers())
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
    /* Prevent cancel/die interrupt while cleaning up */
    HOLD_INTERRUPTS();
 
index 6dc3f6ee0009dedec1b7116b2fc167a849a8eb5c..87824b8fec3fe11e53012e4035dda6aaafe80b2d 100644 (file)
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
            RemoveSubscriptionRel(sub->oid, relid);
 
-           logicalrep_worker_stop(sub->oid, relid);
+           logicalrep_worker_stop_at_commit(sub->oid, relid);
 
            namespace = get_namespace_name(get_rel_namespace(relid));
            ereport(NOTICE,
@@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    char       *subname;
    char       *conninfo;
    char       *slotname;
+   List       *subworkers;
+   ListCell   *lc;
    char        originname[NAMEDATALEN];
    char       *err = NULL;
    RepOriginId originid;
@@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
    ReleaseSysCache(tup);
 
+   /*
+    * If we are dropping the replication slot, stop all the subscription
+    * workers immediately, so that the slot becomes accessible.  Otherwise
+    * just schedule the stopping for the end of the transaction.
+    *
+    * New workers won't be started because we hold an exclusive lock on the
+    * subscription till the end of the transaction.
+    */
+   LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+   subworkers = logicalrep_workers_find(subid, false);
+   LWLockRelease(LogicalRepWorkerLock);
+   foreach (lc, subworkers)
+   {
+       LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+       if (slotname)
+           logicalrep_worker_stop(w->subid, w->relid);
+       else
+           logicalrep_worker_stop_at_commit(w->subid, w->relid);
+   }
+   list_free(subworkers);
+
    /* Clean up dependencies */
    deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
    /* Remove any associated relation synchronization states. */
    RemoveSubscriptionRel(subid, InvalidOid);
 
-   /* Kill the apply worker so that the slot becomes accessible. */
-   logicalrep_worker_stop(subid, InvalidOid);
-
    /* Remove the origin tracking if exists. */
    snprintf(originname, sizeof(originname), "pg_%u", subid);
    originid = replorigin_by_name(originname, true);
index d165d518e1b35d9630683641b2f1e67f27e3b932..0f9e5755b9e747b67f739e21664f6dbf7cea718b 100644 (file)
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
+typedef struct LogicalRepWorkerId
+{
+   Oid subid;
+   Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -249,6 +257,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
    return res;
 }
 
+/*
+ * Similar to logicalrep_worker_find(), but returns list of all workers for
+ * the subscription, instead just one.
+ */
+List *
+logicalrep_workers_find(Oid subid, bool only_running)
+{
+   int         i;
+   List       *res = NIL;
+
+   Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+   /* Search for attached worker for a given subscription id. */
+   for (i = 0; i < max_logical_replication_workers; i++)
+   {
+       LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+       if (w->in_use && w->subid == subid && (!only_running || w->proc))
+           res = lappend(res, w);
+   }
+
+   return res;
+}
+
 /*
  * Start new apply background worker.
  */
@@ -513,6 +545,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
    LWLockRelease(LogicalRepWorkerLock);
 }
 
+/*
+ * Request worker for specified sub/rel to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+   LogicalRepWorkerId *wid;
+   MemoryContext       oldctx;
+
+   /* Make sure we store the info in context that survives until commit. */
+   oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+   wid = palloc(sizeof(LogicalRepWorkerId));
+   wid->subid = subid;
+   wid->relid = relid;
+
+   on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+   MemoryContextSwitchTo(oldctx);
+}
+
 /*
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
@@ -753,15 +806,41 @@ ApplyLauncherShmemInit(void)
    }
 }
 
+/*
+ * Check whether current transaction has manipulated logical replication
+ * workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+   return (on_commit_stop_workers != NIL);
+}
+
 /*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-   if (isCommit && on_commit_launcher_wakeup)
-       ApplyLauncherWakeup();
+   if (isCommit)
+   {
+       ListCell *lc;
 
+       foreach (lc, on_commit_stop_workers)
+       {
+           LogicalRepWorkerId *wid = lfirst(lc);
+           logicalrep_worker_stop(wid->subid, wid->relid);
+       }
+
+       if (on_commit_launcher_wakeup)
+           ApplyLauncherWakeup();
+   }
+
+   /*
+    * No need to pfree on_commit_stop_workers.  It was allocated in
+    * transaction memory context, which is going to be cleaned soon.
+    */
+   on_commit_stop_workers = NIL;
    on_commit_launcher_wakeup = false;
 }
 
index aac7d326e22cf2d1cbe3dc9cca42f0384b7327b9..78016c448f34964507c23203bb57808010973edf 100644 (file)
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
index 494a3a3d087d2dea9cccb5b2bfd3a0fbc40e5905..7b8728cced0bb1b0e7b4f1ff2abbf1a0d5f91f98 100644 (file)
@@ -71,9 +71,11 @@ extern bool in_remote_transaction;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                       bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
                         Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);