Put the logic to decide which synchronous standby is active into a function.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 12 Dec 2014 11:39:36 +0000 (13:39 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 12 Dec 2014 12:26:42 +0000 (14:26 +0200)
This avoids duplicating the code.

Michael Paquier, reviewed by Simon Riggs and me

src/backend/replication/syncrep.c
src/backend/replication/walsender.c
src/include/replication/syncrep.h

index aa54bfba6cf87f4a09e6f15b777ecab4c12a8735..a2b17f151639fec5d7f8c3a67826ed89720dffd9 100644 (file)
@@ -5,7 +5,7 @@
  * Synchronous replication is new as of PostgreSQL 9.1.
  *
  * If requested, transaction commits wait until their commit LSN is
- * acknowledged by the sync standby.
+ * acknowledged by the synchronous standby.
  *
  * This module contains the code for waiting and release of backends.
  * All code in this module executes on the primary. The core streaming
@@ -357,6 +357,60 @@ SyncRepInitConfig(void)
    }
 }
 
+/*
+ * Find the WAL sender servicing the synchronous standby with the lowest
+ * priority value, or NULL if no synchronous standby is connected. If there
+ * are multiple standbys with the same lowest priority value, the first one
+ * found is selected. The caller must hold SyncRepLock.
+ */
+WalSnd *
+SyncRepGetSynchronousStandby(void)
+{
+   WalSnd     *result = NULL;
+   int         result_priority = 0;
+   int         i;
+
+   for (i = 0; i < max_wal_senders; i++)
+   {
+       /* Use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+       int         this_priority;
+
+       /* Must be active */
+       if (walsnd->pid == 0)
+           continue;
+
+       /* Must be streaming */
+       if (walsnd->state != WALSNDSTATE_STREAMING)
+           continue;
+
+       /* Must be synchronous */
+       this_priority = walsnd->sync_standby_priority;
+       if (this_priority == 0)
+           continue;
+
+       /* Must have a lower priority value than any previous ones */
+       if (result != NULL && result_priority <= this_priority)
+           continue;
+
+       /* Must have a valid flush position */
+       if (XLogRecPtrIsInvalid(walsnd->flush))
+           continue;
+
+       result = (WalSnd *) walsnd;
+       result_priority = this_priority;
+
+       /*
+        * If priority is equal to 1, there cannot be any other WAL senders
+        * with a lower priority, so we're done.
+        */
+       if (this_priority == 1)
+           return result;
+   }
+
+   return result;
+}
+
 /*
  * Update the LSNs on each queue based upon our latest state. This
  * implements a simple policy of first-valid-standby-releases-waiter.
@@ -368,11 +422,9 @@ void
 SyncRepReleaseWaiters(void)
 {
    volatile WalSndCtlData *walsndctl = WalSndCtl;
-   volatile WalSnd *syncWalSnd = NULL;
+   WalSnd     *syncWalSnd;
    int         numwrite = 0;
    int         numflush = 0;
-   int         priority = 0;
-   int         i;
 
    /*
     * If this WALSender is serving a standby that is not on the list of
@@ -387,33 +439,13 @@ SyncRepReleaseWaiters(void)
 
    /*
     * We're a potential sync standby. Release waiters if we are the highest
-    * priority standby. If there are multiple standbys with same priorities
-    * then we use the first mentioned standby. If you change this, also
-    * change pg_stat_get_wal_senders().
+    * priority standby.
     */
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+   syncWalSnd = SyncRepGetSynchronousStandby();
 
-   for (i = 0; i < max_wal_senders; i++)
-   {
-       /* use volatile pointer to prevent code rearrangement */
-       volatile WalSnd *walsnd = &walsndctl->walsnds[i];
-
-       if (walsnd->pid != 0 &&
-           walsnd->state == WALSNDSTATE_STREAMING &&
-           walsnd->sync_standby_priority > 0 &&
-           (priority == 0 ||
-            priority > walsnd->sync_standby_priority) &&
-           !XLogRecPtrIsInvalid(walsnd->flush))
-       {
-           priority = walsnd->sync_standby_priority;
-           syncWalSnd = walsnd;
-       }
-   }
-
-   /*
-    * We should have found ourselves at least.
-    */
-   Assert(syncWalSnd);
+   /* We should have found ourselves at least */
+   Assert(syncWalSnd != NULL);
 
    /*
     * If we aren't managing the highest priority standby then just leave.
index addae8f6ce512e6864548d86ec91a1daaae58812..5937cbbb026b8443f72687b30d14ee589b068f6c 100644 (file)
@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
    Tuplestorestate *tupstore;
    MemoryContext per_query_ctx;
    MemoryContext oldcontext;
-   int        *sync_priority;
-   int         priority = 0;
-   int         sync_standby = -1;
+   WalSnd     *sync_standby;
    int         i;
 
    /* check to see if caller supports us returning a tuplestore */
@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
    MemoryContextSwitchTo(oldcontext);
 
    /*
-    * Get the priorities of sync standbys all in one go, to minimise lock
-    * acquisitions and to allow us to evaluate who is the current sync
-    * standby. This code must match the code in SyncRepReleaseWaiters().
+    * Get the currently active synchronous standby.
     */
-   sync_priority = palloc(sizeof(int) * max_wal_senders);
    LWLockAcquire(SyncRepLock, LW_SHARED);
-   for (i = 0; i < max_wal_senders; i++)
-   {
-       /* use volatile pointer to prevent code rearrangement */
-       volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
-
-       if (walsnd->pid != 0)
-       {
-           /*
-            * Treat a standby such as a pg_basebackup background process
-            * which always returns an invalid flush location, as an
-            * asynchronous standby.
-            */
-           sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
-               0 : walsnd->sync_standby_priority;
-
-           if (walsnd->state == WALSNDSTATE_STREAMING &&
-               walsnd->sync_standby_priority > 0 &&
-               (priority == 0 ||
-                priority > walsnd->sync_standby_priority) &&
-               !XLogRecPtrIsInvalid(walsnd->flush))
-           {
-               priority = walsnd->sync_standby_priority;
-               sync_standby = i;
-           }
-       }
-   }
+   sync_standby = SyncRepGetSynchronousStandby();
    LWLockRelease(SyncRepLock);
 
    for (i = 0; i < max_wal_senders; i++)
@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        XLogRecPtr  write;
        XLogRecPtr  flush;
        XLogRecPtr  apply;
+       int         priority;
        WalSndState state;
        Datum       values[PG_STAT_GET_WAL_SENDERS_COLS];
        bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        write = walsnd->write;
        flush = walsnd->flush;
        apply = walsnd->apply;
+       priority = walsnd->sync_standby_priority;
        SpinLockRelease(&walsnd->mutex);
 
        memset(nulls, 0, sizeof(nulls));
@@ -2857,15 +2829,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
                nulls[5] = true;
            values[5] = LSNGetDatum(apply);
 
-           values[6] = Int32GetDatum(sync_priority[i]);
+           /*
+            * Treat a standby such as a pg_basebackup background process
+            * which always returns an invalid flush location, as an
+            * asynchronous standby.
+            */
+           priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
+
+           values[6] = Int32GetDatum(priority);
 
            /*
             * More easily understood version of standby state. This is purely
             * informational, not different from priority.
             */
-           if (sync_priority[i] == 0)
+           if (priority == 0)
                values[7] = CStringGetTextDatum("async");
-           else if (i == sync_standby)
+           else if (walsnd == sync_standby)
                values[7] = CStringGetTextDatum("sync");
            else
                values[7] = CStringGetTextDatum("potential");
@@ -2873,7 +2852,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 
        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    }
-   pfree(sync_priority);
 
    /* clean up and return the tuplestore */
    tuplestore_donestoring(tupstore);
index 7eeaf3b04c53a9a79b30f3f6c3d6bc1177fb5ef8..6f78fee47337d08bedb30cc97f31b146a79375d6 100644 (file)
@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
 /* called by various procs */
 extern int SyncRepWakeQueue(bool all, int mode);
 
+/* forward declaration to avoid pulling in walsender_private.h */
+struct WalSnd;
+extern struct WalSnd *SyncRepGetSynchronousStandby(void);
+
 extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
 extern void assign_synchronous_commit(int newval, void *extra);