Lag tracking for logical replication
authorSimon Riggs <simon@2ndQuadrant.com>
Fri, 12 May 2017 09:50:56 +0000 (10:50 +0100)
committerSimon Riggs <simon@2ndQuadrant.com>
Fri, 12 May 2017 09:50:56 +0000 (10:50 +0100)
Lag tracking is called for each commit, but we introduce
a pacing delay to ensure we don't swamp the lag tracker.

Author: Petr Jelinek, with minor pacing delay code from me

src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/include/replication/logical.h
src/include/replication/output_plugin.h

index ab963c53456a7fe716bfa3da9e93f7ce47476494..7409e5ce3de759d2acead457824c4f1af49330ea 100644 (file)
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
                                           bool need_full_snapshot,
                                           XLogPageReadCB read_page,
                                           LogicalOutputPluginWriterPrepareWrite prepare_write,
-                                          LogicalOutputPluginWriterWrite do_write)
+                                          LogicalOutputPluginWriterWrite do_write,
+                                          LogicalOutputPluginWriterUpdateProgress update_progress)
 {
        ReplicationSlot *slot;
        MemoryContext context,
@@ -186,6 +187,7 @@ StartupDecodingContext(List *output_plugin_options,
        ctx->out = makeStringInfo();
        ctx->prepare_write = prepare_write;
        ctx->write = do_write;
+       ctx->update_progress = update_progress;
 
        ctx->output_plugin_options = output_plugin_options;
 
@@ -199,8 +201,9 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin contains the name of the output plugin
  * output_plugin_options contains options passed to the output plugin
- * read_page, prepare_write, do_write are callbacks that have to be filled to
- *             perform the use-case dependent, actual, work.
+ * read_page, prepare_write, do_write, update_progress
+ *     callbacks that have to be filled to perform the use-case dependent,
+ *     actual work.
  *
  * Needs to be called while in a memory context that's at least as long lived
  * as the decoding context because further memory contexts will be created
@@ -215,7 +218,8 @@ CreateInitDecodingContext(char *plugin,
                                                  bool need_full_snapshot,
                                                  XLogPageReadCB read_page,
                                                  LogicalOutputPluginWriterPrepareWrite prepare_write,
-                                                 LogicalOutputPluginWriterWrite do_write)
+                                                 LogicalOutputPluginWriterWrite do_write,
+                                                 LogicalOutputPluginWriterUpdateProgress update_progress)
 {
        TransactionId xmin_horizon = InvalidTransactionId;
        ReplicationSlot *slot;
@@ -300,7 +304,7 @@ CreateInitDecodingContext(char *plugin,
 
        ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
                                                                 need_full_snapshot, read_page, prepare_write,
-                                                                do_write);
+                                                                do_write, update_progress);
 
        /* call output plugin initialization callback */
        old_context = MemoryContextSwitchTo(ctx->context);
@@ -324,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
  * output_plugin_options
  *             contains options passed to the output plugin.
  *
- * read_page, prepare_write, do_write
+ * read_page, prepare_write, do_write, update_progress
  *             callbacks that have to be filled to perform the use-case dependent,
  *             actual work.
  *
@@ -340,7 +344,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                                          List *output_plugin_options,
                                          XLogPageReadCB read_page,
                                          LogicalOutputPluginWriterPrepareWrite prepare_write,
-                                         LogicalOutputPluginWriterWrite do_write)
+                                         LogicalOutputPluginWriterWrite do_write,
+                                         LogicalOutputPluginWriterUpdateProgress update_progress)
 {
        LogicalDecodingContext *ctx;
        ReplicationSlot *slot;
@@ -390,7 +395,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 
        ctx = StartupDecodingContext(output_plugin_options,
                                                                 start_lsn, InvalidTransactionId, false,
-                                                                read_page, prepare_write, do_write);
+                                                                read_page, prepare_write, do_write,
+                                                                update_progress);
 
        /* call output plugin initialization callback */
        old_context = MemoryContextSwitchTo(ctx->context);
@@ -503,6 +509,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
        ctx->prepared_write = false;
 }
 
+/*
+ * Update progress tracking (if supported).
+ */
+void
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+{
+       if (!ctx->update_progress)
+               return;
+
+       ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+}
+
 /*
  * Load the output plugin, lookup its output plugin init function, and check
  * that it provides the required callbacks.
index c251b92f57bcbc248a9afde17b47328a03c55978..27164de093dd8de2874cb16edf7b4ca9dae75c20 100644 (file)
@@ -253,7 +253,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                                                                        options,
                                                                        logical_read_local_xlog_page,
                                                                        LogicalOutputPrepareWrite,
-                                                                       LogicalOutputWrite);
+                                                                       LogicalOutputWrite, NULL);
 
                MemoryContextSwitchTo(oldcontext);
 
index f3eaccffd5b8b8be0bcad857b6a93b2befa9a145..4ddfbf7a98b14e7d7a7a081dc5129cc6c863828a 100644 (file)
@@ -244,6 +244,8 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                         XLogRecPtr commit_lsn)
 {
+       OutputPluginUpdateProgress(ctx);
+
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_commit(ctx->out, txn, commit_lsn);
        OutputPluginWrite(ctx, true);
index 6ee1e68819afb0c578020ada73081fb6276f6a00..56a9ca965172e17df5600780201759f9585da50f 100644 (file)
@@ -133,7 +133,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
         */
        ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
                                                                        false, /* do not build snapshot */
-                                                                       logical_read_local_xlog_page, NULL, NULL);
+                                                                       logical_read_local_xlog_page, NULL, NULL,
+                                                                       NULL);
 
        /* build initial snapshot, might take a while */
        DecodingContextFindStartpoint(ctx);
index 45d027803ab997494828d7f99490ca12ea620ad4..e4e5337d549b9066924aabdf2dc0c1e3211fbaa7 100644 (file)
@@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
 static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
@@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
                ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
                                                                                logical_read_xlog_page,
-                                                                               WalSndPrepareWrite, WalSndWriteData);
+                                                                               WalSndPrepareWrite, WalSndWriteData,
+                                                                               WalSndUpdateProgress);
 
                /*
                 * Signal that we don't need the timeout mechanism. We're just
@@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
         * Initialize position to the last ack'ed one, then the xlog records begin
         * to be shipped from that position.
         */
-       logical_decoding_ctx = CreateDecodingContext(
-                                                                                          cmd->startpoint, cmd->options,
+       logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
                                                                                                 logical_read_xlog_page,
-                                                                               WalSndPrepareWrite, WalSndWriteData);
+                                                                                                WalSndPrepareWrite,
+                                                                                                WalSndWriteData,
+                                                                                                WalSndUpdateProgress);
 
        /* Start reading WAL from the oldest required WAL. */
        logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1239,6 +1243,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
        SetLatch(MyLatch);
 }
 
+/*
+ * LogicalDecodingContext 'update_progress' callback.
+ *
+ * Write the current position to the lag tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+       static TimestampTz sendTime = 0;
+       TimestampTz now = GetCurrentTimestamp();
+
+       /*
+        * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+        * to avoid flooding the lag tracker when we commit frequently.
+        */
+#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
+       if (!TimestampDifferenceExceeds(sendTime, now,
+                                                                       WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+               return;
+
+       LagTrackerWrite(lsn, now);
+       sendTime = now;
+}
+
 /*
  * Wait till WAL < loc is flushed to disk so it can be safely read.
  */
@@ -2730,9 +2758,9 @@ XLogSendLogical(void)
        if (record != NULL)
        {
                /*
-                * Note the lack of any call to LagTrackerWrite() which is the responsibility
-                * of the logical decoding plugin. Response messages are handled normally,
-                * so this responsibility does not extend to needing to call LagTrackerRead().
+                * Note the lack of any call to LagTrackerWrite() here; that is
+                * handled by WalSndUpdateProgress(), which the output plugin calls
+                * through the logical decoding write API.
                 */
                LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
@@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
  * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
  * eventually reported to have been written, flushed and applied by the
  * standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
  */
-void
+static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
        bool buffer_full;
index d0b2e0bbaefff5433659a80f60818ec4ba99aa69..090f9c82680584acf47d597dbb9c08748aa3439a 100644 (file)
@@ -26,6 +26,12 @@ typedef void (*LogicalOutputPluginWriterWrite) (
 
 typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (
+                                                                                  struct LogicalDecodingContext *lr,
+                                                                                                                       XLogRecPtr Ptr,
+                                                                                                                       TransactionId xid
+);
+
 typedef struct LogicalDecodingContext
 {
        /* memory context this is all allocated in */
@@ -52,6 +58,7 @@ typedef struct LogicalDecodingContext
         */
        LogicalOutputPluginWriterPrepareWrite prepare_write;
        LogicalOutputPluginWriterWrite write;
+       LogicalOutputPluginWriterUpdateProgress update_progress;
 
        /*
         * Output buffer.
@@ -85,13 +92,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
                                                  bool need_full_snapshot,
                                                  XLogPageReadCB read_page,
                                                  LogicalOutputPluginWriterPrepareWrite prepare_write,
-                                                 LogicalOutputPluginWriterWrite do_write);
+                                                 LogicalOutputPluginWriterWrite do_write,
+                                                 LogicalOutputPluginWriterUpdateProgress update_progress);
 extern LogicalDecodingContext *CreateDecodingContext(
                                          XLogRecPtr start_lsn,
                                          List *output_plugin_options,
                                          XLogPageReadCB read_page,
                                          LogicalOutputPluginWriterPrepareWrite prepare_write,
-                                         LogicalOutputPluginWriterWrite do_write);
+                                         LogicalOutputPluginWriterWrite do_write,
+                                         LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
@@ -101,8 +110,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
                                                                          XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
-extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
-
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
index 08e962d0c0c9376fb7b80a2dcb4fb3499d09029a..2435e2be2d2d4ad94bd54aa632c7d67765a5485e 100644 (file)
@@ -106,5 +106,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
 
 #endif   /* OUTPUT_PLUGIN_H */