Refactor pg_receivexlog main loop code, for readability, take 2.
authorFujii Masao <fujii@postgresql.org>
Wed, 6 Aug 2014 11:58:13 +0000 (20:58 +0900)
committerFujii Masao <fujii@postgresql.org>
Wed, 6 Aug 2014 11:58:13 +0000 (20:58 +0900)
Previously the source codes for processing the received data and handling
the end of stream were included in pg_receivexlog main loop. This commit
splits out them as separate functions. This is useful for improving the
readability of main loop code and making the future pg_receivexlog-related
patch simpler.

src/bin/pg_basebackup/receivelog.c

index a260881517de995190e3fccedd8d5d7f322d2770..d28e13b4d8c9f3b9433aee886e2c8596a12ffa29 100644 (file)
@@ -31,12 +31,23 @@ static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
+static bool still_sending = true;      /* feedback still needs to be sent? */
+
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
                 uint32 timeline, char *basedir,
               stream_stop_callback stream_stop, int standby_message_timeout,
                 char *partial_suffix, XLogRecPtr *stoppos);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
+static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+                               XLogRecPtr blockpos, int64 *last_status);
+static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+                              XLogRecPtr *blockpos, uint32 timeline,
+                              char *basedir, stream_stop_callback stream_stop,
+                              char *partial_suffix);
+static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+                                      XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+                                      XLogRecPtr *stoppos);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                         uint32 *timeline);
@@ -740,16 +751,13 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
    char       *copybuf = NULL;
    int64       last_status = -1;
    XLogRecPtr  blockpos = startpos;
-   bool        still_sending = true;
+
+   still_sending = true;
 
    while (1)
    {
        int         r;
-       int         xlogoff;
-       int         bytes_left;
-       int         bytes_written;
        int64       now;
-       int         hdr_len;
        long        sleeptime;
 
        /*
@@ -818,198 +826,26 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            goto error;
        if (r == -2)
        {
-           PGresult   *res = PQgetResult(conn);
-
-           /*
-            * The server closed its end of the copy stream.  If we haven't
-            * closed ours already, we need to do so now, unless the server
-            * threw an error, in which case we don't.
-            */
-           if (still_sending)
-           {
-               if (!close_walfile(basedir, partial_suffix, blockpos))
-               {
-                   /* Error message written in close_walfile() */
-                   PQclear(res);
-                   goto error;
-               }
-               if (PQresultStatus(res) == PGRES_COPY_IN)
-               {
-                   if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
-                   {
-                       fprintf(stderr,
-                               _("%s: could not send copy-end packet: %s"),
-                               progname, PQerrorMessage(conn));
-                       PQclear(res);
-                       goto error;
-                   }
-                   PQclear(res);
-                   res = PQgetResult(conn);
-               }
-               still_sending = false;
-           }
-           if (copybuf != NULL)
-               PQfreemem(copybuf);
-           copybuf = NULL;
-           *stoppos = blockpos;
-           return res;
+           PGresult    *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+                                                    basedir, partial_suffix, stoppos);
+           if (res == NULL)
+               goto error;
+           else
+               return res;
        }
 
        /* Check the message type. */
        if (copybuf[0] == 'k')
        {
-           int         pos;
-           bool        replyRequested;
-
-           /*
-            * Parse the keepalive message, enclosed in the CopyData message.
-            * We just check if the server requested a reply, and ignore the
-            * rest.
-            */
-           pos = 1;            /* skip msgtype 'k' */
-           pos += 8;           /* skip walEnd */
-           pos += 8;           /* skip sendTime */
-
-           if (r < pos + 1)
-           {
-               fprintf(stderr, _("%s: streaming header too small: %d\n"),
-                       progname, r);
+           if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+                                    &last_status))
                goto error;
-           }
-           replyRequested = copybuf[pos];
-
-           /* If the server requested an immediate reply, send one. */
-           if (replyRequested && still_sending)
-           {
-               now = feGetCurrentTimestamp();
-               if (!sendFeedback(conn, blockpos, now, false))
-                   goto error;
-               last_status = now;
-           }
        }
        else if (copybuf[0] == 'w')
        {
-           /*
-            * Once we've decided we don't want to receive any more, just
-            * ignore any subsequent XLogData messages.
-            */
-           if (!still_sending)
-               continue;
-
-           /*
-            * Read the header of the XLogData message, enclosed in the
-            * CopyData message. We only need the WAL location field
-            * (dataStart), the rest of the header is ignored.
-            */
-           hdr_len = 1;        /* msgtype 'w' */
-           hdr_len += 8;       /* dataStart */
-           hdr_len += 8;       /* walEnd */
-           hdr_len += 8;       /* sendTime */
-           if (r < hdr_len)
-           {
-               fprintf(stderr, _("%s: streaming header too small: %d\n"),
-                       progname, r);
+           if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+                                   timeline, basedir, stream_stop, partial_suffix))
                goto error;
-           }
-           blockpos = fe_recvint64(&copybuf[1]);
-
-           /* Extract WAL location for this block */
-           xlogoff = blockpos % XLOG_SEG_SIZE;
-
-           /*
-            * Verify that the initial location in the stream matches where we
-            * think we are.
-            */
-           if (walfile == -1)
-           {
-               /* No file open yet */
-               if (xlogoff != 0)
-               {
-                   fprintf(stderr,
-                           _("%s: received transaction log record for offset %u with no file open\n"),
-                           progname, xlogoff);
-                   goto error;
-               }
-           }
-           else
-           {
-               /* More data in existing segment */
-               /* XXX: store seek value don't reseek all the time */
-               if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
-               {
-                   fprintf(stderr,
-                         _("%s: got WAL data offset %08x, expected %08x\n"),
-                      progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
-                   goto error;
-               }
-           }
-
-           bytes_left = r - hdr_len;
-           bytes_written = 0;
-
-           while (bytes_left)
-           {
-               int         bytes_to_write;
-
-               /*
-                * If crossing a WAL boundary, only write up until we reach
-                * XLOG_SEG_SIZE.
-                */
-               if (xlogoff + bytes_left > XLOG_SEG_SIZE)
-                   bytes_to_write = XLOG_SEG_SIZE - xlogoff;
-               else
-                   bytes_to_write = bytes_left;
-
-               if (walfile == -1)
-               {
-                   if (!open_walfile(blockpos, timeline,
-                                     basedir, partial_suffix))
-                   {
-                       /* Error logged by open_walfile */
-                       goto error;
-                   }
-               }
-
-               if (write(walfile,
-                         copybuf + hdr_len + bytes_written,
-                         bytes_to_write) != bytes_to_write)
-               {
-                   fprintf(stderr,
-                           _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
-                           progname, bytes_to_write, current_walfile_name,
-                           strerror(errno));
-                   goto error;
-               }
-
-               /* Write was successful, advance our position */
-               bytes_written += bytes_to_write;
-               bytes_left -= bytes_to_write;
-               blockpos += bytes_to_write;
-               xlogoff += bytes_to_write;
-
-               /* Did we reach the end of a WAL segment? */
-               if (blockpos % XLOG_SEG_SIZE == 0)
-               {
-                   if (!close_walfile(basedir, partial_suffix, blockpos))
-                       /* Error message written in close_walfile() */
-                       goto error;
-
-                   xlogoff = 0;
-
-                   if (still_sending && stream_stop(blockpos, timeline, true))
-                   {
-                       if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
-                       {
-                           fprintf(stderr, _("%s: could not send copy-end packet: %s"),
-                                   progname, PQerrorMessage(conn));
-                           goto error;
-                       }
-                       still_sending = false;
-                       break;  /* ignore the rest of this XLogData packet */
-                   }
-               }
-           }
-           /* No more data left to write, receive next copy packet */
        }
        else
        {
@@ -1135,3 +971,225 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
    *buffer = copybuf;
    return rawlen;
 }
+
+/*
+ * Process the keepalive message.
+ */
+static bool
+ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+                   XLogRecPtr blockpos, int64 *last_status)
+{
+   int         pos;
+   bool        replyRequested;
+   int64       now;
+
+   /*
+    * Parse the keepalive message, enclosed in the CopyData message.
+    * We just check if the server requested a reply, and ignore the
+    * rest.
+    */
+   pos = 1;            /* skip msgtype 'k' */
+   pos += 8;           /* skip walEnd */
+   pos += 8;           /* skip sendTime */
+
+   if (len < pos + 1)
+   {
+       fprintf(stderr, _("%s: streaming header too small: %d\n"),
+               progname, len);
+       return false;
+   }
+   replyRequested = copybuf[pos];
+
+   /* If the server requested an immediate reply, send one. */
+   if (replyRequested && still_sending)
+   {
+       now = feGetCurrentTimestamp();
+       if (!sendFeedback(conn, blockpos, now, false))
+           return false;
+       *last_status = now;
+   }
+
+   return true;
+}
+
+/*
+ * Process XLogData message.
+ */
+static bool
+ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
+                  XLogRecPtr *blockpos, uint32 timeline,
+                  char *basedir, stream_stop_callback stream_stop,
+                  char *partial_suffix)
+{
+   int         xlogoff;
+   int         bytes_left;
+   int         bytes_written;
+   int         hdr_len;
+
+   /*
+    * Once we've decided we don't want to receive any more, just
+    * ignore any subsequent XLogData messages.
+    */
+   if (!(still_sending))
+       return true;
+
+   /*
+    * Read the header of the XLogData message, enclosed in the
+    * CopyData message. We only need the WAL location field
+    * (dataStart), the rest of the header is ignored.
+    */
+   hdr_len = 1;        /* msgtype 'w' */
+   hdr_len += 8;       /* dataStart */
+   hdr_len += 8;       /* walEnd */
+   hdr_len += 8;       /* sendTime */
+   if (len < hdr_len)
+   {
+       fprintf(stderr, _("%s: streaming header too small: %d\n"),
+               progname, len);
+       return false;
+   }
+   *blockpos = fe_recvint64(&copybuf[1]);
+
+   /* Extract WAL location for this block */
+   xlogoff = *blockpos % XLOG_SEG_SIZE;
+
+   /*
+    * Verify that the initial location in the stream matches where we
+    * think we are.
+    */
+   if (walfile == -1)
+   {
+       /* No file open yet */
+       if (xlogoff != 0)
+       {
+           fprintf(stderr,
+                   _("%s: received transaction log record for offset %u with no file open\n"),
+                   progname, xlogoff);
+           return false;
+       }
+   }
+   else
+   {
+       /* More data in existing segment */
+       /* XXX: store seek value don't reseek all the time */
+       if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+       {
+           fprintf(stderr,
+                   _("%s: got WAL data offset %08x, expected %08x\n"),
+                   progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+           return false;
+       }
+   }
+
+   bytes_left = len - hdr_len;
+   bytes_written = 0;
+
+   while (bytes_left)
+   {
+       int         bytes_to_write;
+
+       /*
+        * If crossing a WAL boundary, only write up until we reach
+        * XLOG_SEG_SIZE.
+        */
+       if (xlogoff + bytes_left > XLOG_SEG_SIZE)
+           bytes_to_write = XLOG_SEG_SIZE - xlogoff;
+       else
+           bytes_to_write = bytes_left;
+
+       if (walfile == -1)
+       {
+           if (!open_walfile(*blockpos, timeline,
+                             basedir, partial_suffix))
+           {
+               /* Error logged by open_walfile */
+               return false;
+           }
+       }
+
+       if (write(walfile,
+                 copybuf + hdr_len + bytes_written,
+                 bytes_to_write) != bytes_to_write)
+       {
+           fprintf(stderr,
+                   _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
+                   progname, bytes_to_write, current_walfile_name,
+                   strerror(errno));
+           return false;
+       }
+
+       /* Write was successful, advance our position */
+       bytes_written += bytes_to_write;
+       bytes_left -= bytes_to_write;
+       *blockpos += bytes_to_write;
+       xlogoff += bytes_to_write;
+
+       /* Did we reach the end of a WAL segment? */
+       if (*blockpos % XLOG_SEG_SIZE == 0)
+       {
+           if (!close_walfile(basedir, partial_suffix, *blockpos))
+               /* Error message written in close_walfile() */
+               return false;
+
+           xlogoff = 0;
+
+           if (still_sending && stream_stop(*blockpos, timeline, true))
+           {
+               if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+               {
+                   fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+                           progname, PQerrorMessage(conn));
+                   return false;
+               }
+               still_sending = false;
+               return true;    /* ignore the rest of this XLogData packet */
+           }
+       }
+   }
+   /* No more data left to write, receive next copy packet */
+
+   return true;
+}
+
+/*
+ * Handle end of the copy stream.
+ */
+static PGresult *
+HandleEndOfCopyStream(PGconn *conn, char *copybuf,
+                     XLogRecPtr blockpos, char *basedir, char *partial_suffix,
+                     XLogRecPtr *stoppos)
+{
+   PGresult   *res = PQgetResult(conn);
+
+   /*
+    * The server closed its end of the copy stream.  If we haven't
+    * closed ours already, we need to do so now, unless the server
+    * threw an error, in which case we don't.
+    */
+   if (still_sending)
+   {
+       if (!close_walfile(basedir, partial_suffix, blockpos))
+       {
+           /* Error message written in close_walfile() */
+           PQclear(res);
+           return NULL;
+       }
+       if (PQresultStatus(res) == PGRES_COPY_IN)
+       {
+           if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+           {
+               fprintf(stderr,
+                       _("%s: could not send copy-end packet: %s"),
+                       progname, PQerrorMessage(conn));
+               PQclear(res);
+               return NULL;
+           }
+           res = PQgetResult(conn);
+       }
+       still_sending = false;
+   }
+   if (copybuf != NULL)
+       PQfreemem(copybuf);
+   *stoppos = blockpos;
+   return res;
+}