summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMagnus Hagander2012-06-10 10:12:36 +0000
committerMagnus Hagander2012-06-10 10:12:36 +0000
commita0b4c5a20a5e5076225ba440a921f4b46f1159ee (patch)
treedc8fbe6215e8710c599b75ca755db946574188b2 /src
parent7c1abc00fa1f469a2805e88abac0fa8b094b4427 (diff)
Fix pg_basebackup/pg_receivexlog for floating point timestamps
Since the replication protocol deals with TimestampTz, we need to care for the floating point case as well in the frontend tools. Fujii Masao, with changes from Magnus Hagander
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c4
-rw-r--r--src/bin/pg_basebackup/pg_receivexlog.c4
-rw-r--r--src/bin/pg_basebackup/receivelog.c59
3 files changed, 60 insertions, 7 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index d7466168d78..c3a0d89897a 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -46,7 +46,7 @@ int compresslevel = 0;
bool includewal = false;
bool streamwal = false;
bool fastcheckpoint = false;
-int standby_message_timeout = 10; /* 10 sec = default */
+int standby_message_timeout = 10 * 1000; /* 10 sec = default */
/* Progress counters */
static uint64 totalsize;
@@ -1311,7 +1311,7 @@ main(int argc, char **argv)
dbgetpassword = 1;
break;
case 's':
- standby_message_timeout = atoi(optarg);
+ standby_message_timeout = atoi(optarg) * 1000;
if (standby_message_timeout < 0)
{
fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 084ddc4a8cb..67a70bcf713 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -40,7 +40,7 @@
char *basedir = NULL;
int verbose = 0;
int noloop = 0;
-int standby_message_timeout = 10; /* 10 sec = default */
+int standby_message_timeout = 10 * 1000; /* 10 sec = default */
volatile bool time_to_abort = false;
@@ -356,7 +356,7 @@ main(int argc, char **argv)
dbgetpassword = 1;
break;
case 's':
- standby_message_timeout = atoi(optarg);
+ standby_message_timeout = atoi(optarg) * 1000;
if (standby_message_timeout < 0)
{
fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 66397718523..a51a40edfd1 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -23,6 +23,7 @@
#include "access/xlog_internal.h"
#include "replication/walprotocol.h"
#include "utils/datetime.h"
+#include "utils/timestamp.h"
#include "receivelog.h"
#include "streamutil.h"
@@ -196,6 +197,51 @@ localGetCurrentTimestamp(void)
}
/*
+ * Local version of TimestampDifference(), since we are not
+ * linked with backend code.
+ */
+static void
+localTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
+ long *secs, int *microsecs)
+{
+ TimestampTz diff = stop_time - start_time;
+
+ if (diff <= 0)
+ {
+ *secs = 0;
+ *microsecs = 0;
+ }
+ else
+ {
+#ifdef HAVE_INT64_TIMESTAMP
+ *secs = (long) (diff / USECS_PER_SEC);
+ *microsecs = (int) (diff % USECS_PER_SEC);
+#else
+ *secs = (long) diff;
+ *microsecs = (int) ((diff - *secs) * 1000000.0);
+#endif
+ }
+}
+
+/*
+ * Local version of TimestampDifferenceExceeds(), since we are not
+ * linked with backend code.
+ */
+static bool
+localTimestampDifferenceExceeds(TimestampTz start_time,
+ TimestampTz stop_time,
+ int msec)
+{
+ TimestampTz diff = stop_time - start_time;
+
+#ifdef HAVE_INT64_TIMESTAMP
+ return (diff >= msec * INT64CONST(1000));
+#else
+ return (diff * 1000.0 >= msec);
+#endif
+}
+
+/*
* Receive a log stream starting at the specified position.
*
* If sysidentifier is specified, validate that both the system
@@ -306,7 +352,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
*/
now = localGetCurrentTimestamp();
if (standby_message_timeout > 0 &&
- last_status < now - standby_message_timeout * 1000000)
+ localTimestampDifferenceExceeds(last_status, now,
+ standby_message_timeout))
{
/* Time to send feedback! */
char replybuf[sizeof(StandbyReplyMessage) + 1];
@@ -345,10 +392,16 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
FD_SET(PQsocket(conn), &input_mask);
if (standby_message_timeout)
{
- timeout.tv_sec = last_status + standby_message_timeout - now - 1;
+ TimestampTz targettime;
+
+ targettime = TimestampTzPlusMilliseconds(last_status,
+ standby_message_timeout - 1);
+ localTimestampDifference(now,
+ targettime,
+ &timeout.tv_sec,
+ (int *)&timeout.tv_usec);
if (timeout.tv_sec <= 0)
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
- timeout.tv_usec = 0;
timeoutptr = &timeout;
}
else