Fix consistency issues with replication slot copy
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 17 Mar 2020 19:13:18 +0000 (16:13 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 17 Mar 2020 19:13:18 +0000 (16:13 -0300)
Commit 9f06d79ef831's replication slot copying failed to
properly reserve the WAL that the slot is expecting to see
during DecodingContextFindStartpoint (to set the confirmed_flush
LSN), so concurrent activity could remove that WAL and cause the
copy process to error out.  But it doesn't actually *need* that
WAL anyway: instead of running decode to find confirmed_flush, it
can be copied from the source slot. Fix this by rearranging things
to avoid DecodingContextFindStartpoint() (leaving the target slot's
confirmed_flush_lsn to invalid), and set that up afterwards by copying
from the target slot's value.

Also ensure the source slot's confirmed_flush_lsn is valid.

Reported-by: Arseny Sher
Author: Masahiko Sawada, Arseny Sher
Discussion: https://postgr.es/m/871rr3ohbo.fsf@ars-thinkpad

src/backend/replication/logical/logical.c
src/backend/replication/slotfuncs.c

index e3da7d36250330ee842126c7391be41205cf6a35..5adf253583b1cfc7dc4e0586afd834a1f4bf68bd 100644 (file)
@@ -208,6 +208,8 @@ StartupDecodingContext(List *output_plugin_options,
  *
  * plugin -- contains the name of the output plugin
  * output_plugin_options -- contains options passed to the output plugin
+ * need_full_snapshot -- if true, must obtain a snapshot able to read all
+ *             tables; if false, one that can read only catalogs is acceptable.
  * restart_lsn -- if given as invalid, it's this routine's responsibility to
  *             mark WAL as reserved by setting a convenient restart_lsn for the slot.
  *             Otherwise, we set for decoding to start from the given LSN without
index 2c9d5de6d9037a9dc1f1682ff143084befc5f3f5..beb735d87b64473d6b8360222d71530114ee4c3c 100644 (file)
@@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  * Helper function for creating a new logical replication slot with
  * given arguments. Note that this function doesn't release the created
  * slot.
+ *
+ * When find_startpoint is false, the slot's confirmed_flush is not set; it's
+ * caller's responsibility to ensure it's set to something sensible.
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-                                                               bool temporary, XLogRecPtr restart_lsn)
+                                                               bool temporary, XLogRecPtr restart_lsn,
+                                                               bool find_startpoint)
 {
        LogicalDecodingContext *ctx = NULL;
 
@@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin,
                                                  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 
        /*
-        * Create logical decoding context, to build the initial snapshot.
+        * Create logical decoding context to find start point or, if we don't
+        * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
+        *
+        * Note: when !find_startpoint this is still important, because it's at
+        * this point that the output plugin is validated.
         */
        ctx = CreateInitDecodingContext(plugin, NIL,
-                                                                       false,  /* do not build snapshot */
+                                                                       false,  /* just catalogs is OK */
                                                                        restart_lsn,
                                                                        logical_read_local_xlog_page, NULL, NULL,
                                                                        NULL);
 
-       /* build initial snapshot, might take a while */
-       DecodingContextFindStartpoint(ctx);
+       /*
+        * If caller needs us to determine the decoding start point, do so now.
+        * This might take a while.
+        */
+       if (find_startpoint)
+               DecodingContextFindStartpoint(ctx);
 
        /* don't need the decoding context anymore */
        FreeDecodingContext(ctx);
@@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
        create_logical_replication_slot(NameStr(*name),
                                                                        NameStr(*plugin),
                                                                        temporary,
-                                                                       InvalidXLogRecPtr);
+                                                                       InvalidXLogRecPtr,
+                                                                       true);
 
        values[0] = NameGetDatum(&MyReplicationSlot->data.name);
        values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +696,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 
        /* Create new slot and acquire it */
        if (logical_slot)
+       {
+               /*
+                * We must not try to read WAL, since we haven't reserved it yet --
+                * hence pass find_startpoint false.  confirmed_flush will be set
+                * below, by copying from the source slot.
+                */
                create_logical_replication_slot(NameStr(*dst_name),
                                                                                plugin,
                                                                                temporary,
-                                                                               src_restart_lsn);
+                                                                               src_restart_lsn,
+                                                                               false);
+       }
        else
                create_physical_replication_slot(NameStr(*dst_name),
                                                                                 true,
@@ -703,6 +724,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                TransactionId copy_xmin;
                TransactionId copy_catalog_xmin;
                XLogRecPtr      copy_restart_lsn;
+               XLogRecPtr      copy_confirmed_flush;
                bool            copy_islogical;
                char       *copy_name;
 
@@ -714,6 +736,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                copy_xmin = src->data.xmin;
                copy_catalog_xmin = src->data.catalog_xmin;
                copy_restart_lsn = src->data.restart_lsn;
+               copy_confirmed_flush = src->data.confirmed_flush;
 
                /* for existence check */
                copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +761,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                                                        NameStr(*src_name)),
                                         errdetail("The source replication slot was modified incompatibly during the copy operation.")));
 
+               /* The source slot must have a consistent snapshot */
+               if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("cannot copy unfinished logical replication slot \"%s\"",
+                                                       NameStr(*src_name)),
+                                        errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
+
                /* Install copied values again */
                SpinLockAcquire(&MyReplicationSlot->mutex);
                MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +777,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
                MyReplicationSlot->data.xmin = copy_xmin;
                MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
                MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+               MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
                SpinLockRelease(&MyReplicationSlot->mutex);
 
                ReplicationSlotMarkDirty();