Allow pg_rewind to use a standby server as the source system.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 12 Nov 2020 12:52:24 +0000 (14:52 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 12 Nov 2020 12:52:24 +0000 (14:52 +0200)
Using a hot standby server as the source has not been possible, because
pg_rewind creates a temporary table in the source system, to hold the
list of file ranges that need to be fetched. Refactor it to queue up the
file fetch requests in pg_rewind's memory, so that the temporary table
is no longer needed.

Also update the logic to compute 'minRecoveryPoint' correctly, when the
source is a standby server.

Reviewed-by: Kyotaro Horiguchi, Soumyadeep Chakraborty
Discussion: https://www.postgresql.org/message-id/0c5b3783-af52-3ee5-f8fa-6e794061f70d%40iki.fi

doc/src/sgml/ref/pg_rewind.sgml
src/bin/pg_rewind/libpq_source.c
src/bin/pg_rewind/pg_rewind.c
src/bin/pg_rewind/t/003_extrafiles.pl
src/bin/pg_rewind/t/007_standby_source.pl [new file with mode: 0644]

index 43282e6016fb11213481ee6b1c3d84ab2c345d4e..07aae75d8b7972b18b3d4f8f85924f641cbc03d4 100644 (file)
@@ -173,7 +173,7 @@ PostgreSQL documentation
         with a role having sufficient permissions to execute the functions
         used by <application>pg_rewind</application> on the source server
         (see Notes section for details) or a superuser role.  This option
-        requires the source server to be running and not in recovery mode.
+        requires the source server to be running and accepting connections.
        </para>
       </listitem>
      </varlistentry>
index c73e8bf470465c47f578412aaafd742ec5ee04f4..47beba277a4ca293a78199f7554628ded30e02cc 100644 (file)
 #include "datapagemap.h"
 #include "file_ops.h"
 #include "filemap.h"
+#include "lib/stringinfo.h"
 #include "pg_rewind.h"
 #include "port/pg_bswap.h"
 #include "rewind_source.h"
 
 /*
- * Files are fetched max CHUNKSIZE bytes at a time.
- *
- * (This only applies to files that are copied in whole, or for truncated
- * files where we copy the tail. Relation files, where we know the individual
- * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
+ * Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
+ * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
  */
-#define CHUNKSIZE 1000000
+#define MAX_CHUNK_SIZE (1024 * 1024)
+#define MAX_CHUNKS_PER_QUERY 1000
+
+/* represents a request to fetch a piece of a file from the source */
+typedef struct
+{
+   const char *path;           /* path relative to data directory root */
+   off_t       offset;
+   size_t      length;
+} fetch_range_request;
 
 typedef struct
 {
    rewind_source common;       /* common interface functions */
 
    PGconn     *conn;
-   bool        copy_started;
+
+   /*
+    * Queue of chunks that have been requested with the queue_fetch_range()
+    * function, but have not been fetched from the remote server yet.
+    */
+   int         num_requests;
+   fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
+
+   /* temporary space for process_queued_fetch_requests() */
+   StringInfoData paths;
+   StringInfoData offsets;
+   StringInfoData lengths;
 } libpq_source;
 
 static void init_libpq_conn(PGconn *conn);
 static char *run_simple_query(PGconn *conn, const char *sql);
 static void run_simple_command(PGconn *conn, const char *sql);
+static void appendArrayEscapedString(StringInfo buf, const char *str);
+
+static void process_queued_fetch_requests(libpq_source *src);
 
 /* public interface functions */
 static void libpq_traverse_files(rewind_source *source,
@@ -74,6 +95,10 @@ init_libpq_source(PGconn *conn)
 
    src->conn = conn;
 
+   initStringInfo(&src->paths);
+   initStringInfo(&src->offsets);
+   initStringInfo(&src->lengths);
+
    return &src->common;
 }
 
@@ -91,6 +116,12 @@ init_libpq_conn(PGconn *conn)
    run_simple_command(conn, "SET lock_timeout = 0");
    run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
 
+   /*
+    * we don't intend to do any updates, put the connection in read-only mode
+    * to keep us honest
+    */
+   run_simple_command(conn, "SET default_transaction_read_only = on");
+
    /* secure search_path */
    res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -98,17 +129,6 @@ init_libpq_conn(PGconn *conn)
                 PQresultErrorMessage(res));
    PQclear(res);
 
-   /*
-    * Check that the server is not in hot standby mode. There is no
-    * fundamental reason that couldn't be made to work, but it doesn't
-    * currently because we use a temporary table. Better to check for it
-    * explicitly than error out, for a better error message.
-    */
-   str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
-   if (strcmp(str, "f") != 0)
-       pg_fatal("source server must not be in recovery mode");
-   pg_free(str);
-
    /*
     * Also check that full_page_writes is enabled.  We can get torn pages if
     * a page is modified while we read it with pg_read_binary_file(), and we
@@ -118,6 +138,18 @@ init_libpq_conn(PGconn *conn)
    if (strcmp(str, "on") != 0)
        pg_fatal("full_page_writes must be enabled in the source server");
    pg_free(str);
+
+   /* Prepare a statement we'll use to fetch files */
+   res = PQprepare(conn, "fetch_chunks_stmt",
+                   "SELECT path, begin,\n"
+                   "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
+                   "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
+                   3, NULL);
+
+   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       pg_fatal("could not prepare statement to fetch file contents: %s",
+                PQresultErrorMessage(res));
+   PQclear(res);
 }
 
 /*
@@ -283,94 +315,125 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
                        size_t len)
 {
    libpq_source *src = (libpq_source *) source;
-   uint64      begin = off;
-   uint64      end = off + len;
 
    /*
-    * On first call, create a temporary table, and start COPYing to it.
-    * We will load it with the list of blocks that we need to fetch.
+    * Does this request happen to be a continuation of the previous chunk? If
+    * so, merge it with the previous one.
+    *
+    * XXX: We use pointer equality to compare the path. That's good enough
+    * for our purposes; the caller always passes the same pointer for the
+    * same filename. If it didn't, we would fail to merge requests, but it
+    * wouldn't affect correctness.
     */
-   if (!src->copy_started)
+   if (src->num_requests > 0)
    {
-       PGresult   *res;
+       fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
 
-       run_simple_command(src->conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+       if (prev->offset + prev->length == off &&
+           prev->length < MAX_CHUNK_SIZE &&
+           prev->path == path)
+       {
+           /*
+            * Extend the previous request to cover as much of this new
+            * request as possible, without exceeding MAX_CHUNK_SIZE.
+            */
+           size_t      thislen;
 
-       res = PQexec(src->conn, "COPY fetchchunks FROM STDIN");
-       if (PQresultStatus(res) != PGRES_COPY_IN)
-           pg_fatal("could not send file list: %s",
-                    PQresultErrorMessage(res));
-       PQclear(res);
+           thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
+           prev->length += thislen;
 
-       src->copy_started = true;
-   }
+           off += thislen;
+           len -= thislen;
 
-   /*
-    * Write the file range to a temporary table in the server.
-    *
-    * The range is sent to the server as a COPY formatted line, to be inserted
-    * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
-    * the temporary table to actually fetch the data.
-    */
+           /*
+            * Fall through to create new requests for any remaining 'len'
+            * that didn't fit in the previous chunk.
+            */
+       }
+   }
 
-   /* Split the range into CHUNKSIZE chunks */
-   while (end - begin > 0)
+   /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
+   while (len > 0)
    {
-       char        linebuf[MAXPGPATH + 23];
-       unsigned int len;
-
-       /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
-       if (end - begin > CHUNKSIZE)
-           len = CHUNKSIZE;
-       else
-           len = (unsigned int) (end - begin);
+       int32       thislen;
 
-       snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+       /* if the queue is full, perform all the work queued up so far */
+       if (src->num_requests == MAX_CHUNKS_PER_QUERY)
+           process_queued_fetch_requests(src);
 
-       if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
-           pg_fatal("could not send COPY data: %s",
-                    PQerrorMessage(src->conn));
+       thislen = Min(len, MAX_CHUNK_SIZE);
+       src->request_queue[src->num_requests].path = path;
+       src->request_queue[src->num_requests].offset = off;
+       src->request_queue[src->num_requests].length = thislen;
+       src->num_requests++;
 
-       begin += len;
+       off += thislen;
+       len -= thislen;
    }
 }
 
 /*
- * Receive all the queued chunks and write them to the target data directory.
+ * Fetch all the queued chunks and write them to the target data directory.
  */
 static void
 libpq_finish_fetch(rewind_source *source)
 {
-   libpq_source *src = (libpq_source *) source;
+   process_queued_fetch_requests((libpq_source *) source);
+}
+
+static void
+process_queued_fetch_requests(libpq_source *src)
+{
+   const char *params[3];
    PGresult   *res;
-   const char *sql;
+   int         chunkno;
 
-   if (PQputCopyEnd(src->conn, NULL) != 1)
-       pg_fatal("could not send end-of-COPY: %s",
-                PQerrorMessage(src->conn));
+   if (src->num_requests == 0)
+       return;
 
-   while ((res = PQgetResult(src->conn)) != NULL)
+   pg_log_debug("getting %d file chunks", src->num_requests);
+
+   /*
+    * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
+    * the same length as parameters: paths, offsets and lengths. Construct
+    * the string representations of them.
+    */
+   resetStringInfo(&src->paths);
+   resetStringInfo(&src->offsets);
+   resetStringInfo(&src->lengths);
+
+   appendStringInfoChar(&src->paths, '{');
+   appendStringInfoChar(&src->offsets, '{');
+   appendStringInfoChar(&src->lengths, '{');
+   for (int i = 0; i < src->num_requests; i++)
    {
-       if (PQresultStatus(res) != PGRES_COMMAND_OK)
-           pg_fatal("unexpected result while sending file list: %s",
-                    PQresultErrorMessage(res));
-       PQclear(res);
+       fetch_range_request *rq = &src->request_queue[i];
+
+       if (i > 0)
+       {
+           appendStringInfoChar(&src->paths, ',');
+           appendStringInfoChar(&src->offsets, ',');
+           appendStringInfoChar(&src->lengths, ',');
+       }
+
+       appendArrayEscapedString(&src->paths, rq->path);
+       appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
+       appendStringInfo(&src->lengths, INT64_FORMAT, (int64) rq->length);
    }
+   appendStringInfoChar(&src->paths, '}');
+   appendStringInfoChar(&src->offsets, '}');
+   appendStringInfoChar(&src->lengths, '}');
 
    /*
-    * We've now copied the list of file ranges that we need to fetch to the
-    * temporary table. Now, actually fetch all of those ranges.
+    * Execute the prepared statement.
     */
-   sql =
-       "SELECT path, begin,\n"
-       "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
-       "FROM fetchchunks\n";
+   params[0] = src->paths.data;
+   params[1] = src->offsets.data;
+   params[2] = src->lengths.data;
 
-   if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+   if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
        pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
 
-   pg_log_debug("getting file chunks");
-
    if (PQsetSingleRowMode(src->conn) != 1)
        pg_fatal("could not set libpq connection to single row mode");
 
@@ -382,8 +445,10 @@ libpq_finish_fetch(rewind_source *source)
     * chunk    bytea   -- file content
     *----
     */
+   chunkno = 0;
    while ((res = PQgetResult(src->conn)) != NULL)
    {
+       fetch_range_request *rq = &src->request_queue[chunkno];
        char       *filename;
        int         filenamelen;
        int64       chunkoff;
@@ -404,6 +469,9 @@ libpq_finish_fetch(rewind_source *source)
                         PQresultErrorMessage(res));
        }
 
+       if (chunkno > src->num_requests)
+           pg_fatal("received more data chunks than requested");
+
        /* sanity check the result set */
        if (PQnfields(res) != 3 || PQntuples(res) != 1)
            pg_fatal("unexpected result set size while fetching remote files");
@@ -448,31 +516,74 @@ libpq_finish_fetch(rewind_source *source)
         * If a file has been deleted on the source, remove it on the target
         * as well.  Note that multiple unlink() calls may happen on the same
         * file if multiple data chunks are associated with it, hence ignore
-        * unconditionally anything missing.  If this file is not a relation
-        * data file, then it has been already truncated when creating the
-        * file chunk list at the previous execution of the filemap.
+        * unconditionally anything missing.
         */
        if (PQgetisnull(res, 0, 2))
        {
            pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
                         filename);
            remove_target_file(filename, true);
-           pg_free(filename);
-           PQclear(res);
-           continue;
        }
+       else
+       {
+           pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
+                        filename, chunkoff, chunksize);
+
+           if (strcmp(filename, rq->path) != 0)
+           {
+               pg_fatal("received data for file \"%s\", when requested for \"%s\"",
+                        filename, rq->path);
+           }
+           if (chunkoff != rq->offset)
+               pg_fatal("received data at offset " INT64_FORMAT " of file \"%s\", when requested for offset " INT64_FORMAT,
+                        chunkoff, rq->path, (int64) rq->offset);
 
-       pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
-                    filename, chunkoff, chunksize);
+           /*
+            * We should not receive receive more data than we requested, or
+            * pg_read_binary_file() messed up.  We could receive less,
+            * though, if the file was truncated in the source after we
+            * checked its size. That's OK, there should be a WAL record of
+            * the truncation, which will get replayed when you start the
+            * target system for the first time after pg_rewind has completed.
+            */
+           if (chunksize > rq->length)
+               pg_fatal("received more than requested for file \"%s\"", rq->path);
 
-       open_target_file(filename, false);
+           open_target_file(filename, false);
 
-       write_target_range(chunk, chunkoff, chunksize);
+           write_target_range(chunk, chunkoff, chunksize);
+       }
 
        pg_free(filename);
 
        PQclear(res);
+       chunkno++;
    }
+   if (chunkno != src->num_requests)
+       pg_fatal("unexpected number of data chunks received");
+
+   src->num_requests = 0;
+}
+
+/*
+ * Escape a string to be used as element in a text array constant
+ */
+static void
+appendArrayEscapedString(StringInfo buf, const char *str)
+{
+   appendStringInfoCharMacro(buf, '\"');
+   while (*str)
+   {
+       char        ch = *str;
+
+       if (ch == '"' || ch == '\\')
+           appendStringInfoCharMacro(buf, '\\');
+
+       appendStringInfoCharMacro(buf, ch);
+
+       str++;
+   }
+   appendStringInfoCharMacro(buf, '\"');
 }
 
 /*
@@ -521,6 +632,12 @@ libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
 static void
 libpq_destroy(rewind_source *source)
 {
-   pfree(source);
+   libpq_source *src = (libpq_source *) source;
+
+   pfree(src->paths.data);
+   pfree(src->offsets.data);
+   pfree(src->lengths.data);
+   pfree(src);
+
    /* NOTE: we don't close the connection here, as it was not opened by us. */
 }
index 52e3fc40e85b287a96d38ce42c3da939ac901c98..2bbf8e74385fb7f672d8c7148e006f2767e9b3fb 100644 (file)
@@ -50,6 +50,7 @@ static void disconnect_atexit(void);
 
 static ControlFileData ControlFile_target;
 static ControlFileData ControlFile_source;
+static ControlFileData ControlFile_source_after;
 
 const char *progname;
 int            WalSegSz;
@@ -486,6 +487,8 @@ perform_rewind(filemap_t *filemap, rewind_source *source,
    XLogRecPtr  endrec;
    TimeLineID  endtli;
    ControlFileData ControlFile_new;
+   size_t      size;
+   char       *buffer;
 
    /*
     * Execute the actions in the file map, fetching data from the source
@@ -552,40 +555,104 @@ perform_rewind(filemap_t *filemap, rewind_source *source,
        }
    }
 
-   /*
-    * We've now copied the list of file ranges that we need to fetch to the
-    * temporary table. Now, actually fetch all of those ranges.
-    */
+   /* Complete any remaining range-fetches that we queued up above. */
    source->finish_fetch(source);
 
    close_target_file();
 
    progress_report(true);
 
+   /*
+    * Fetch the control file from the source last. This ensures that the
+    * minRecoveryPoint is up-to-date.
+    */
+   buffer = source->fetch_file(source, "global/pg_control", &size);
+   digestControlFile(&ControlFile_source_after, buffer, size);
+   pg_free(buffer);
+
+   /*
+    * Sanity check: If the source is a local system, the control file should
+    * not have changed since we started.
+    *
+    * XXX: We assume it hasn't been modified, but actually, what could go
+    * wrong? The logic handles a libpq source that's modified concurrently,
+    * why not a local datadir?
+    */
+   if (datadir_source &&
+       memcmp(&ControlFile_source, &ControlFile_source_after,
+              sizeof(ControlFileData)) != 0)
+   {
+       pg_fatal("source system was modified while pg_rewind was running");
+   }
+
    if (showprogress)
        pg_log_info("creating backup label and updating control file");
-   createBackupLabel(chkptredo, chkpttli, chkptrec);
 
    /*
-    * Update control file of target. Make it ready to perform archive
-    * recovery when restarting.
+    * Create a backup label file, to tell the target where to begin the WAL
+    * replay. Normally, from the last common checkpoint between the source
+    * and the target. But if the source is a standby server, it's possible
+    * that the last common checkpoint is *after* the standby's restartpoint.
+    * That implies that the source server has applied the checkpoint record,
+    * but hasn't perfomed a corresponding restartpoint yet. Make sure we
+    * start at the restartpoint's redo point in that case.
     *
-    * Like in an online backup, it's important that we replay all the WAL
-    * that was generated while we copied the files over. To enforce that, set
-    * 'minRecoveryPoint' in the control file.
+    * Use the old version of the source's control file for this. The server
+    * might have finished the restartpoint after we started copying files,
+    * but we must begin from the redo point at the time that started copying.
     */
-   memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
+   if (ControlFile_source.checkPointCopy.redo < chkptredo)
+   {
+       chkptredo = ControlFile_source.checkPointCopy.redo;
+       chkpttli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+       chkptrec = ControlFile_source.checkPoint;
+   }
+   createBackupLabel(chkptredo, chkpttli, chkptrec);
 
+   /*
+    * Update control file of target, to tell the target how far it must
+    * replay the WAL (minRecoveryPoint).
+    */
    if (connstr_source)
    {
-       endrec = source->get_current_wal_insert_lsn(source);
-       endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+       /*
+        * The source is a live server. Like in an online backup, it's
+        * important that we recover all the WAL that was generated while we
+        * were copying files.
+        */
+       if (ControlFile_source_after.state == DB_IN_ARCHIVE_RECOVERY)
+       {
+           /*
+            * Source is a standby server. We must replay to its
+            * minRecoveryPoint.
+            */
+           endrec = ControlFile_source_after.minRecoveryPoint;
+           endtli = ControlFile_source_after.minRecoveryPointTLI;
+       }
+       else
+       {
+           /*
+            * Source is a production, non-standby, server. We must replay to
+            * the last WAL insert location.
+            */
+           if (ControlFile_source_after.state != DB_IN_PRODUCTION)
+               pg_fatal("source system was in unexpected state at end of rewind");
+
+           endrec = source->get_current_wal_insert_lsn(source);
+           endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
+       }
    }
    else
    {
-       endrec = ControlFile_source.checkPoint;
-       endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
+       /*
+        * Source is a local data directory. It should've shut down cleanly,
+        * and we must replay to the latest shutdown checkpoint.
+        */
+       endrec = ControlFile_source_after.checkPoint;
+       endtli = ControlFile_source_after.checkPointCopy.ThisTimeLineID;
    }
+
+   memcpy(&ControlFile_new, &ControlFile_source_after, sizeof(ControlFileData));
    ControlFile_new.minRecoveryPoint = endrec;
    ControlFile_new.minRecoveryPointTLI = endtli;
    ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
index 48849fb49aa25437546e5b4345da504a443af367..ece6ba2e4320f520d9540630e6a2095480298113 100644 (file)
@@ -40,10 +40,22 @@ sub run_test
      "in standby1";
    append_to_file "$test_standby_datadir/tst_standby_dir/standby_file2",
      "in standby2";
-   mkdir "$test_standby_datadir/tst_standby_dir/standby_subdir/";
    append_to_file
-     "$test_standby_datadir/tst_standby_dir/standby_subdir/standby_file3",
+     "$test_standby_datadir/tst_standby_dir/standby_file3 with 'quotes'",
      "in standby3";
+   append_to_file
+     "$test_standby_datadir/tst_standby_dir/standby_file4 with double\"quote",
+     "in standby4";
+   append_to_file
+     "$test_standby_datadir/tst_standby_dir/standby_file5 with back\\slash",
+     "in standby5";
+   append_to_file
+     "$test_standby_datadir/tst_standby_dir/standby_file6_with_backslash\\\"and_double-quote",
+     "in standby6";
+   mkdir "$test_standby_datadir/tst_standby_dir/standby_subdir/";
+   append_to_file
+     "$test_standby_datadir/tst_standby_dir/standby_subdir/standby_file7",
+     "in standby7";
 
    mkdir "$test_primary_datadir/tst_primary_dir";
    append_to_file "$test_primary_datadir/tst_primary_dir/primary_file1",
@@ -58,7 +70,9 @@ sub run_test
    RewindTest::promote_standby();
    RewindTest::run_pg_rewind($test_mode);
 
-   # List files in the data directory after rewind.
+   # List files in the data directory after rewind. All the files that
+   # were present in the standby should be present after rewind, and
+   # all the files that were added on the primary should be removed.
    my @paths;
    find(
        sub {
@@ -78,8 +92,12 @@ sub run_test
            "$test_primary_datadir/tst_standby_dir",
            "$test_primary_datadir/tst_standby_dir/standby_file1",
            "$test_primary_datadir/tst_standby_dir/standby_file2",
+           "$test_primary_datadir/tst_standby_dir/standby_file3 with 'quotes'",
+           "$test_primary_datadir/tst_standby_dir/standby_file4 with double\"quote",
+           "$test_primary_datadir/tst_standby_dir/standby_file5 with back\\slash",
+           "$test_primary_datadir/tst_standby_dir/standby_file6_with_backslash\\\"and_double-quote",
            "$test_primary_datadir/tst_standby_dir/standby_subdir",
-           "$test_primary_datadir/tst_standby_dir/standby_subdir/standby_file3"
+           "$test_primary_datadir/tst_standby_dir/standby_subdir/standby_file7"
        ],
        "file lists match");
 
diff --git a/src/bin/pg_rewind/t/007_standby_source.pl b/src/bin/pg_rewind/t/007_standby_source.pl
new file mode 100644 (file)
index 0000000..64b7f1a
--- /dev/null
@@ -0,0 +1,174 @@
+#
+# Test using a standby server as the source.
+#
+# This sets up three nodes: A, B and C. First, A is the primary,
+# B follows A, and C follows B:
+#
+# A (primary) <--- B (standby) <--- C (standby)
+#
+#
+# Then we promote C, and insert some divergent rows in A and C:
+#
+# A (primary) <--- B (standby)      C (primary)
+#
+#
+# Finally, we run pg_rewind on C, to re-point it at B again:
+#
+# A (primary) <--- B (standby) <--- C (standby)
+#
+#
+# The test is similar to the basic tests, but since we're dealing with
+# three nodes, not two, we cannot use most of the RewindTest functions
+# as is.
+
+use strict;
+use warnings;
+use TestLib;
+use Test::More tests => 3;
+
+use FindBin;
+use lib $FindBin::RealBin;
+use File::Copy;
+use PostgresNode;
+use RewindTest;
+
+my $tmp_folder = TestLib::tempdir;
+
+my $node_a;
+my $node_b;
+my $node_c;
+
+# Set up node A, as primary
+#
+# A (primary)
+
+setup_cluster('a');
+start_primary();
+$node_a = $node_primary;
+
+# Create a test table and insert a row in primary.
+$node_a->safe_psql('postgres', "CREATE TABLE tbl1 (d text)");
+$node_a->safe_psql('postgres', "INSERT INTO tbl1 VALUES ('in A')");
+primary_psql("CHECKPOINT");
+
+# Set up node B and C, as cascaded standbys
+#
+# A (primary) <--- B (standby) <--- C (standby)
+$node_a->backup('my_backup');
+$node_b = get_new_node('node_b');
+$node_b->init_from_backup($node_a, 'my_backup', has_streaming => 1);
+$node_b->set_standby_mode();
+$node_b->start;
+
+$node_b->backup('my_backup');
+$node_c = get_new_node('node_c');
+$node_c->init_from_backup($node_b, 'my_backup', has_streaming => 1);
+$node_c->set_standby_mode();
+$node_c->start;
+
+# Insert additional data on A, and wait for both standbys to catch up.
+$node_a->safe_psql('postgres',
+   "INSERT INTO tbl1 values ('in A, before promotion')");
+$node_a->safe_psql('postgres', 'CHECKPOINT');
+
+my $lsn = $node_a->lsn('insert');
+$node_a->wait_for_catchup('node_b', 'write', $lsn);
+$node_b->wait_for_catchup('node_c', 'write', $lsn);
+
+# Promote C
+#
+# A (primary) <--- B (standby)      C (primary)
+
+$node_c->promote;
+$node_c->safe_psql('postgres', "checkpoint");
+
+
+# Insert a row in A. This causes A/B and C to have "diverged", so that it's
+# no longer possible to just apply the standy's logs over primary directory
+# - you need to rewind.
+$node_a->safe_psql('postgres',
+   "INSERT INTO tbl1 VALUES ('in A, after C was promoted')");
+
+# Also insert a new row in the standby, which won't be present in the
+# old primary.
+$node_c->safe_psql('postgres',
+   "INSERT INTO tbl1 VALUES ('in C, after C was promoted')");
+
+
+#
+# All set up. We're ready to run pg_rewind.
+#
+my $node_c_pgdata = $node_c->data_dir;
+
+# Stop the node and be ready to perform the rewind.
+$node_c->stop('fast');
+
+# Keep a temporary postgresql.conf or it would be overwritten during the rewind.
+copy(
+   "$node_c_pgdata/postgresql.conf",
+   "$tmp_folder/node_c-postgresql.conf.tmp");
+
+{
+   # Temporarily unset PGAPPNAME so that the server doesn't
+   # inherit it.  Otherwise this could affect libpqwalreceiver
+   # connections in confusing ways.
+   local %ENV = %ENV;
+   delete $ENV{PGAPPNAME};
+
+   # Do rewind using a remote connection as source, generating
+   # recovery configuration automatically.
+   command_ok(
+       [
+           'pg_rewind',                      "--debug",
+           "--source-server",                $node_b->connstr('postgres'),
+           "--target-pgdata=$node_c_pgdata", "--no-sync",
+           "--write-recovery-conf"
+       ],
+       'pg_rewind remote');
+}
+
+# Now move back postgresql.conf with old settings
+move(
+   "$tmp_folder/node_c-postgresql.conf.tmp",
+   "$node_c_pgdata/postgresql.conf");
+
+# Restart the node.
+$node_c->start;
+
+# set RewindTest::node_primary to point to the rewinded node, so that we can
+# use check_query()
+$node_primary = $node_c;
+
+# Run some checks to verify that C has been successfully rewound,
+# and connected back to follow B.
+
+check_query(
+   'SELECT * FROM tbl1',
+   qq(in A
+in A, before promotion
+in A, after C was promoted
+),
+   'table content after rewind');
+
+# Insert another row, and observe that it's cascaded from A to B to C.
+$node_a->safe_psql('postgres',
+   "INSERT INTO tbl1 values ('in A, after rewind')");
+
+$lsn = $node_a->lsn('insert');
+$node_b->wait_for_catchup('node_c', 'write', $lsn);
+
+check_query(
+   'SELECT * FROM tbl1',
+   qq(in A
+in A, before promotion
+in A, after C was promoted
+in A, after rewind
+),
+   'table content after rewind and insert');
+
+# clean up
+$node_a->teardown_node;
+$node_b->teardown_node;
+$node_c->teardown_node;
+
+exit(0);