Fix consistency issues with replication slot copy
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 17 Mar 2020 19:13:18 +0000 (16:13 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 17 Mar 2020 19:13:18 +0000 (16:13 -0300)
Commit 9f06d79ef831's replication slot copying failed to
properly reserve the WAL that the slot is expecting to see
during DecodingContextFindStartpoint (to set the confirmed_flush
LSN), so concurrent activity could remove that WAL and cause the
copy process to error out.  But it doesn't actually *need* that
WAL anyway: instead of running decode to find confirmed_flush, it
can be copied from the source slot. Fix this by rearranging things
to avoid DecodingContextFindStartpoint() (leaving the target slot's
confirmed_flush_lsn to invalid), and set that up afterwards by copying
from the target slot's value.

Also ensure the source slot's confirmed_flush_lsn is valid.

Reported-by: Arseny Sher
Author: Masahiko Sawada, Arseny Sher
Discussion: https://postgr.es/m/871rr3ohbo.fsf@ars-thinkpad

src/backend/replication/logical/logical.c
src/backend/replication/slotfuncs.c

index e3da7d36250330ee842126c7391be41205cf6a35..5adf253583b1cfc7dc4e0586afd834a1f4bf68bd 100644 (file)
@@ -208,6 +208,8 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin -- contains the name of the output plugin
  * output_plugin_options -- contains options passed to the output plugin
+ * need_full_snapshot -- if true, must obtain a snapshot able to read all
+ *     tables; if false, one that can read only catalogs is acceptable.
  * restart_lsn -- if given as invalid, it's this routine's responsibility to
  *     mark WAL as reserved by setting a convenient restart_lsn for the slot.
  *     Otherwise, we set for decoding to start from the given LSN without
index 2c9d5de6d9037a9dc1f1682ff143084befc5f3f5..beb735d87b64473d6b8360222d71530114ee4c3c 100644 (file)
@@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  * Helper function for creating a new logical replication slot with
  * given arguments. Note that this function doesn't release the created
  * slot.
+ *
+ * When find_startpoint is false, the slot's confirmed_flush is not set; it's
+ * caller's responsibility to ensure it's set to something sensible.
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                               bool temporary, XLogRecPtr restart_lsn)
+                               bool temporary, XLogRecPtr restart_lsn,
+                               bool find_startpoint)
 {
    LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin,
                          temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
    /*
-    * Create logical decoding context, to build the initial snapshot.
+    * Create logical decoding context to find start point or, if we don't
+    * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
+    *
+    * Note: when !find_startpoint this is still important, because it's at
+    * this point that the output plugin is validated.
     */
    ctx = CreateInitDecodingContext(plugin, NIL,
-                                   false,  /* do not build snapshot */
+                                   false,  /* just catalogs is OK */
                                    restart_lsn,
                                    logical_read_local_xlog_page, NULL, NULL,
                                    NULL);
 
-   /* build initial snapshot, might take a while */
-   DecodingContextFindStartpoint(ctx);
+   /*
+    * If caller needs us to determine the decoding start point, do so now.
+    * This might take a while.
+    */
+   if (find_startpoint)
+       DecodingContextFindStartpoint(ctx);
 
    /* don't need the decoding context anymore */
    FreeDecodingContext(ctx);
@@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
    create_logical_replication_slot(NameStr(*name),
                                    NameStr(*plugin),
                                    temporary,
-                                   InvalidXLogRecPtr);
+                                   InvalidXLogRecPtr,
+                                   true);
 
    values[0] = NameGetDatum(&MyReplicationSlot->data.name);
    values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +696,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
    /* Create new slot and acquire it */
    if (logical_slot)
+   {
+       /*
+        * We must not try to read WAL, since we haven't reserved it yet --
+        * hence pass find_startpoint false.  confirmed_flush will be set
+        * below, by copying from the source slot.
+        */
        create_logical_replication_slot(NameStr(*dst_name),
                                        plugin,
                                        temporary,
-                                       src_restart_lsn);
+                                       src_restart_lsn,
+                                       false);
+   }
    else
        create_physical_replication_slot(NameStr(*dst_name),
                                         true,
@@ -703,6 +724,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
        TransactionId copy_xmin;
        TransactionId copy_catalog_xmin;
        XLogRecPtr  copy_restart_lsn;
+       XLogRecPtr  copy_confirmed_flush;
        bool        copy_islogical;
        char       *copy_name;
 
@@ -714,6 +736,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
        copy_xmin = src->data.xmin;
        copy_catalog_xmin = src->data.catalog_xmin;
        copy_restart_lsn = src->data.restart_lsn;
+       copy_confirmed_flush = src->data.confirmed_flush;
 
        /* for existence check */
        copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +761,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                            NameStr(*src_name)),
                     errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+       /* The source slot must have a consistent snapshot */
+       if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+           ereport(ERROR,
+                   (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                    errmsg("cannot copy unfinished logical replication slot \"%s\"",
+                           NameStr(*src_name)),
+                    errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
+
        /* Install copied values again */
        SpinLockAcquire(&MyReplicationSlot->mutex);
        MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +777,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
        MyReplicationSlot->data.xmin = copy_xmin;
        MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
        MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+       MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
        SpinLockRelease(&MyReplicationSlot->mutex);
 
        ReplicationSlotMarkDirty();