Fix signal handling in logical replication workers
author: Peter Eisentraut <peter_e@gmx.net>
Fri, 2 Jun 2017 18:46:00 +0000 (14:46 -0400)
committer: Peter Eisentraut <peter_e@gmx.net>
Fri, 2 Jun 2017 18:49:23 +0000 (14:49 -0400)
The logical replication worker processes now use the normal die()
handler for SIGTERM and CHECK_FOR_INTERRUPTS() instead of custom code.
One problem before was that the apply worker would not exit promptly
when a subscription was dropped, which could lead to deadlocks.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Masahiko Sawada <sawada.mshk@gmail.com>
src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/backend/tcop/postgres.c
src/include/replication/logicalworker.h
src/include/replication/worker_internal.h

index 63d903ac0217281816074803b17d03f83f071959..345a415212331ee2ea4f739df76ec23c75b7ca7a 100644 (file)
@@ -80,8 +80,8 @@ static void logicalrep_worker_detach(void);
 static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 
 /* Flags set by signal handlers */
-volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t got_SIGTERM = false;
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGTERM = false;
 
 static bool on_commit_launcher_wakeup = false;
 
@@ -624,8 +624,8 @@ logicalrep_worker_onexit(int code, Datum arg)
 }
 
 /* SIGTERM: set flag to exit at next convenient time */
-void
-logicalrep_worker_sigterm(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sigterm(SIGNAL_ARGS)
 {
    int         save_errno = errno;
 
@@ -638,8 +638,8 @@ logicalrep_worker_sigterm(SIGNAL_ARGS)
 }
 
 /* SIGHUP: set flag to reload configuration at next convenient time */
-void
-logicalrep_worker_sighup(SIGNAL_ARGS)
+static void
+logicalrep_launcher_sighup(SIGNAL_ARGS)
 {
    int         save_errno = errno;
 
@@ -799,8 +799,8 @@ ApplyLauncherMain(Datum main_arg)
    before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
 
    /* Establish signal handlers. */
-   pqsignal(SIGHUP, logicalrep_worker_sighup);
-   pqsignal(SIGTERM, logicalrep_worker_sigterm);
+   pqsignal(SIGHUP, logicalrep_launcher_sighup);
+   pqsignal(SIGTERM, logicalrep_launcher_sigterm);
    BackgroundWorkerUnblockSignals();
 
    /* Make it easy to identify our processes. */
index 515724e1026f05f9097f7d60f7788f077d4dafca..85e480db4bdd6c95eceafac6f42a9ce0b8112d19 100644 (file)
@@ -154,10 +154,12 @@ wait_for_sync_status_change(Oid relid, char origstate)
    int         rc;
    char        state = origstate;
 
-   while (!got_SIGTERM)
+   for (;;)
    {
        LogicalRepWorker *worker;
 
+       CHECK_FOR_INTERRUPTS();
+
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
        worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                        relid, false);
@@ -525,7 +527,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
        bytesread += avail;
    }
 
-   while (!got_SIGTERM && maxread > 0 && bytesread < minread)
+   while (maxread > 0 && bytesread < minread)
    {
        pgsocket    fd = PGINVALID_SOCKET;
        int         rc;
@@ -579,10 +581,6 @@ copy_read_data(void *outbuf, int minread, int maxread)
        ResetLatch(&MyProc->procLatch);
    }
 
-   /* Check for exit condition. */
-   if (got_SIGTERM)
-       proc_exit(0);
-
    return bytesread;
 }
 
index ea3ba1d5b4858bb5ca8827eb2f360d1b1792e319..e31551340c9266f601d4ad934c15c14c30737ebf 100644 (file)
@@ -72,6 +72,8 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 
+#include "tcop/tcopprot.h"
+
 #include "utils/builtins.h"
 #include "utils/catcache.h"
 #include "utils/datum.h"
@@ -118,6 +120,9 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void reread_subscription(void);
 
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -1005,7 +1010,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
    /* mark as idle, before starting to loop */
    pgstat_report_activity(STATE_IDLE, NULL);
 
-   while (!got_SIGTERM)
+   for (;;)
    {
        pgsocket    fd = PGINVALID_SOCKET;
        int         rc;
@@ -1015,6 +1020,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        TimestampTz last_recv_timestamp = GetCurrentTimestamp();
        bool        ping_sent = false;
 
+       CHECK_FOR_INTERRUPTS();
+
        MemoryContextSwitchTo(ApplyMessageContext);
 
        len = walrcv_receive(wrconn, &buf, &fd);
@@ -1437,6 +1444,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    MySubscriptionValid = false;
 }
 
+/* SIGHUP: set flag to reload configuration at next convenient time */
+static void
+logicalrep_worker_sighup(SIGNAL_ARGS)
+{
+   int         save_errno = errno;
+
+   got_SIGHUP = true;
+
+   /* Waken anything waiting on the process latch */
+   SetLatch(MyLatch);
+
+   errno = save_errno;
+}
 
 /* Logical Replication Apply worker entry point */
 void
@@ -1454,7 +1474,7 @@ ApplyWorkerMain(Datum main_arg)
 
    /* Setup signal handling */
    pqsignal(SIGHUP, logicalrep_worker_sighup);
-   pqsignal(SIGTERM, logicalrep_worker_sigterm);
+   pqsignal(SIGTERM, die);
    BackgroundWorkerUnblockSignals();
 
    /* Initialise stats to a sanish value */
@@ -1604,6 +1624,14 @@ ApplyWorkerMain(Datum main_arg)
    /* Run the main loop. */
    LogicalRepApplyLoop(origin_startpos);
 
-   /* We should only get here if we received SIGTERM */
    proc_exit(0);
 }
+
+/*
+ * Is current process a logical replication worker?
+ */
+bool
+IsLogicalWorker(void)
+{
+   return MyLogicalRepWorker != NULL;
+}
index 75c2d9a61d0dc067e9844f986cf23e23f724e565..13577691505b3654132ddd7177685f008cc0805b 100644 (file)
@@ -55,6 +55,7 @@
 #include "pg_getopt.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "rewrite/rewriteHandler.h"
@@ -2845,6 +2846,10 @@ ProcessInterrupts(void)
            ereport(FATAL,
                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
                     errmsg("terminating autovacuum process due to administrator command")));
+       else if (IsLogicalWorker())
+           ereport(FATAL,
+                   (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                    errmsg("terminating logical replication worker due to administrator command")));
        else if (RecoveryConflictPending && RecoveryConflictRetryable)
        {
            pgstat_report_recovery_conflict(RecoveryConflictReason);
index 3e0affa190b728b01adf0e90fe4ec062fd37c861..5877a930f68ca635c68c25fa031a28e09b824b80 100644 (file)
@@ -14,4 +14,6 @@
 
 extern void ApplyWorkerMain(Datum main_arg);
 
+extern bool IsLogicalWorker(void);
+
 #endif   /* LOGICALWORKER_H */
index 0654461305b33cb420f62d871b71eb88e2394254..2bfff5c1205dc499f845f2b725db56473773ab51 100644 (file)
@@ -67,8 +67,6 @@ extern Subscription *MySubscription;
 extern LogicalRepWorker *MyLogicalRepWorker;
 
 extern bool in_remote_transaction;
-extern volatile sig_atomic_t got_SIGHUP;
-extern volatile sig_atomic_t got_SIGTERM;
 
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
@@ -81,8 +79,6 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int logicalrep_sync_worker_count(Oid subid);
 
-extern void logicalrep_worker_sighup(SIGNAL_ARGS);
-extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 void       process_syncing_tables(XLogRecPtr current_lsn);
 void invalidate_syncing_table_states(Datum arg, int cacheid,