Fix pg_basebackup for keepalive messages
authorMagnus Hagander <magnus@hagander.net>
Mon, 9 Jan 2012 10:53:38 +0000 (11:53 +0100)
committerMagnus Hagander <magnus@hagander.net>
Mon, 9 Jan 2012 17:07:19 +0000 (18:07 +0100)
Teach pg_basebackup in streaming mode to deal with keepalive messages.
Also change the order of checks to complain at the message rather than
block size when a new message is introduced.

In passing, switch to using sizeof() instead of hardcoded sizes for
WAL protocol structs.

src/bin/pg_basebackup/receivelog.c

index c18db4fd042cc7096acabc357419b95d035f3f57..e3a0e92d363930017034c8d7744bb6ab02df3fc0 100644 (file)
@@ -33,8 +33,9 @@
 #include <unistd.h>
 
 
-/* Size of the streaming replication protocol header */
-#define STREAMING_HEADER_SIZE (1+8+8+8)
+/* Size of the streaming replication protocol headers */
+#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader))
+#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage))
 
 const XLogRecPtr InvalidXLogRecPtr = {0, 0};
 
@@ -374,18 +375,33 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
                    progname, PQerrorMessage(conn));
            return false;
        }
-       if (r < STREAMING_HEADER_SIZE + 1)
+       if (copybuf[0] == 'k')
        {
-           fprintf(stderr, _("%s: streaming header too small: %i\n"),
-                   progname, r);
-           return false;
+           /*
+            * keepalive message, sent in 9.2 and newer. We just ignore
+            * this message completely, but need to forward past it
+            * in our reading.
+            */
+           if (r != STREAMING_KEEPALIVE_SIZE)
+           {
+               fprintf(stderr, _("%s: keepalive message is incorrect size: %i\n"),
+                       progname, r);
+               return false;
+           }
+           continue;
        }
-       if (copybuf[0] != 'w')
+       else if (copybuf[0] != 'w')
        {
            fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
                    progname, copybuf[0]);
            return false;
        }
+       if (r < STREAMING_HEADER_SIZE + 1)
+       {
+           fprintf(stderr, _("%s: streaming header too small: %i\n"),
+                   progname, r);
+           return false;
+       }
 
        /* Extract WAL location for this block */
        memcpy(&blockpos, copybuf + 1, 8);