Teach pg_basebackup and pg_receivexlog to reply to server keepalives.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 8 Nov 2012 08:25:58 +0000 (10:25 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 8 Nov 2012 08:28:52 +0000 (10:28 +0200)
Without this, the connection will be killed after timeout if
wal_sender_timeout is set in the server.

Original patch by Amit Kapila, modified by me to fit recent changes in the
code.

doc/src/sgml/ref/pg_basebackup.sgml
doc/src/sgml/ref/pg_receivexlog.sgml
src/bin/pg_basebackup/receivelog.c

index a951d6b0f043961b998b1c76851b9f6f3e6e22b0..0bc3ca27b168da03794e653fa9127dfde0220807 100644 (file)
@@ -377,10 +377,10 @@ PostgreSQL documentation
       <listitem>
        <para>
         Specifies the number of seconds between status packets sent back to the
-        server. This is required when streaming the transaction log (using
-        <literal>--xlog=stream</literal>) if replication timeout is configured
-        on the server, and allows for easier monitoring. A value of zero disables
-        the status updates completely. The default value is 10 seconds.
+        server. This allows for easier monitoring of the progress from server.
+        A value of zero disables the periodic status updates completely,
+        although an update will still be sent when requested by the server, to
+        avoid timeout disconnect. The default value is 10 seconds.
        </para>
       </listitem>
      </varlistentry>
index 7f62fd9e6156133e238d9c01a7efbcba78738114..d06dd1f171c6ec3c8f2b524166b3f08ee8b0b2e2 100644 (file)
@@ -155,9 +155,10 @@ PostgreSQL documentation
       <listitem>
        <para>
         Specifies the number of seconds between status packets sent back to the
-        server. This is required if replication timeout is configured on the
-        server, and allows for easier monitoring. A value of zero disables the
-        status updates completely. The default value is 10 seconds.
+        server. This allows for easier monitoring of the progress from server.
+        A value of zero disables the periodic status updates completely,
+        although an update will still be sent when requested by the server, to
+        avoid timeout disconnect. The default value is 10 seconds.
        </para>
       </listitem>
      </varlistentry>
index f653650ffb14c2dd28a42146a571b41b728a5224..de82ff54d8e762215624c80bf59565feb60004fe 100644 (file)
@@ -287,7 +287,7 @@ recvint64(char *buf)
  * Send a Standby Status Update message to server.
  */
 static bool
-sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
+sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 {
    char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
    int         len = 0;
@@ -302,7 +302,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
    len += 8;
    sendint64(now, &replybuf[len]);                 /* sendTime */
    len += 8;
-   replybuf[len] = 0;                              /* replyRequested */
+   replybuf[len] = replyRequested ? 1 : 0;         /* replyRequested */
    len += 1;
 
    if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
@@ -413,6 +413,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        int         bytes_left;
        int         bytes_written;
        int64       now;
+       int         hdr_len;
 
        if (copybuf != NULL)
        {
@@ -441,7 +442,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                            standby_message_timeout))
        {
            /* Time to send feedback! */
-           if (!sendFeedback(conn, blockpos, now))
+           if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }
@@ -520,10 +521,34 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        /* Check the message type. */
        if (copybuf[0] == 'k')
        {
+           int     pos;
+           bool    replyRequested;
+
            /*
-            * keepalive message, sent in 9.2 and newer. We just ignore this
-            * message completely, but need to skip past it in the stream.
+            * Parse the keepalive message, enclosed in the CopyData message.
+            * We just check if the server requested a reply, and ignore the
+            * rest.
             */
+           pos = 1;    /* skip msgtype 'k' */
+           pos += 8;   /* skip walEnd */
+           pos += 8;   /* skip sendTime */
+
+           if (r < pos + 1)
+           {
+               fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                       progname, r);
+               goto error;
+           }
+           replyRequested = copybuf[pos];
+
+           /* If the server requested an immediate reply, send one. */
+           if (replyRequested)
+           {
+               now = localGetCurrentTimestamp();
+               if (!sendFeedback(conn, blockpos, now, false))
+                   goto error;
+               last_status = now;
+           }
            continue;
        }
        else if (copybuf[0] != 'w')
@@ -538,8 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
         * message. We only need the WAL location field (dataStart), the rest
         * of the header is ignored.
         */
-#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */)
-       if (r < STREAMING_HEADER_SIZE + 1)
+       hdr_len = 1;    /* msgtype 'w' */
+       hdr_len += 8;   /* dataStart */
+       hdr_len += 8;   /* walEnd */
+       hdr_len += 8;   /* sendTime */
+       if (r < hdr_len + 1)
        {
            fprintf(stderr, _("%s: streaming header too small: %d\n"),
                    progname, r);
@@ -578,7 +606,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            }
        }
 
-       bytes_left = r - STREAMING_HEADER_SIZE;
+       bytes_left = r - hdr_len;
        bytes_written = 0;
 
        while (bytes_left)
@@ -604,7 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            }
 
            if (write(walfile,
-                     copybuf + STREAMING_HEADER_SIZE + bytes_written,
+                     copybuf + hdr_len + bytes_written,
                      bytes_to_write) != bytes_to_write)
            {
                fprintf(stderr,