static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void);
-static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
#ifdef HAVE_LIBZ
static const char *
/*
- * Called in the background process whenever a complete segment of WAL
- * has been received.
+ * Called in the background process every time data is received.
* On Unix, we check to see if there is any data on our pipe
* (which would mean we have a stop position), and if it is, check if
* it is time to stop.
* time to stop.
*/
static bool
-segment_callback(XLogRecPtr segendpos, uint32 timeline)
+reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
{
if (!has_xlogendptr)
{
{
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
- segment_callback, NULL, standby_message_timeout))
+ reached_end_position, standby_message_timeout, true))
/*
* Any errors will already have been reported in the function process,
static void usage(void);
static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
static void StreamLog();
-static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
static void
usage(void)
}
static bool
-segment_callback(XLogRecPtr segendpos, uint32 timeline)
+stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
{
- if (verbose)
+ if (verbose && segment_finished)
fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
progname, segendpos.xlogid, segendpos.xrecoff, timeline);
- /*
- * Never abort from this - we handle all aborting in continue_streaming()
- */
- return false;
-}
-
-static bool
-continue_streaming(void)
-{
if (time_to_abort)
{
fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
progname, startpos.xlogid, startpos.xrecoff, timeline);
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
- segment_callback, continue_streaming,
- standby_message_timeout);
+ stop_streaming,
+ standby_message_timeout, false);
PQfinish(conn);
}
return f;
}
+/*
+ * Close the current WAL file, and rename it to the correct filename if it's complete.
+ *
+ * If segment_complete is true, rename the current WAL file even if we've not
+ * completed writing the whole segment.
+ */
static bool
-close_walfile(int walfile, char *basedir, char *walname)
+close_walfile(int walfile, char *basedir, char *walname, bool segment_complete)
{
off_t currpos = lseek(walfile, 0, SEEK_CUR);
/*
* Rename the .partial file only if we've completed writing the
- * whole segment.
+ * whole segment or segment_complete is true.
*/
- if (currpos == XLOG_SEG_SIZE)
+ if (currpos == XLOG_SEG_SIZE || segment_complete)
{
char oldfn[MAXPGPATH];
char newfn[MAXPGPATH];
* All received segments will be written to the directory
* specified by basedir.
*
- * The segment_finish callback will be called after each segment
- * has been finished, and the stream_continue callback will be
- * called every time data is received. If either of these callbacks
- * return true, the streaming will stop and the function
- * return. As long as they return false, streaming will continue
+ * The stream_stop callback will be called every time data
+ * is received, and whenever a segment is completed. If it returns
+ * true, the streaming will stop and the function
+ * return. As long as it returns false, streaming will continue
* indefinitely.
*
* standby_message_timeout controls how often we send a message
* Note: The log position *must* be at a log segment start!
*/
bool
-ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout)
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, bool rename_partial)
{
char query[128];
char current_walfile_name[MAXPGPATH];
/*
* Check if we should continue streaming, or abort at this point.
*/
- if (stream_continue && stream_continue())
+ if (stream_stop && stream_stop(blockpos, timeline, false))
{
if (walfile != -1)
/* Potential error message is written by close_walfile */
- return close_walfile(walfile, basedir, current_walfile_name);
+ return close_walfile(walfile, basedir, current_walfile_name, rename_partial);
return true;
}
/* Did we reach the end of a WAL segment? */
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(walfile, basedir, current_walfile_name))
+ if (!close_walfile(walfile, basedir, current_walfile_name, false))
/* Error message written in close_walfile() */
return false;
walfile = -1;
xlogoff = 0;
- if (segment_finish != NULL)
+ if (stream_stop != NULL)
{
/*
* Callback when the segment finished, and return if it
* told us to.
*/
- if (segment_finish(blockpos, timeline))
+ if (stream_stop(blockpos, timeline, true))
return true;
}
}
#include "access/xlogdefs.h"
/*
- * Called whenever a segment is finished, return true to stop
- * the streaming at this point.
+ * Called before trying to read more data or when a segment is
+ * finished. Return true to stop streaming.
*/
-typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
-
-/*
- * Called before trying to read more data. Return true to stop
- * the streaming at this point.
- */
-typedef bool (*stream_continue_callback)(void);
+typedef bool (*stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
extern bool ReceiveXlogStream(PGconn *conn,
XLogRecPtr startpos,
uint32 timeline,
char *sysidentifier,
char *basedir,
- segment_finish_callback segment_finish,
- stream_continue_callback stream_continue,
- int standby_message_timeout);
+ stream_stop_callback stream_stop,
+ int standby_message_timeout,
+ bool rename_partial);