Fix base backup streaming xlog from standby
authorMagnus Hagander <magnus@hagander.net>
Fri, 25 May 2012 09:36:22 +0000 (11:36 +0200)
committerMagnus Hagander <magnus@hagander.net>
Fri, 25 May 2012 09:36:22 +0000 (11:36 +0200)
When backing up from a standby server, the backup process
will not automatically switch xlog segment. So we must
accept a partially transferred xlog file in this case, but
rename it into position anyway.

In passing, merge the two callbacks for segment end and
stop stream into a single callback, since their implementations
were close to identical, and rename this callback to
reflect that it stops streaming rather than continues it.

Patch by Magnus Hagander, review by Fujii Masao

src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h

index 0289c4bc4f86dba89b08b1daa81ca5e3f4873d1b..6a2e557809a49161f091b16d8a83386849b5630d 100644 (file)
@@ -78,7 +78,7 @@ static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void BaseBackup(void);
 
-static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
 
 #ifdef HAVE_LIBZ
 static const char *
@@ -129,8 +129,7 @@ usage(void)
 
 
 /*
- * Called in the background process whenever a complete segment of WAL
- * has been received.
+ * Called in the background process every time data is received.
  * On Unix, we check to see if there is any data on our pipe
  * (which would mean we have a stop position), and if it is, check if
  * it is time to stop.
@@ -138,7 +137,7 @@ usage(void)
  * time to stop.
  */
 static bool
-segment_callback(XLogRecPtr segendpos, uint32 timeline)
+reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 {
    if (!has_xlogendptr)
    {
@@ -231,7 +230,7 @@ LogStreamerMain(logstreamer_param * param)
 {
    if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                           param->sysidentifier, param->xlogdir,
-                          segment_callback, NULL, standby_message_timeout))
+                          reached_end_position, standby_message_timeout, true))
 
        /*
         * Any errors will already have been reported in the function process,
index 2134c8729cc17aeb67abdd2eb260a0e37a02df06..01f20f372a4de32d9329ebaa249012f559581fd6 100644 (file)
@@ -43,7 +43,7 @@ volatile bool time_to_abort = false;
 static void usage(void);
 static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
 static void StreamLog();
-static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
 
 static void
 usage(void)
@@ -69,21 +69,12 @@ usage(void)
 }
 
 static bool
-segment_callback(XLogRecPtr segendpos, uint32 timeline)
+stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 {
-   if (verbose)
+   if (verbose && segment_finished)
        fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
                progname, segendpos.xlogid, segendpos.xrecoff, timeline);
 
-   /*
-    * Never abort from this - we handle all aborting in continue_streaming()
-    */
-   return false;
-}
-
-static bool
-continue_streaming(void)
-{
    if (time_to_abort)
    {
        fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
@@ -268,8 +259,8 @@ StreamLog(void)
                progname, startpos.xlogid, startpos.xrecoff, timeline);
 
    ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
-                     segment_callback, continue_streaming,
-                     standby_message_timeout);
+                     stop_streaming,
+                     standby_message_timeout, false);
 
    PQfinish(conn);
 }
index b0cf836968dcf63745d3c8979dfb0c4f0d4be2cb..efbc4ca653394a66c2ef4d06d0fe9529210fd8f9 100644 (file)
@@ -113,8 +113,14 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
    return f;
 }
 
+/*
+ * Close the current WAL file, and rename it to the correct filename if it's complete.
+ *
+ * If segment_complete is true, rename the current WAL file even if we've not
+ * completed writing the whole segment.
+ */
 static bool
-close_walfile(int walfile, char *basedir, char *walname)
+close_walfile(int walfile, char *basedir, char *walname, bool segment_complete)
 {
    off_t       currpos = lseek(walfile, 0, SEEK_CUR);
 
@@ -141,9 +147,9 @@ close_walfile(int walfile, char *basedir, char *walname)
 
    /*
     * Rename the .partial file only if we've completed writing the
-    * whole segment.
+    * whole segment or segment_complete is true.
     */
-   if (currpos == XLOG_SEG_SIZE)
+   if (currpos == XLOG_SEG_SIZE || segment_complete)
    {
        char        oldfn[MAXPGPATH];
        char        newfn[MAXPGPATH];
@@ -199,11 +205,10 @@ localGetCurrentTimestamp(void)
  * All received segments will be written to the directory
  * specified by basedir.
  *
- * The segment_finish callback will be called after each segment
- * has been finished, and the stream_continue callback will be
- * called every time data is received. If either of these callbacks
- * return true, the streaming will stop and the function
- * return. As long as they return false, streaming will continue
+ * The stream_stop callback will be called every time data
+ * is received, and whenever a segment is completed. If it returns
+ * true, the streaming will stop and the function
+ * return. As long as it returns false, streaming will continue
  * indefinitely.
  *
  * standby_message_timeout controls how often we send a message
@@ -214,7 +219,7 @@ localGetCurrentTimestamp(void)
  * Note: The log position *must* be at a log segment start!
  */
 bool
-ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, bool rename_partial)
 {
    char        query[128];
    char        current_walfile_name[MAXPGPATH];
@@ -288,11 +293,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
        /*
         * Check if we should continue streaming, or abort at this point.
         */
-       if (stream_continue && stream_continue())
+       if (stream_stop && stream_stop(blockpos, timeline, false))
        {
            if (walfile != -1)
                /* Potential error message is written by close_walfile */
-               return close_walfile(walfile, basedir, current_walfile_name);
+               return close_walfile(walfile, basedir, current_walfile_name, rename_partial);
            return true;
        }
 
@@ -486,20 +491,20 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
            /* Did we reach the end of a WAL segment? */
            if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
            {
-               if (!close_walfile(walfile, basedir, current_walfile_name))
+               if (!close_walfile(walfile, basedir, current_walfile_name, false))
                    /* Error message written in close_walfile() */
                    return false;
 
                walfile = -1;
                xlogoff = 0;
 
-               if (segment_finish != NULL)
+               if (stream_stop != NULL)
                {
                    /*
                     * Callback when the segment finished, and return if it
                     * told us to.
                     */
-                   if (segment_finish(blockpos, timeline))
+                   if (stream_stop(blockpos, timeline, true))
                        return true;
                }
            }
index 1c61ea8ac1d7b54ebebd4474e1f2e4e316d9c9dc..0a803ee4ac1381acddfe248e4f64ddbea18db5fa 100644 (file)
@@ -1,22 +1,16 @@
 #include "access/xlogdefs.h"
 
 /*
- * Called whenever a segment is finished, return true to stop
- * the streaming at this point.
+ * Called before trying to read more data or when a segment is
+ * finished. Return true to stop streaming.
  */
-typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
-
-/*
- * Called before trying to read more data. Return true to stop
- * the streaming at this point.
- */
-typedef bool (*stream_continue_callback)(void);
+typedef bool (*stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
 
 extern bool ReceiveXlogStream(PGconn *conn,
                              XLogRecPtr startpos,
                              uint32 timeline,
                              char *sysidentifier,
                              char *basedir,
-                             segment_finish_callback segment_finish,
-                             stream_continue_callback stream_continue,
-                             int standby_message_timeout);
+                             stream_stop_callback stream_stop,
+                             int standby_message_timeout,
+                             bool rename_partial);