Make tablesync worker exit when apply dies while it was waiting for it
authorPeter Eisentraut <peter_e@gmx.net>
Sat, 3 Jun 2017 13:18:52 +0000 (09:18 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Sat, 3 Jun 2017 13:20:56 +0000 (09:20 -0400)
This avoids "orphaned" sync workers.

This was caused by a thinko in wait_for_sync_status_change.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
src/backend/replication/logical/tablesync.c

index 85e480db4bdd6c95eceafac6f42a9ce0b8112d19..6e268f3521dba4fec25976c09030a183e04a9b37 100644 (file)
@@ -146,7 +146,12 @@ finish_sync_worker(void)
 /*
  * Wait until the table synchronization change.
  *
- * Returns false if the relation subscription state disappeared.
+ * If called from apply worker, it will wait for the synchronization worker to
+ * change table state in shmem.  If called from synchronization worker, it
+ * will wait for apply worker to change table state in shmem.
+ *
+ * Returns false if the opposite worker has disappeared or the table state has
+ * been reset.
  */
 static bool
 wait_for_sync_status_change(Oid relid, char origstate)
@@ -161,14 +166,27 @@ wait_for_sync_status_change(Oid relid, char origstate)
        CHECK_FOR_INTERRUPTS();
 
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+       /* Check if the opposite worker is still running and bail if not. */
        worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-                                       relid, false);
+                                       am_tablesync_worker() ? InvalidOid : relid,
+                                       false);
        if (!worker)
        {
            LWLockRelease(LogicalRepWorkerLock);
            return false;
        }
+
+       /*
+        * If I'm the synchronization worker, look at my own state.  Otherwise
+        * look at the state of the synchronization worker we found above.
+        */
+       if (am_tablesync_worker())
+           worker = MyLogicalRepWorker;
+
+       Assert(worker->relid == relid);
        state = worker->relstate;
+
        LWLockRelease(LogicalRepWorkerLock);
 
        if (state == SUBREL_STATE_UNKNOWN)
@@ -179,7 +197,7 @@ wait_for_sync_status_change(Oid relid, char origstate)
 
        rc = WaitLatch(&MyProc->procLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-                      10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+                      1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
 
        /* emergency bailout if postmaster has died */
        if (rc & WL_POSTMASTER_DEATH)