Make pg_receivexlog and pg_basebackup -X stream work across timeline switches.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 17 Jan 2013 18:23:00 +0000 (20:23 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 17 Jan 2013 18:23:00 +0000 (20:23 +0200)
This mirrors the changes done earlier to the server in standby mode. When
receivelog reaches the end of a timeline, as reported by the server, it
fetches the timeline history file of the next timeline, and restarts
streaming from the new timeline by issuing a new START_STREAMING command.

When pg_receivexlog crosses a timeline, it leaves the .partial suffix on the
last segment on the old timeline. This helps you to tell apart a partial
segment left in the directory because of a timeline switch, and a completed
segment. If you just follow a single server, it won't make a difference, but
it can be significant in more complicated scenarios where new WAL is still
generated on the old timeline.

This includes two small changes to the streaming replication protocol:
First, when you reach the end of timeline while streaming, the server now
sends the TLI of the next timeline in the server's history to the client.
pg_receivexlog uses that as the next timeline, so that it doesn't need to
parse the timeline history file like a standby server does. Second, when
BASE_BACKUP command sends the begin and end WAL positions, it now also sends
the timeline IDs corresponding the positions.

12 files changed:
doc/src/sgml/protocol.sgml
src/backend/access/transam/timeline.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogfuncs.c
src/backend/replication/basebackup.c
src/backend/replication/walsender.c
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h
src/include/access/timeline.h
src/include/access/xlog.h

index e14627c201ebc5ac63271afd3d79ddf900a60451..baae59de6e933c293b09db7c16288f375295cb2a 100644 (file)
@@ -1418,8 +1418,10 @@ The commands accepted in walsender mode are:
      <para>
       After streaming all the WAL on a timeline that is not the latest one,
       the server will end streaming by exiting the COPY mode. When the client
-      acknowledges this by also exiting COPY mode, the server responds with a
-      CommandComplete message, and is ready to accept a new command.
+      acknowledges this by also exiting COPY mode, the server sends a
+      single-row, single-column result set indicating the next timeline in
+      this server's history. That is followed by a CommandComplete message,
+      and the server is ready to accept a new command.
      </para>
 
      <para>
@@ -1784,7 +1786,9 @@ The commands accepted in walsender mode are:
      </para>
      <para>
       The first ordinary result set contains the starting position of the
-      backup, given in XLogRecPtr format as a single column in a single row.
+      backup, in a single row with two columns. The first column contains
+      the start position given in XLogRecPtr format, and the second column
+      contains the corresponding timeline ID.
      </para>
      <para>
       The second ordinary result set has one row for each tablespace.
@@ -1827,7 +1831,9 @@ The commands accepted in walsender mode are:
       <quote>ustar interchange format</> specified in the POSIX 1003.1-2008
       standard) dump of the tablespace contents, except that the two trailing
       blocks of zeroes specified in the standard are omitted.
-      After the tar data is complete, a final ordinary result set will be sent.
+      After the tar data is complete, a final ordinary result set will be sent,
+      containing the WAL end position of the backup, in the same format as
+      the start position.
      </para>
 
      <para>
index 46379bbff8880391824c1ad23bd4769f4f546eb2..ad4f3162c53852140fa1cb80902fcc4851eb206c 100644 (file)
@@ -545,22 +545,26 @@ tliOfPointInHistory(XLogRecPtr ptr, List *history)
 }
 
 /*
- * Returns the point in history where we branched off the given timeline.
- * Returns InvalidXLogRecPtr if the timeline is current (= we have not
- * branched off from it), and throws an error if the timeline is not part of
- * this server's history.
+ * Returns the point in history where we branched off the given timeline,
+ * and the timeline we branched to (*nextTLI). Returns InvalidXLogRecPtr if
+ * the timeline is current, ie. we have not branched off from it, and throws
+ * an error if the timeline is not part of this server's history.
  */
 XLogRecPtr
-tliSwitchPoint(TimeLineID tli, List *history)
+tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
 {
    ListCell   *cell;
 
+   if (nextTLI)
+       *nextTLI = 0;
    foreach (cell, history)
    {
        TimeLineHistoryEntry *tle = (TimeLineHistoryEntry *) lfirst(cell);
 
        if (tle->tli == tli)
            return tle->end;
+       if (nextTLI)
+           *nextTLI = tle->tli;
    }
 
    ereport(ERROR,
index ac2b26b49822561fe10da71b6a2e0daf1a7e803d..90ba32ef0f52f825fff375301e027506d785b89e 100644 (file)
@@ -4930,7 +4930,7 @@ StartupXLOG(void)
         * tliSwitchPoint will throw an error if the checkpoint's timeline
         * is not in expectedTLEs at all.
         */
-       switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs);
+       switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL);
        ereport(FATAL,
                (errmsg("requested timeline %u is not a child of this server's history",
                        recoveryTargetTLI),
@@ -7870,16 +7870,21 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno)
  * non-exclusive backups active at the same time, and they don't conflict
  * with an exclusive backup either.
  *
+ * Returns the minimum WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *starttli_p.
+ *
  * Every successfully started non-exclusive backup must be stopped by calling
  * do_pg_stop_backup() or do_pg_abort_backup().
  */
 XLogRecPtr
-do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
+do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
+                  char **labelfile)
 {
    bool        exclusive = (labelfile == NULL);
    bool        backup_started_in_recovery = false;
    XLogRecPtr  checkpointloc;
    XLogRecPtr  startpoint;
+   TimeLineID  starttli;
    pg_time_t   stamp_time;
    char        strfbuf[128];
    char        xlogfilename[MAXFNAMELEN];
@@ -8021,6 +8026,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
            LWLockAcquire(ControlFileLock, LW_SHARED);
            checkpointloc = ControlFile->checkPoint;
            startpoint = ControlFile->checkPointCopy.redo;
+           starttli = ControlFile->checkPointCopy.ThisTimeLineID;
            checkpointfpw = ControlFile->checkPointCopy.fullPageWrites;
            LWLockRelease(ControlFileLock);
 
@@ -8154,6 +8160,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
    /*
     * We're done.  As a convenience, return the starting WAL location.
     */
+   if (starttli_p)
+       *starttli_p = starttli;
    return startpoint;
 }
 
@@ -8190,14 +8198,18 @@ pg_start_backup_callback(int code, Datum arg)
 
  * If labelfile is NULL, this stops an exclusive backup. Otherwise this stops
  * the non-exclusive backup specified by 'labelfile'.
+ *
+ * Returns the last WAL position that must be present to restore from this
+ * backup, and the corresponding timeline ID in *stoptli_p.
  */
 XLogRecPtr
-do_pg_stop_backup(char *labelfile, bool waitforarchive)
+do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 {
    bool        exclusive = (labelfile == NULL);
    bool        backup_started_in_recovery = false;
    XLogRecPtr  startpoint;
    XLogRecPtr  stoppoint;
+   TimeLineID  stoptli;
    XLogRecData rdata;
    pg_time_t   stamp_time;
    char        strfbuf[128];
@@ -8401,8 +8413,11 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
 
        LWLockAcquire(ControlFileLock, LW_SHARED);
        stoppoint = ControlFile->minRecoveryPoint;
+       stoptli = ControlFile->minRecoveryPointTLI;
        LWLockRelease(ControlFileLock);
 
+       if (stoptli_p)
+           *stoptli_p = stoptli;
        return stoppoint;
    }
 
@@ -8414,6 +8429,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
    rdata.buffer = InvalidBuffer;
    rdata.next = NULL;
    stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
+   stoptli = ThisTimeLineID;
 
    /*
     * Force a switch to a new xlog segment file, so that the backup is valid
@@ -8529,6 +8545,8 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive)
    /*
     * We're done.  As a convenience, return the ending WAL location.
     */
+   if (stoptli_p)
+       *stoptli_p = stoptli;
    return stoppoint;
 }
 
index 96db5dbbf52ca729e2246d102fc9757043763e31..b6bb6773d6b433452dffd0b083fc5d1fddeb879a 100644 (file)
@@ -56,7 +56,7 @@ pg_start_backup(PG_FUNCTION_ARGS)
 
    backupidstr = text_to_cstring(backupid);
 
-   startpoint = do_pg_start_backup(backupidstr, fast, NULL);
+   startpoint = do_pg_start_backup(backupidstr, fast, NULL, NULL);
 
    snprintf(startxlogstr, sizeof(startxlogstr), "%X/%X",
             (uint32) (startpoint >> 32), (uint32) startpoint);
@@ -82,7 +82,7 @@ pg_stop_backup(PG_FUNCTION_ARGS)
    XLogRecPtr  stoppoint;
    char        stopxlogstr[MAXFNAMELEN];
 
-   stoppoint = do_pg_stop_backup(NULL, true);
+   stoppoint = do_pg_stop_backup(NULL, true, NULL);
 
    snprintf(stopxlogstr, sizeof(stopxlogstr), "%X/%X",
             (uint32) (stoppoint >> 32), (uint32) stoppoint);
index 2330fcc23ad8e8c6130eba63495a27747135c4fb..57946a9fa978cf6a2e8e1e8b8aa0d62dcf03f025 100644 (file)
@@ -55,7 +55,7 @@ static void SendBackupHeader(List *tablespaces);
 static void base_backup_cleanup(int code, Datum arg);
 static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int compareWalFileNames(const void *a, const void *b);
 
 /* Was the backup currently in-progress initiated in recovery mode? */
@@ -94,13 +94,16 @@ static void
 perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 {
    XLogRecPtr  startptr;
+   TimeLineID  starttli;
    XLogRecPtr  endptr;
+   TimeLineID  endtli;
    char       *labelfile;
 
    backup_started_in_recovery = RecoveryInProgress();
 
-   startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
-   SendXlogRecPtrResult(startptr);
+   startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+                                 &labelfile);
+   SendXlogRecPtrResult(startptr, starttli);
 
    PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
    {
@@ -218,7 +221,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
    }
    PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
 
-   endptr = do_pg_stop_backup(labelfile, !opt->nowait);
+   endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
 
    if (opt->includewal)
    {
@@ -426,7 +429,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
        /* Send CopyDone message for the last tar file */
        pq_putemptymessage('c');
    }
-   SendXlogRecPtrResult(endptr);
+   SendXlogRecPtrResult(endptr, endtli);
 }
 
 /*
@@ -635,17 +638,15 @@ SendBackupHeader(List *tablespaces)
  * XlogRecPtr record (in text format)
  */
 static void
-SendXlogRecPtrResult(XLogRecPtr ptr)
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
    StringInfoData buf;
    char        str[MAXFNAMELEN];
 
-   snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
-
    pq_beginmessage(&buf, 'T'); /* RowDescription */
-   pq_sendint(&buf, 1, 2);     /* 1 field */
+   pq_sendint(&buf, 2, 2);     /* 2 fields */
 
-   /* Field header */
+   /* Field headers */
    pq_sendstring(&buf, "recptr");
    pq_sendint(&buf, 0, 4);     /* table oid */
    pq_sendint(&buf, 0, 2);     /* attnum */
@@ -653,11 +654,29 @@ SendXlogRecPtrResult(XLogRecPtr ptr)
    pq_sendint(&buf, -1, 2);
    pq_sendint(&buf, 0, 4);
    pq_sendint(&buf, 0, 2);
+
+   pq_sendstring(&buf, "tli");
+   pq_sendint(&buf, 0, 4);     /* table oid */
+   pq_sendint(&buf, 0, 2);     /* attnum */
+   /*
+    * int8 may seem like a surprising data type for this, but in thory int4
+    * would not be wide enough for this, as TimeLineID is unsigned.
+    */
+   pq_sendint(&buf, INT8OID, 4);   /* type oid */
+   pq_sendint(&buf, -1, 2);
+   pq_sendint(&buf, 0, 4);
+   pq_sendint(&buf, 0, 2);
    pq_endmessage(&buf);
 
    /* Data row */
    pq_beginmessage(&buf, 'D');
-   pq_sendint(&buf, 1, 2);     /* number of columns */
+   pq_sendint(&buf, 2, 2);     /* number of columns */
+
+   snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
+   pq_sendint(&buf, strlen(str), 4);   /* length */
+   pq_sendbytes(&buf, str, strlen(str));
+
+   snprintf(str, sizeof(str), "%u", tli);
    pq_sendint(&buf, strlen(str), 4);   /* length */
    pq_sendbytes(&buf, str, strlen(str));
    pq_endmessage(&buf);
index ad7d1c911b377d94bb21fc20a72521b2bb72d3ac..ba138e73da387a6489ed8cbf26fe3950cce9aa60 100644 (file)
@@ -117,6 +117,7 @@ static uint32 sendOff = 0;
  * history forked off from that timeline at sendTimeLineValidUpto.
  */
 static TimeLineID  sendTimeLine = 0;
+static TimeLineID  sendTimeLineNextTLI = 0;
 static bool            sendTimeLineIsHistoric = false;
 static XLogRecPtr  sendTimeLineValidUpto = InvalidXLogRecPtr;
 
@@ -449,7 +450,8 @@ StartReplication(StartReplicationCmd *cmd)
             * requested start location is on that timeline.
             */
            timeLineHistory = readTimeLineHistory(ThisTimeLineID);
-           switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory);
+           switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
+                                        &sendTimeLineNextTLI);
            list_free_deep(timeLineHistory);
 
            /*
@@ -496,8 +498,7 @@ StartReplication(StartReplicationCmd *cmd)
    streamingDoneSending = streamingDoneReceiving = false;
 
    /* If there is nothing to stream, don't even enter COPY mode */
-   if (!sendTimeLineIsHistoric ||
-       cmd->startpoint < sendTimeLineValidUpto)
+   if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    {
        /*
         * When we first start replication the standby will be behind the primary.
@@ -554,10 +555,46 @@ StartReplication(StartReplicationCmd *cmd)
        if (walsender_ready_to_stop)
            proc_exit(0);
        WalSndSetState(WALSNDSTATE_STARTUP);
+
+       Assert(streamingDoneSending && streamingDoneReceiving);
+   }
+
+   /*
+    * Copy is finished now. Send a single-row result set indicating the next
+    * timeline.
+    */
+   if (sendTimeLineIsHistoric)
+   {
+       char        str[11];
+       snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
+
+       pq_beginmessage(&buf, 'T'); /* RowDescription */
+       pq_sendint(&buf, 1, 2);     /* 1 field */
+
+       /* Field header */
+       pq_sendstring(&buf, "next_tli");
+       pq_sendint(&buf, 0, 4);     /* table oid */
+       pq_sendint(&buf, 0, 2);     /* attnum */
+       /*
+        * int8 may seem like a surprising data type for this, but in theory
+        * int4 would not be wide enough for this, as TimeLineID is unsigned.
+        */
+       pq_sendint(&buf, INT8OID, 4);   /* type oid */
+       pq_sendint(&buf, -1, 2);
+       pq_sendint(&buf, 0, 4);
+       pq_sendint(&buf, 0, 2);
+       pq_endmessage(&buf);
+
+       /* Data row */
+       pq_beginmessage(&buf, 'D');
+       pq_sendint(&buf, 1, 2);     /* number of columns */
+       pq_sendint(&buf, strlen(str), 4);   /* length */
+       pq_sendbytes(&buf, str, strlen(str));
+       pq_endmessage(&buf);
    }
 
-   /* Get out of COPY mode (CommandComplete). */
-   EndCommand("COPY 0", DestRemote);
+   /* Send CommandComplete message */
+   pq_puttextmessage('C', "START_STREAMING");
 }
 
 /*
@@ -1377,8 +1414,9 @@ XLogSend(bool *caughtup)
            List       *history;
 
            history = readTimeLineHistory(ThisTimeLineID);
-           sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
+           sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
            Assert(sentPtr <= sendTimeLineValidUpto);
+           Assert(sendTimeLine < sendTimeLineNextTLI);
            list_free_deep(history);
 
            /* the current send pointer should be <= the switchpoint */
index a684c0c6fcfe9613a7c4e4d02c118ed511b5641d..b6f774469b1b4ef42a5b2ee55073127fd948fa71 100644 (file)
@@ -243,7 +243,7 @@ LogStreamerMain(logstreamer_param *param)
    if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                           param->sysidentifier, param->xlogdir,
                           reached_end_position, standby_message_timeout,
-                          true))
+                          NULL))
 
        /*
         * Any errors will already have been reported in the function process,
@@ -1220,7 +1220,7 @@ BaseBackup(void)
 {
    PGresult   *res;
    char       *sysidentifier;
-   uint32      timeline;
+   uint32      starttli;
    char        current_path[MAXPGPATH];
    char        escaped_label[MAXPGPATH];
    int         i;
@@ -1259,7 +1259,6 @@ BaseBackup(void)
        disconnect_and_exit(1);
    }
    sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
-   timeline = atoi(PQgetvalue(res, 0, 1));
    PQclear(res);
 
    /*
@@ -1291,18 +1290,24 @@ BaseBackup(void)
                progname, PQerrorMessage(conn));
        disconnect_and_exit(1);
    }
-   if (PQntuples(res) != 1)
+   if (PQntuples(res) != 1 || PQnfields(res) < 2)
    {
-       fprintf(stderr, _("%s: no start point returned from server\n"),
-               progname);
+       fprintf(stderr,
+               _("%s: server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields\n"),
+               progname, PQntuples(res), PQnfields(res), 1, 2);
        disconnect_and_exit(1);
    }
+
    strcpy(xlogstart, PQgetvalue(res, 0, 0));
-   if (verbose && includewal)
-       fprintf(stderr, "transaction log start point: %s\n", xlogstart);
+   starttli = atoi(PQgetvalue(res, 0, 1));
+
    PQclear(res);
    MemSet(xlogend, 0, sizeof(xlogend));
 
+   if (verbose && includewal)
+       fprintf(stderr, _("transaction log start point: %s on timeline %u\n"),
+               xlogstart, starttli);
+
    /*
     * Get the header
     */
@@ -1358,7 +1363,7 @@ BaseBackup(void)
        if (verbose)
            fprintf(stderr, _("%s: starting background WAL receiver\n"),
                    progname);
-       StartLogStreamer(xlogstart, timeline, sysidentifier);
+       StartLogStreamer(xlogstart, starttli, sysidentifier);
    }
 
    /*
index 7f2db1946e7d4efb608f018e5aea00f920647ca2..33dbc50389b86d3cb3e467bcda8cc458d7d06911 100644 (file)
@@ -39,8 +39,7 @@ volatile bool time_to_abort = false;
 
 
 static void usage(void);
-static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos,
-                  uint32 currenttimeline);
+static XLogRecPtr FindStreamingStart(uint32 *tli);
 static void StreamLog();
 static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
               bool segment_finished);
@@ -70,14 +69,31 @@ usage(void)
 }
 
 static bool
-stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
+stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
 {
+   static uint32 prevtimeline = 0;
+   static XLogRecPtr prevpos = InvalidXLogRecPtr;
+
+   /* we assume that we get called once at the end of each segment */
    if (verbose && segment_finished)
        fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
-               progname,
-               (uint32) (segendpos >> 32), (uint32) segendpos,
+               progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
                timeline);
 
+   /*
+    * Note that we report the previous, not current, position here. That's
+    * the exact location where the timeline switch happend. After the switch,
+    * we restart streaming from the beginning of the segment, so xlogpos can
+    * smaller than prevpos if we just switched to new timeline.
+    */
+   if (prevtimeline != 0 && prevtimeline != timeline)
+       fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
+               progname, timeline,
+               (uint32) (prevpos >> 32), (uint32) prevpos);
+
+   prevtimeline = timeline;
+   prevpos = xlogpos;
+
    if (time_to_abort)
    {
        fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
@@ -88,20 +104,19 @@ stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 }
 
 /*
- * Determine starting location for streaming, based on:
- * 1. If there are existing xlog segments, start at the end of the last one
- *   that is complete (size matches XLogSegSize)
- * 2. If no valid xlog exists, start from the beginning of the current
- *   WAL segment.
+ * Determine starting location for streaming, based on any existing xlog
+ * segments in the directory. We start at the end of the last one that is
+ * complete (size matches XLogSegSize), on the timeline with highest ID.
+ *
+ * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
  */
 static XLogRecPtr
-FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+FindStreamingStart(uint32 *tli)
 {
    DIR        *dir;
    struct dirent *dirent;
-   int         i;
-   bool        b;
    XLogSegNo   high_segno = 0;
+   uint32      high_tli = 0;
 
    dir = opendir(basedir);
    if (dir == NULL)
@@ -120,26 +135,13 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
                    seg;
        XLogSegNo   segno;
 
-       if (strcmp(dirent->d_name, ".") == 0 ||
-           strcmp(dirent->d_name, "..") == 0)
-           continue;
-
-       /* xlog files are always 24 characters */
-       if (strlen(dirent->d_name) != 24)
-           continue;
-
-       /* Filenames are always made out of 0-9 and A-F */
-       b = false;
-       for (i = 0; i < 24; i++)
-       {
-           if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
-               !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
-           {
-               b = true;
-               break;
-           }
-       }
-       if (b)
+       /*
+        * Check if the filename looks like an xlog file, or a .partial file.
+        * Xlog files are always 24 characters, and .partial files are 32
+        * characters.
+        */
+       if (strlen(dirent->d_name) != 24 ||
+           !strspn(dirent->d_name, "0123456789ABCDEF") == 24)
            continue;
 
        /*
@@ -154,10 +156,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
        }
        segno = ((uint64) log) << 32 | seg;
 
-       /* Ignore any files that are for another timeline */
-       if (tli != currenttimeline)
-           continue;
-
        /* Check if this is a completed segment or not */
        snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
        if (stat(fullpath, &statbuf) != 0)
@@ -170,9 +168,10 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
        if (statbuf.st_size == XLOG_SEG_SIZE)
        {
            /* Completed segment */
-           if (segno > high_segno)
+           if (segno > high_segno || (segno == high_segno && tli > high_tli))
            {
                high_segno = segno;
+               high_tli = tli;
                continue;
            }
        }
@@ -199,10 +198,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
 
        XLogSegNoOffsetToRecPtr(high_segno, 0, high_ptr);
 
+       *tli = high_tli;
        return high_ptr;
    }
    else
-       return currentpos;
+       return InvalidXLogRecPtr;
 }
 
 /*
@@ -212,8 +212,10 @@ static void
 StreamLog(void)
 {
    PGresult   *res;
-   uint32      timeline;
    XLogRecPtr  startpos;
+   uint32      starttli;
+   XLogRecPtr  serverpos;
+   uint32      servertli;
    uint32      hi,
                lo;
 
@@ -243,7 +245,7 @@ StreamLog(void)
                progname, PQntuples(res), PQnfields(res), 1, 3);
        disconnect_and_exit(1);
    }
-   timeline = atoi(PQgetvalue(res, 0, 1));
+   servertli = atoi(PQgetvalue(res, 0, 1));
    if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
    {
        fprintf(stderr,
@@ -251,13 +253,18 @@ StreamLog(void)
                progname, PQgetvalue(res, 0, 2));
        disconnect_and_exit(1);
    }
-   startpos = ((uint64) hi) << 32 | lo;
+   serverpos = ((uint64) hi) << 32 | lo;
    PQclear(res);
 
    /*
     * Figure out where to start streaming.
     */
-   startpos = FindStreamingStart(startpos, timeline);
+   startpos = FindStreamingStart(&starttli);
+   if (startpos == InvalidXLogRecPtr)
+   {
+       startpos = serverpos;
+       starttli = servertli;
+   }
 
    /*
     * Always start streaming at the beginning of a segment
@@ -271,10 +278,10 @@ StreamLog(void)
        fprintf(stderr,
                _("%s: starting log streaming at %X/%X (timeline %u)\n"),
                progname, (uint32) (startpos >> 32), (uint32) startpos,
-               timeline);
+               starttli);
 
-   ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
-                     stop_streaming, standby_message_timeout, false);
+   ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
+                     stop_streaming, standby_message_timeout, ".partial");
 
    PQfinish(conn);
 }
index 88d0c136b07cbe5e155ee8d402584e401c177bb3..03e275cb5b6b02482368d18daf0d23f1269dab5e 100644 (file)
 #include "streamutil.h"
 
 
-/* fd for currently open WAL file */
+/* fd and filename for currently open WAL file */
 static int walfile = -1;
+static char    current_walfile_name[MAXPGPATH] = "";
+
+static bool HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
+                char *basedir, stream_stop_callback stream_stop,
+                int standby_message_timeout, char *partial_suffix,
+                XLogRecPtr *stoppos);
 
 /*
- * Open a new WAL file in the specified directory. Store the name
- * (not including the full directory) in namebuf. Assumes there is
- * enough room in this buffer...
+ * Open a new WAL file in the specified directory.
  *
- * The file will be padded to 16Mb with zeroes.
+ * The file will be padded to 16Mb with zeroes. The base filename (without
+ * partial_suffix) is stored in current_walfile_name.
  */
-static int
+static bool
 open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
-            char *namebuf)
+            char *partial_suffix)
 {
    int         f;
    char        fn[MAXPGPATH];
@@ -50,16 +55,17 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
    XLogSegNo   segno;
 
    XLByteToSeg(startpoint, segno);
-   XLogFileName(namebuf, timeline, segno);
+   XLogFileName(current_walfile_name, timeline, segno);
 
-   snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
+   snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
+            partial_suffix ? partial_suffix : "");
    f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
    if (f == -1)
    {
        fprintf(stderr,
                _("%s: could not open transaction log file \"%s\": %s\n"),
                progname, fn, strerror(errno));
-       return -1;
+       return false;
    }
 
    /*
@@ -72,17 +78,21 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
                _("%s: could not stat transaction log file \"%s\": %s\n"),
                progname, fn, strerror(errno));
        close(f);
-       return -1;
+       return false;
    }
    if (statbuf.st_size == XLogSegSize)
-       return f;               /* File is open and ready to use */
+   {
+       /* File is open and ready to use */
+       walfile = f;
+       return true;
+   }
    if (statbuf.st_size != 0)
    {
        fprintf(stderr,
                _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
                progname, fn, (int) statbuf.st_size, XLogSegSize);
        close(f);
-       return -1;
+       return false;
    }
 
    /* New, empty, file. So pad it to 16Mb with zeroes */
@@ -97,7 +107,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
            free(zerobuf);
            close(f);
            unlink(fn);
-           return -1;
+           return false;
        }
    }
    free(zerobuf);
@@ -108,42 +118,45 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
                _("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
                progname, fn, strerror(errno));
        close(f);
-       return -1;
+       return false;
    }
-   return f;
+   walfile = f;
+   return true;
 }
 
 /*
- * Close the current WAL file, and rename it to the correct filename if it's
- * complete.
- *
- * If segment_complete is true, rename the current WAL file even if we've not
- * completed writing the whole segment.
+ * Close the current WAL file (if open), and rename it to the correct
+ * filename if it's complete. On failure, prints an error message to stderr
+ * and returns false, otherwise returns true.
  */
 static bool
-close_walfile(char *basedir, char *walname, bool segment_complete)
+close_walfile(char *basedir, char *partial_suffix)
 {
-   off_t       currpos = lseek(walfile, 0, SEEK_CUR);
+   off_t       currpos;
+
+   if (walfile == -1)
+       return true;
 
+   currpos = lseek(walfile, 0, SEEK_CUR);
    if (currpos == -1)
    {
        fprintf(stderr,
             _("%s: could not determine seek position in file \"%s\": %s\n"),
-               progname, walname, strerror(errno));
+               progname, current_walfile_name, strerror(errno));
        return false;
    }
 
    if (fsync(walfile) != 0)
    {
        fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-               progname, walname, strerror(errno));
+               progname, current_walfile_name, strerror(errno));
        return false;
    }
 
    if (close(walfile) != 0)
    {
        fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-               progname, walname, strerror(errno));
+               progname, current_walfile_name, strerror(errno));
        walfile = -1;
        return false;
    }
@@ -153,24 +166,24 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
     * Rename the .partial file only if we've completed writing the whole
     * segment or segment_complete is true.
     */
-   if (currpos == XLOG_SEG_SIZE || segment_complete)
+   if (currpos == XLOG_SEG_SIZE && partial_suffix)
    {
        char        oldfn[MAXPGPATH];
        char        newfn[MAXPGPATH];
 
-       snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
-       snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
+       snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
+       snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
        if (rename(oldfn, newfn) != 0)
        {
            fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
-                   progname, walname, strerror(errno));
+                   progname, current_walfile_name, strerror(errno));
            return false;
        }
    }
-   else
+   else if (partial_suffix)
        fprintf(stderr,
-               _("%s: not renaming \"%s\", segment is not complete\n"),
-               progname, walname);
+               _("%s: not renaming \"%s%s\", segment is not complete\n"),
+               progname, current_walfile_name, partial_suffix);
 
    return true;
 }
@@ -233,6 +246,123 @@ localTimestampDifferenceExceeds(int64 start_time,
    return (diff >= msec * INT64CONST(1000));
 }
 
+/*
+ * Check if a timeline history file exists.
+ */
+static bool
+existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
+{
+   char        path[MAXPGPATH];
+   char        histfname[MAXFNAMELEN];
+   int         fd;
+
+   /*
+    * Timeline 1 never has a history file. We treat that as if it existed,
+    * since we never need to stream it.
+    */
+   if (tli == 1)
+       return true;
+
+   TLHistoryFileName(histfname, tli);
+
+   snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
+
+   fd = open(path, O_RDONLY | PG_BINARY, 0);
+   if (fd < 0)
+   {
+       if (errno != ENOENT)
+           fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s"),
+                   progname, path, strerror(errno));
+       return false;
+   }
+   else
+   {
+       close(fd);
+       return true;
+   }
+}
+
+static bool
+writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
+{
+   int         size = strlen(content);
+   char        path[MAXPGPATH];
+   char        tmppath[MAXPGPATH];
+   char        histfname[MAXFNAMELEN];
+   int         fd;
+
+   /*
+    * Check that the server's idea of how timeline history files should be
+    * named matches ours.
+    */
+   TLHistoryFileName(histfname, tli);
+   if (strcmp(histfname, filename) != 0)
+   {
+       fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s"),
+               progname, tli, filename);
+       return false;
+   }
+
+   /*
+    * Write into a temp file name.
+    */
+   snprintf(tmppath, MAXPGPATH,  "%s.tmp", path);
+
+   unlink(tmppath);
+
+   fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+   if (fd < 0)
+   {
+       fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s"),
+               progname, tmppath, strerror(errno));
+       return false;
+   }
+
+   errno = 0;
+   if ((int) write(fd, content, size) != size)
+   {
+       int         save_errno = errno;
+
+       /*
+        * If we fail to make the file, delete it to release disk space
+        */
+       unlink(tmppath);
+       errno = save_errno;
+
+       fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s"),
+               progname, tmppath, strerror(errno));
+       return false;
+   }
+
+   if (fsync(fd) != 0)
+   {
+       fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+               progname, tmppath, strerror(errno));
+       return false;
+   }
+
+   if (close(fd) != 0)
+   {
+       fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+               progname, tmppath, strerror(errno));
+       return false;
+   }
+
+   /*
+    * Now move the completed history file into place with its final name.
+    */
+
+   snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
+   if (rename(tmppath, path) < 0)
+   {
+       fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
+               progname, tmppath, path, strerror(errno));
+       return false;
+   }
+
+   return true;
+}
+
 /*
  * Converts an int64 to network byte order.
  */
@@ -314,7 +444,8 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
  * (by sending an extra IDENTIFY_SYSTEM command)
  *
  * All received segments will be written to the directory
- * specified by basedir.
+ * specified by basedir. This will also fetch any missing timeline history
+ * files.
  *
  * The stream_stop callback will be called every time data
  * is received, and whenever a segment is completed. If it returns
@@ -327,20 +458,22 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
  * This message will only contain the write location, and never
  * flush or replay.
  *
+ * If 'partial_suffix' is not NULL, files are initially created with the
+ * given suffix, and the suffix is removed once the file is finished. That
+ * allows you to tell the difference between partial and completed files,
+ * so that you can continue later where you left.
+ *
  * Note: The log position *must* be at a log segment start!
  */
 bool
 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                  char *sysidentifier, char *basedir,
                  stream_stop_callback stream_stop,
-                 int standby_message_timeout, bool rename_partial)
+                 int standby_message_timeout, char *partial_suffix)
 {
    char        query[128];
-   char        current_walfile_name[MAXPGPATH];
    PGresult   *res;
-   char       *copybuf = NULL;
-   int64       last_status = -1;
-   XLogRecPtr  blockpos = InvalidXLogRecPtr;
+   XLogRecPtr  stoppos;
 
    /*
     * The message format used in streaming replication changed in 9.3, so we
@@ -359,7 +492,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 
    if (sysidentifier != NULL)
    {
-       /* Validate system identifier and timeline hasn't changed */
+       /* Validate system identifier hasn't changed */
        res = PQexec(conn, "IDENTIFY_SYSTEM");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
@@ -385,33 +518,184 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            PQclear(res);
            return false;
        }
-       if (timeline != atoi(PQgetvalue(res, 0, 1)))
+       if (timeline > atoi(PQgetvalue(res, 0, 1)))
        {
            fprintf(stderr,
-                   _("%s: timeline does not match between base backup and streaming connection\n"),
-                   progname);
+                   _("%s: starting timeline %u is not present in the server\n"),
+                   progname, timeline);
            PQclear(res);
            return false;
        }
        PQclear(res);
    }
 
-   /* Initiate the replication stream at specified location */
-   snprintf(query, sizeof(query), "START_REPLICATION %X/%X",
-            (uint32) (startpos >> 32), (uint32) startpos);
-   res = PQexec(conn, query);
-   if (PQresultStatus(res) != PGRES_COPY_BOTH)
+   while (1)
    {
-       fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
-               progname, "START_REPLICATION", PQresultErrorMessage(res));
+       /*
+        * Fetch the timeline history file for this timeline, if we don't
+        * have it already.
+        */
+       if (!existsTimeLineHistoryFile(basedir, timeline))
+       {
+           snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
+           res = PQexec(conn, query);
+           if (PQresultStatus(res) != PGRES_TUPLES_OK)
+           {
+               /* FIXME: we might send it ok, but get an error */
+               fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                       progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
+               PQclear(res);
+               return false;
+           }
+
+           /*
+            * The response to TIMELINE_HISTORY is a single row result set
+            * with two fields: filename and content
+            */
+           if (PQnfields(res) != 2 || PQntuples(res) != 1)
+           {
+               fprintf(stderr,
+                       _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
+                   progname, PQntuples(res), PQnfields(res), 1, 2);
+           }
+
+           /* Write the history file to disk */
+           writeTimeLineHistoryFile(basedir, timeline,
+                                    PQgetvalue(res, 0, 0),
+                                    PQgetvalue(res, 0, 1));
+
+           PQclear(res);
+       }
+
+       /*
+        * Before we start streaming from the requested location, check
+        * if the callback tells us to stop here.
+        */
+       if (stream_stop(startpos, timeline, false))
+           return true;
+
+       /* Initiate the replication stream at specified location */
+       snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
+                (uint32) (startpos >> 32), (uint32) startpos,
+                timeline);
+       res = PQexec(conn, query);
+       if (PQresultStatus(res) != PGRES_COPY_BOTH)
+       {
+           fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                   progname, "START_REPLICATION", PQresultErrorMessage(res));
+           PQclear(res);
+           return false;
+       }
        PQclear(res);
-       return false;
+
+       /* Stream the WAL */
+       if (!HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
+                             standby_message_timeout, partial_suffix,
+                             &stoppos))
+           goto error;
+
+       /*
+        * Streaming finished.
+        *
+        * There are two possible reasons for that: a controlled shutdown,
+        * or we reached the end of the current timeline. In case of
+        * end-of-timeline, the server sends a result set after Copy has
+        * finished, containing the next timeline's ID. Read that, and
+        * restart streaming from the next timeline.
+        */
+
+       res = PQgetResult(conn);
+       if (PQresultStatus(res) == PGRES_TUPLES_OK)
+       {
+           /*
+            * End-of-timeline. Read the next timeline's ID.
+            */
+           uint32      newtimeline;
+
+           newtimeline = atoi(PQgetvalue(res, 0, 0));
+           PQclear(res);
+
+           if (newtimeline <= timeline)
+           {
+               /* shouldn't happen */
+               fprintf(stderr,
+                       "server reported unexpected next timeline %u, following timeline %u\n",
+                       newtimeline, timeline);
+               goto error;
+           }
+
+           /* Read the final result, which should be CommandComplete. */
+           res = PQgetResult(conn);
+           if (PQresultStatus(res) != PGRES_COMMAND_OK)
+           {
+               fprintf(stderr,
+                       _("%s: unexpected termination of replication stream: %s"),
+                       progname, PQresultErrorMessage(res));
+               goto error;
+           }
+           PQclear(res);
+
+           /*
+            * Loop back to start streaming from the new timeline.
+            * Always start streaming at the beginning of a segment.
+            */
+           timeline = newtimeline;
+           startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
+           continue;
+       }
+       else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+       {
+           /*
+            * End of replication (ie. controlled shut down of the server).
+            *
+            * Check if the callback thinks it's OK to stop here. If not,
+            * complain.
+            */
+           if (stream_stop(stoppos, timeline, false))
+               return true;
+           else
+           {
+               fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
+                       progname);
+               goto error;
+           }
+       }
+       else
+       {
+           /* Server returned an error. */
+           fprintf(stderr,
+                   _("%s: unexpected termination of replication stream: %s"),
+                   progname, PQresultErrorMessage(res));
+           goto error;
+       }
    }
-   PQclear(res);
 
-   /*
-    * Receive the actual xlog data
-    */
+error:
+   if (walfile != -1 && close(walfile) != 0)
+       fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+               progname, current_walfile_name, strerror(errno));
+   walfile = -1;
+   return false;
+}
+
+/*
+ * The main loop of ReceiveXLogStream. Handles the COPY stream after
+ * initiating streaming with the START_STREAMING command.
+ *
+ * If the COPY ends normally, returns true and sets *stoppos to the last
+ * byte written. On error, returns false.
+ */
+static bool
+HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
+                char *basedir, stream_stop_callback stream_stop,
+                int standby_message_timeout, char *partial_suffix,
+                XLogRecPtr *stoppos)
+{
+   char       *copybuf = NULL;
+   int64       last_status = -1;
+   XLogRecPtr  blockpos = startpos;
+   bool        still_sending = true;
+
    while (1)
    {
        int         r;
@@ -430,20 +714,27 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        /*
         * Check if we should continue streaming, or abort at this point.
         */
-       if (stream_stop && stream_stop(blockpos, timeline, false))
+       if (still_sending && stream_stop(blockpos, timeline, false))
        {
-           if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
-                                               rename_partial))
+           if (!close_walfile(basedir, partial_suffix))
+           {
                /* Potential error message is written by close_walfile */
                goto error;
-           return true;
+           }
+           if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+           {
+               fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                       progname, PQerrorMessage(conn));
+               goto error;
+           }
+           still_sending = false;
        }
 
        /*
         * Potentially send a status message to the master
         */
        now = localGetCurrentTimestamp();
-       if (standby_message_timeout > 0 &&
+       if (still_sending && standby_message_timeout > 0 &&
            localTimestampDifferenceExceeds(last_status, now,
                                            standby_message_timeout))
        {
@@ -457,9 +748,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        if (r == 0)
        {
            /*
-            * In async mode, and no data available. We block on reading but
-            * not more than the specified timeout, so that we can send a
-            * response back to the client.
+            * No data available. Wait for some to appear, but not longer
+            * than the specified timeout, so that we can ping the server.
             */
            fd_set      input_mask;
            struct timeval timeout;
@@ -467,7 +757,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 
            FD_ZERO(&input_mask);
            FD_SET(PQsocket(conn), &input_mask);
-           if (standby_message_timeout)
+           if (standby_message_timeout && still_sending)
            {
                int64       targettime;
                long        secs;
@@ -493,8 +783,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            {
                /*
                 * Got a timeout or signal. Continue the loop and either
-                * deliver a status packet to the server or just go back into
-                * blocking.
+                * deliver a status packet to the server or just go back
+                * into blocking.
                 */
                continue;
            }
@@ -515,8 +805,31 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            continue;
        }
        if (r == -1)
-           /* End of copy stream */
-           break;
+       {
+           /*
+            * The server closed its end of the copy stream. Close ours
+            * if we haven't done so already, and exit.
+            */
+           if (still_sending)
+           {
+               if (!close_walfile(basedir, partial_suffix))
+               {
+                   /* Error message written in close_walfile() */
+                   goto error;
+               }
+               if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+               {
+                   fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                           progname, PQerrorMessage(conn));
+                   goto error;
+               }
+               still_sending = false;
+           }
+           if (copybuf != NULL)
+               PQfreemem(copybuf);
+           *stoppos = blockpos;
+           return true;
+       }
        if (r == -2)
        {
            fprintf(stderr, _("%s: could not read COPY data: %s"),
@@ -548,174 +861,148 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            replyRequested = copybuf[pos];
 
            /* If the server requested an immediate reply, send one. */
-           if (replyRequested)
+           if (replyRequested && still_sending)
            {
                now = localGetCurrentTimestamp();
                if (!sendFeedback(conn, blockpos, now, false))
                    goto error;
                last_status = now;
            }
-           continue;
        }
-       else if (copybuf[0] != 'w')
-       {
-           fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-                   progname, copybuf[0]);
-           goto error;
-       }
-
-       /*
-        * Read the header of the XLogData message, enclosed in the CopyData
-        * message. We only need the WAL location field (dataStart), the rest
-        * of the header is ignored.
-        */
-       hdr_len = 1;    /* msgtype 'w' */
-       hdr_len += 8;   /* dataStart */
-       hdr_len += 8;   /* walEnd */
-       hdr_len += 8;   /* sendTime */
-       if (r < hdr_len + 1)
+       else if (copybuf[0] == 'w')
        {
-           fprintf(stderr, _("%s: streaming header too small: %d\n"),
-                   progname, r);
-           goto error;
-       }
-       blockpos = recvint64(&copybuf[1]);
-
-       /* Extract WAL location for this block */
-       xlogoff = blockpos % XLOG_SEG_SIZE;
+           /*
+            * Once we've decided we don't want to receive any more, just
+            * ignore any subsequent XLogData messages.
+            */
+           if (!still_sending)
+               continue;
 
-       /*
-        * Verify that the initial location in the stream matches where we
-        * think we are.
-        */
-       if (walfile == -1)
-       {
-           /* No file open yet */
-           if (xlogoff != 0)
-           {
-               fprintf(stderr,
-                       _("%s: received transaction log record for offset %u with no file open\n"),
-                       progname, xlogoff);
-               goto error;
-           }
-       }
-       else
-       {
-           /* More data in existing segment */
-           /* XXX: store seek value don't reseek all the time */
-           if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+           /*
+            * Read the header of the XLogData message, enclosed in the
+            * CopyData message. We only need the WAL location field
+            * (dataStart), the rest of the header is ignored.
+            */
+           hdr_len = 1;    /* msgtype 'w' */
+           hdr_len += 8;   /* dataStart */
+           hdr_len += 8;   /* walEnd */
+           hdr_len += 8;   /* sendTime */
+           if (r < hdr_len + 1)
            {
-               fprintf(stderr,
-                       _("%s: got WAL data offset %08x, expected %08x\n"),
-                       progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+               fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                       progname, r);
                goto error;
            }
-       }
-
-       bytes_left = r - hdr_len;
-       bytes_written = 0;
+           blockpos = recvint64(&copybuf[1]);
 
-       while (bytes_left)
-       {
-           int         bytes_to_write;
+           /* Extract WAL location for this block */
+           xlogoff = blockpos % XLOG_SEG_SIZE;
 
            /*
-            * If crossing a WAL boundary, only write up until we reach
-            * XLOG_SEG_SIZE.
+            * Verify that the initial location in the stream matches where
+            * we think we are.
             */
-           if (xlogoff + bytes_left > XLOG_SEG_SIZE)
-               bytes_to_write = XLOG_SEG_SIZE - xlogoff;
-           else
-               bytes_to_write = bytes_left;
-
            if (walfile == -1)
            {
-               walfile = open_walfile(blockpos, timeline,
-                                      basedir, current_walfile_name);
-               if (walfile == -1)
-                   /* Error logged by open_walfile */
+               /* No file open yet */
+               if (xlogoff != 0)
+               {
+                   fprintf(stderr,
+                           _("%s: received transaction log record for offset %u with no file open\n"),
+                           progname, xlogoff);
                    goto error;
+               }
            }
-
-           if (write(walfile,
-                     copybuf + hdr_len + bytes_written,
-                     bytes_to_write) != bytes_to_write)
+           else
            {
-               fprintf(stderr,
-                 _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
-                       progname, bytes_to_write, current_walfile_name,
-                       strerror(errno));
-               goto error;
+               /* More data in existing segment */
+               /* XXX: store seek value don't reseek all the time */
+               if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+               {
+                   fprintf(stderr,
+                           _("%s: got WAL data offset %08x, expected %08x\n"),
+                           progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+                   goto error;
+               }
            }
 
-           /* Write was successful, advance our position */
-           bytes_written += bytes_to_write;
-           bytes_left -= bytes_to_write;
-           blockpos += bytes_to_write;
-           xlogoff += bytes_to_write;
+           bytes_left = r - hdr_len;
+           bytes_written = 0;
 
-           /* Did we reach the end of a WAL segment? */
-           if (blockpos % XLOG_SEG_SIZE == 0)
+           while (bytes_left)
            {
-               if (!close_walfile(basedir, current_walfile_name, false))
-                   /* Error message written in close_walfile() */
-                   goto error;
+               int         bytes_to_write;
 
-               xlogoff = 0;
+               /*
+                * If crossing a WAL boundary, only write up until we reach
+                * XLOG_SEG_SIZE.
+                */
+               if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+                   bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+               else
+                   bytes_to_write = bytes_left;
 
-               if (stream_stop != NULL)
+               if (walfile == -1)
                {
-                   /*
-                    * Callback when the segment finished, and return if it
-                    * told us to.
-                    */
-                   if (stream_stop(blockpos, timeline, true))
-                       return true;
+                   if (!open_walfile(blockpos, timeline,
+                                     basedir, partial_suffix))
+                   {
+                       /* Error logged by open_walfile */
+                       goto error;
+                   }
                }
-           }
-       }
-       /* No more data left to write, start receiving next copy packet */
-   }
 
-   /*
-    * The only way to get out of the loop is if the server shut down the
-    * replication stream. If it's a controlled shutdown, the server will send
-    * a shutdown message, and we'll return the latest xlog location that has
-    * been streamed.
-    */
+               if (write(walfile,
+                         copybuf + hdr_len + bytes_written,
+                         bytes_to_write) != bytes_to_write)
+               {
+                   fprintf(stderr,
+                           _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
+                           progname, bytes_to_write, current_walfile_name,
+                           strerror(errno));
+                   goto error;
+               }
 
-   res = PQgetResult(conn);
-   if (PQresultStatus(res) != PGRES_COMMAND_OK)
-   {
-       fprintf(stderr,
-               _("%s: unexpected termination of replication stream: %s"),
-               progname, PQresultErrorMessage(res));
-       goto error;
-   }
-   PQclear(res);
+               /* Write was successful, advance our position */
+               bytes_written += bytes_to_write;
+               bytes_left -= bytes_to_write;
+               blockpos += bytes_to_write;
+               xlogoff += bytes_to_write;
 
-   /* Complain if we've not reached stop point yet */
-   if (stream_stop != NULL && !stream_stop(blockpos, timeline, false))
-   {
-       fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
-               progname);
-       goto error;
+               /* Did we reach the end of a WAL segment? */
+               if (blockpos % XLOG_SEG_SIZE == 0)
+               {
+                   if (!close_walfile(basedir, partial_suffix))
+                       /* Error message written in close_walfile() */
+                       goto error;
+
+                   xlogoff = 0;
+
+                   if (still_sending && stream_stop(blockpos, timeline, false))
+                   {
+                       if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+                       {
+                           fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                                   progname, PQerrorMessage(conn));
+                           goto error;
+                       }
+                       still_sending = false;
+                       break; /* ignore the rest of this XLogData packet */
+                   }
+               }
+           }
+           /* No more data left to write, receive next copy packet */
+       }
+       else
+       {
+           fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+                   progname, copybuf[0]);
+           goto error;
+       }
    }
 
-   if (copybuf != NULL)
-       PQfreemem(copybuf);
-   if (walfile != -1 && close(walfile) != 0)
-       fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-               progname, current_walfile_name, strerror(errno));
-   walfile = -1;
-   return true;
-
 error:
    if (copybuf != NULL)
        PQfreemem(copybuf);
-   if (walfile != -1 && close(walfile) != 0)
-       fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-               progname, current_walfile_name, strerror(errno));
-   walfile = -1;
    return false;
 }
index 7176a68beaab6d9f1d39d19a7e8cd35037e80b0c..53f31a78ecab4e41600901a738d640475d7324ee 100644 (file)
@@ -13,4 +13,4 @@ extern bool ReceiveXlogStream(PGconn *conn,
                  char *basedir,
                  stream_stop_callback stream_stop,
                  int standby_message_timeout,
-                 bool rename_partial);
+                 char *partial_suffix);
index dd16f97bd79aa423d47d68399140a733cccc8487..7d45fcad8a4379f101d98cc69f8348c85936dbb2 100644 (file)
@@ -37,6 +37,7 @@ extern void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 extern void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size);
 extern bool tliInHistory(TimeLineID tli, List *expectedTLIs);
 extern TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history);
-extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history);
+extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history,
+              TimeLineID *nextTLI);
 
 #endif   /* TIMELINE_H */
index 885b5fc0ad74396321762d53e255f0faec0897fd..72e324259645774004478bdf8759cd8236fbee6e 100644 (file)
@@ -317,8 +317,10 @@ extern void SetWalWriterSleeping(bool sleeping);
 /*
  * Starting/stopping a base backup
  */
-extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile);
-extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive);
+extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
+                  TimeLineID *starttli_p, char **labelfile);
+extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
+                 TimeLineID *stoptli_p);
 extern void do_pg_abort_backup(void);
 
 /* File path names (all relative to $PGDATA) */