Teach pg_basebackup and pg_receivexlog to reply to server keepalives.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 8 Nov 2012 08:25:58 +0000 (10:25 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 8 Nov 2012 08:28:52 +0000 (10:28 +0200)
Without this, the connection will be killed after timeout if
wal_sender_timeout is set in the server.

Original patch by Amit Kapila, modified by me to fit recent changes in the
code.

doc/src/sgml/ref/pg_basebackup.sgml
doc/src/sgml/ref/pg_receivexlog.sgml
src/bin/pg_basebackup/receivelog.c

index a951d6b0f043961b998b1c76851b9f6f3e6e22b0..0bc3ca27b168da03794e653fa9127dfde0220807 100644 (file)
@@ -377,10 +377,10 @@ PostgreSQL documentation
       <listitem>
        <para>
         Specifies the number of seconds between status packets sent back to the
-        server. This is required when streaming the transaction log (using
-        <literal>--xlog=stream</literal>) if replication timeout is configured
-        on the server, and allows for easier monitoring. A value of zero disables
-        the status updates completely. The default value is 10 seconds.
+        server. This allows for easier monitoring of the progress from server.
+        A value of zero disables the periodic status updates completely,
+        although an update will still be sent when requested by the server, to
+        avoid timeout disconnect. The default value is 10 seconds.
        </para>
       </listitem>
      </varlistentry>
index 7f62fd9e6156133e238d9c01a7efbcba78738114..d06dd1f171c6ec3c8f2b524166b3f08ee8b0b2e2 100644 (file)
@@ -155,9 +155,10 @@ PostgreSQL documentation
       <listitem>
        <para>
         Specifies the number of seconds between status packets sent back to the
-        server. This is required if replication timeout is configured on the
-        server, and allows for easier monitoring. A value of zero disables the
-        status updates completely. The default value is 10 seconds.
+        server. This allows for easier monitoring of the progress from server.
+        A value of zero disables the periodic status updates completely,
+        although an update will still be sent when requested by the server, to
+        avoid timeout disconnect. The default value is 10 seconds.
        </para>
       </listitem>
      </varlistentry>
index f653650ffb14c2dd28a42146a571b41b728a5224..de82ff54d8e762215624c80bf59565feb60004fe 100644 (file)
@@ -287,7 +287,7 @@ recvint64(char *buf)
  * Send a Standby Status Update message to server.
  */
 static bool
-sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
+sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 {
        char            replybuf[1 + 8 + 8 + 8 + 8 + 1];
        int             len = 0;
@@ -302,7 +302,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
        len += 8;
        sendint64(now, &replybuf[len]);                                 /* sendTime */
        len += 8;
-       replybuf[len] = 0;                                                              /* replyRequested */
+       replybuf[len] = replyRequested ? 1 : 0;                 /* replyRequested */
        len += 1;
 
        if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
@@ -413,6 +413,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                int                     bytes_left;
                int                     bytes_written;
                int64           now;
+               int                     hdr_len;
 
                if (copybuf != NULL)
                {
@@ -441,7 +442,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                                                                                        standby_message_timeout))
                {
                        /* Time to send feedback! */
-                       if (!sendFeedback(conn, blockpos, now))
+                       if (!sendFeedback(conn, blockpos, now, false))
                                goto error;
                        last_status = now;
                }
@@ -520,10 +521,34 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                /* Check the message type. */
                if (copybuf[0] == 'k')
                {
+                       int             pos;
+                       bool    replyRequested;
+
                        /*
-                        * keepalive message, sent in 9.2 and newer. We just ignore this
-                        * message completely, but need to skip past it in the stream.
+                        * Parse the keepalive message, enclosed in the CopyData message.
+                        * We just check if the server requested a reply, and ignore the
+                        * rest.
                         */
+                       pos = 1;        /* skip msgtype 'k' */
+                       pos += 8;       /* skip walEnd */
+                       pos += 8;       /* skip sendTime */
+
+                       if (r < pos + 1)
+                       {
+                               fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                                               progname, r);
+                               goto error;
+                       }
+                       replyRequested = copybuf[pos];
+
+                       /* If the server requested an immediate reply, send one. */
+                       if (replyRequested)
+                       {
+                               now = localGetCurrentTimestamp();
+                               if (!sendFeedback(conn, blockpos, now, false))
+                                       goto error;
+                               last_status = now;
+                       }
                        continue;
                }
                else if (copybuf[0] != 'w')
@@ -538,8 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 * message. We only need the WAL location field (dataStart), the rest
                 * of the header is ignored.
                 */
-#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */)
-               if (r < STREAMING_HEADER_SIZE + 1)
+               hdr_len = 1;    /* msgtype 'w' */
+               hdr_len += 8;   /* dataStart */
+               hdr_len += 8;   /* walEnd */
+               hdr_len += 8;   /* sendTime */
+               if (r < hdr_len + 1)
                {
                        fprintf(stderr, _("%s: streaming header too small: %d\n"),
                                        progname, r);
@@ -578,7 +606,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        }
                }
 
-               bytes_left = r - STREAMING_HEADER_SIZE;
+               bytes_left = r - hdr_len;
                bytes_written = 0;
 
                while (bytes_left)
@@ -604,7 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        }
 
                        if (write(walfile,
-                                         copybuf + STREAMING_HEADER_SIZE + bytes_written,
+                                         copybuf + hdr_len + bytes_written,
                                          bytes_to_write) != bytes_to_write)
                        {
                                fprintf(stderr,