END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
+ WalSndWakeupProcessRequests(true, !RecoveryInProgress());
/*
* If we still haven't flushed to the request point then we have a
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
+ WalSndWakeupProcessRequests(true, !RecoveryInProgress());
/*
* Great, done. To take some work off the critical path, try to initialize
* If there were cascading standby servers connected to us, nudge any wal
* sender processes to notice that we've been promoted.
*/
- WalSndWakeup();
+ WalSndWakeup(true, true);
/*
* If this was a promotion, request an (online) checkpoint now. This isn't
* if we restored something other than a WAL segment, but it does no harm
* either.
*/
- WalSndWakeup();
+ WalSndWakeup(true, false);
}
/*
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ /* ------
+ * Wakeup walsenders:
+ *
+ * On the standby, the WAL is flushed first (which will only wake up
+ * physical walsenders) and then applied, which will only wake up logical
+ * walsenders.
+ *
+ * Indeed, logical walsenders on standby can't decode and send data until
+ * it's been applied.
+ *
+ * Physical walsenders don't need to be woken up during replay unless
+ * cascading replication is allowed and time line change occurred (so that
+ * they can notice that they are on a new time line).
+ *
+ * That's why the wake up conditions are for:
+ *
+ * - physical walsenders in case of new time line and cascade
+ * replication is allowed
+ * - logical walsenders in case cascade replication is allowed (could not
+ * be created otherwise)
+ * ------
+ */
+ if (AllowCascadeReplication())
+ WalSndWakeup(switchedTLI, true);
+
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
*/
RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
- /*
- * Wake up any walsenders to notice that we are on a new timeline.
- */
- if (AllowCascadeReplication())
- WalSndWakeup();
-
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
}
{
/*
* When we find that WAL ends in an incomplete record, keep track
- * of that record. After recovery is done, we'll write a record to
- * indicate to downstream WAL readers that that portion is to be
- * ignored.
+ * of that record. After recovery is done, we'll write a record
+ * to indicate to downstream WAL readers that that portion is to
+ * be ignored.
*
* However, when ArchiveRecoveryRequested = true, we're going to
* switch to a new timeline at the end of recovery. We will only
/* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
if (AllowCascadeReplication())
- WalSndWakeup();
+ WalSndWakeup(true, false);
/* Report XLOG streaming progress in PS display */
if (update_process_title)
walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
+
+ /*
+ * The kind assignment is done here and not in StartReplication()
+ * and StartLogicalReplication(). Indeed, the logical walsender
+ * needs to read WAL records (like snapshot of running
+ * transactions) during the slot creation. So it needs to be woken
+ * up based on its kind.
+ *
+ * The kind assignment could also be done in StartReplication(),
+ * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+ * seems better to set it on one place.
+ */
+ if (MyDatabaseId == InvalidOid)
+ walsnd->kind = REPLICATION_KIND_PHYSICAL;
+ else
+ walsnd->kind = REPLICATION_KIND_LOGICAL;
+
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
}
/*
- * Wake up all walsenders
+ * Wake up physical, logical or both kinds of walsenders
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ * applied
+ *
+ * For cascading replication we need to wake up physical walsenders separately
+ * from logical walsenders (see the comment before calling WalSndWakeup() in
+ * ApplyWalRecord() for more details).
*
* This will be called inside critical sections, so throwing an error is not
* advisable.
*/
void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
{
int i;
for (i = 0; i < max_wal_senders; i++)
{
Latch *latch;
+ ReplicationKind kind;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
/*
* Get latch pointer with spinlock held, for the unlikely case that
- * pointer reads aren't atomic (as they're 8 bytes).
+ * pointer reads aren't atomic (as they're 8 bytes). While at it, also
+ * get kind.
*/
SpinLockAcquire(&walsnd->mutex);
latch = walsnd->latch;
+ kind = walsnd->kind;
SpinLockRelease(&walsnd->mutex);
- if (latch != NULL)
+ if (latch == NULL)
+ continue;
+
+ if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+ (logical && kind == REPLICATION_KIND_LOGICAL))
SetLatch(latch);
}
}
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
extern void WalSndInitStopping(void);
extern void WalSndWaitStopping(void);
extern void HandleWalSndInitStopping(void);
/*
* wakeup walsenders if there is work to be done
*/
-#define WalSndWakeupProcessRequests() \
- do \
- { \
- if (wake_wal_senders) \
- { \
- wake_wal_senders = false; \
- if (max_wal_senders > 0) \
- WalSndWakeup(); \
- } \
- } while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+ if (wake_wal_senders)
+ {
+ wake_wal_senders = false;
+ if (max_wal_senders > 0)
+ WalSndWakeup(physical, logical);
+ }
+}
#endif /* _WALSENDER_H */
#include "access/xlog.h"
#include "lib/ilist.h"
#include "nodes/nodes.h"
+#include "nodes/replnodes.h"
#include "replication/syncrep.h"
#include "storage/latch.h"
#include "storage/shmem.h"
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;
+
+ ReplicationKind kind;
} WalSnd;
extern PGDLLIMPORT WalSnd *MyWalSnd;