Add pg_recvlogical, a tool to receive data logical decoding data.
authorRobert Haas <rhaas@postgresql.org>
Tue, 18 Mar 2014 16:19:57 +0000 (12:19 -0400)
committerRobert Haas <rhaas@postgresql.org>
Tue, 18 Mar 2014 16:25:14 +0000 (12:25 -0400)
This is fairly basic at the moment, but it's at least useful for
testing and debugging, and possibly more.

Andres Freund

src/bin/pg_basebackup/.gitignore
src/bin/pg_basebackup/Makefile
src/bin/pg_basebackup/nls.mk
src/bin/pg_basebackup/pg_recvlogical.c [new file with mode: 0644]
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h
src/bin/pg_basebackup/streamutil.c
src/bin/pg_basebackup/streamutil.h

index 1334a1f77b1a606df4f7d0a781d27602207372e6..7abea15a3f9a08908c7308dc5a6fa7aaed18272a 100644 (file)
@@ -1,2 +1,3 @@
 /pg_basebackup
 /pg_receivexlog
+/pg_recvlogical
index 17c91af124036b1d0ebab1453c1867825bf05909..346560eeab1cca5f2e66b2921e952013393c6d5f 100644 (file)
@@ -20,7 +20,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 
 OBJS=receivelog.o streamutil.o $(WIN32RES)
 
-all: pg_basebackup pg_receivexlog
+all: pg_basebackup pg_receivexlog pg_recvlogical
 
 pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
    $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
@@ -28,9 +28,13 @@ pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
 pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
    $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
+pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport
+   $(CC) $(CFLAGS) pg_recvlogical.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
 install: all installdirs
    $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
    $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
+   $(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
 
 installdirs:
    $(MKDIR_P) '$(DESTDIR)$(bindir)'
@@ -38,6 +42,9 @@ installdirs:
 uninstall:
    rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
    rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
+   rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
 
 clean distclean maintainer-clean:
-   rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
+   rm -f pg_basebackup$(X) pg_receivexlog$(X) pg_recvlogical$(X) \
+       pg_basebackup.o pg_receivexlog.o pg_recvlogical.o \
+       $(OBJS)
index e1c96dd4c49937e36c65a742e2ed78666fec2b87..29df4bcdb39698773aa9db750cdf13eab564fe4c 100644 (file)
@@ -1,4 +1,4 @@
 # src/bin/pg_basebackup/nls.mk
 CATALOG_NAME     = pg_basebackup
 AVAIL_LANGUAGES  = cs de es fr it ja pl pt_BR ru zh_CN
-GETTEXT_FILES    = pg_basebackup.c pg_receivexlog.c receivelog.c streamutil.c ../../common/fe_memutils.c
+GETTEXT_FILES    = pg_basebackup.c pg_receivexlog.c pg_recvlogical.c receivelog.c streamutil.c ../../common/fe_memutils.c
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
new file mode 100644 (file)
index 0000000..a631cee
--- /dev/null
@@ -0,0 +1,978 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_recvlogical.c - receive data from a logical decoding slot in a streaming fashion
+ *                   and write it to to a local file.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/bin/pg_basebackup/pg_recvlogical.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+/* local includes */
+#include "streamutil.h"
+
+#include "access/xlog_internal.h"
+#include "common/fe_memutils.h"
+#include "getopt_long.h"
+#include "libpq-fe.h"
+#include "libpq/pqsignal.h"
+#include "pqexpbuffer.h"
+
+
+/* Time to sleep between reconnection attempts */
+#define RECONNECT_SLEEP_TIME 5
+
+/* Global Options */
+static char    *outfile = NULL;
+static int     verbose = 0;
+static int     noloop = 0;
+static int     standby_message_timeout = 10 * 1000;        /* 10 sec = default */
+static int     fsync_interval = 10 * 1000;     /* 10 sec = default */
+static XLogRecPtr startpos = InvalidXLogRecPtr;
+static bool        do_create_slot = false;
+static bool        do_start_slot = false;
+static bool        do_drop_slot = false;
+
+/* filled pairwise with option, value. value may be NULL */
+static char      **options;
+static size_t  noptions = 0;
+static const char *plugin = "test_decoding";
+
+/* Global State */
+static int     outfd = -1;
+static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t output_reopen = false;
+static int64   output_last_fsync = -1;
+static bool        output_unsynced = false;
+static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
+static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
+
+static void usage(void);
+static void StreamLog();
+static void disconnect_and_exit(int code);
+
+static void
+usage(void)
+{
+   printf(_("%s receives PostgreSQL logical change stream.\n\n"),
+          progname);
+   printf(_("Usage:\n"));
+   printf(_("  %s [OPTION]...\n"), progname);
+   printf(_("\nOptions:\n"));
+   printf(_("  -f, --file=FILE        receive log into this file. - for stdout\n"));
+   printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+   printf(_("  -v, --verbose          output verbose messages\n"));
+   printf(_("  -V, --version          output version information, then exit\n"));
+   printf(_("  -?, --help             show this help, then exit\n"));
+   printf(_("\nConnection options:\n"));
+   printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
+   printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
+   printf(_("  -p, --port=PORT        database server port number\n"));
+   printf(_("  -U, --username=NAME    connect as specified database user\n"));
+   printf(_("  -w, --no-password      never prompt for password\n"));
+   printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
+   printf(_("\nReplication options:\n"));
+   printf(_("  -F  --fsync-interval=INTERVAL\n"
+            "                         frequency of syncs to the output file (in seconds, defaults to 10)\n"));
+   printf(_("  -o, --option=NAME[=VALUE]\n"
+            "                         Specify option NAME with optional value VAL, to be passed\n"
+            "                         to the output plugin\n"));
+   printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (defaults to test_decoding)\n"));
+   printf(_("  -s, --status-interval=INTERVAL\n"
+            "                         time between status packets sent to server (in seconds, defaults to 10)\n"));
+   printf(_("  -S, --slot=SLOT        use existing replication slot SLOT instead of starting a new one\n"));
+   printf(_("  -I, --startpos=PTR     Where in an existing slot should the streaming start\n"));
+   printf(_("\nAction to be performed:\n"));
+   printf(_("      --create           create a new replication slot (for the slotname see --slot)\n"));
+   printf(_("      --start            start streaming in a replication slot (for the slotname see --slot)\n"));
+   printf(_("      --drop             drop the replication slot (for the slotname see --slot)\n"));
+   printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
+
+/*
+ * Send a Standby Status Update message to server.
+ */
+static bool
+sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
+{
+   static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
+   static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
+
+   char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
+   int         len = 0;
+
+   /*
+    * we normally don't want to send superflous feedbacks, but if it's
+    * because of a timeout we need to, otherwise wal_sender_timeout will
+    * kill us.
+    */
+   if (!force &&
+       last_written_lsn == output_written_lsn &&
+       last_fsync_lsn != output_fsync_lsn)
+       return true;
+
+   if (verbose)
+       fprintf(stderr,
+               _("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
+               progname,
+               (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
+               (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
+               replication_slot);
+
+   replybuf[len] = 'r';
+   len += 1;
+   fe_sendint64(output_written_lsn, &replybuf[len]);       /* write */
+   len += 8;
+   fe_sendint64(output_fsync_lsn, &replybuf[len]);     /* flush */
+   len += 8;
+   fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);        /* apply */
+   len += 8;
+   fe_sendint64(now, &replybuf[len]);          /* sendTime */
+   len += 8;
+   replybuf[len] = replyRequested ? 1 : 0;     /* replyRequested */
+   len += 1;
+
+   startpos = output_written_lsn;
+   last_written_lsn = output_written_lsn;
+   last_fsync_lsn = output_fsync_lsn;
+
+   if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
+   {
+       fprintf(stderr, _("%s: could not send feedback packet: %s"),
+               progname, PQerrorMessage(conn));
+       return false;
+   }
+
+   return true;
+}
+
+static void
+disconnect_and_exit(int code)
+{
+   if (conn != NULL)
+       PQfinish(conn);
+
+   exit(code);
+}
+
+static bool
+OutputFsync(int64 now)
+{
+   output_last_fsync = now;
+
+   output_fsync_lsn = output_written_lsn;
+
+   if (fsync_interval <= 0)
+       return true;
+
+   if (!output_unsynced)
+       return true;
+
+   output_unsynced = false;
+
+   /* Accept EINVAL, in case output is writing to a pipe or similar. */
+   if (fsync(outfd) != 0 && errno != EINVAL)
+   {
+       fprintf(stderr,
+               _("%s: could not fsync log file \"%s\": %s\n"),
+               progname, outfile, strerror(errno));
+       return false;
+   }
+
+   return true;
+}
+
+/*
+ * Start the log streaming
+ */
+static void
+StreamLog(void)
+{
+   PGresult   *res;
+   char       *copybuf = NULL;
+   int64       last_status = -1;
+   int         i;
+   PQExpBuffer query;
+
+   output_written_lsn = InvalidXLogRecPtr;
+   output_fsync_lsn = InvalidXLogRecPtr;
+
+   query = createPQExpBuffer();
+
+   /*
+    * Connect in replication mode to the server
+    */
+   if (!conn)
+       conn = GetConnection();
+   if (!conn)
+       /* Error message already written in GetConnection() */
+       return;
+
+   /*
+    * Start the replication
+    */
+   if (verbose)
+       fprintf(stderr,
+               _("%s: starting log streaming at %X/%X (slot %s)\n"),
+               progname, (uint32) (startpos >> 32), (uint32) startpos,
+               replication_slot);
+
+   /* Initiate the replication stream at specified location */
+   appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
+                     replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
+
+   /* print options if there are any */
+   if (noptions)
+       appendPQExpBufferStr(query, " (");
+
+   for (i = 0; i < noptions; i++)
+   {
+       /* separator */
+       if (i > 0)
+           appendPQExpBufferStr(query, ", ");
+
+       /* write option name */
+       appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
+
+       /* write option value if specified */
+       if (options[(i * 2) + 1] != NULL)
+           appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
+   }
+
+   if (noptions)
+       appendPQExpBufferChar(query, ')');
+
+   res = PQexec(conn, query->data);
+   if (PQresultStatus(res) != PGRES_COPY_BOTH)
+   {
+       fprintf(stderr, _("%s: could not send replication command \"%s\": %s\n"),
+               progname, query->data, PQresultErrorMessage(res));
+       PQclear(res);
+       goto error;
+   }
+   PQclear(res);
+   resetPQExpBuffer(query);
+
+   if (verbose)
+       fprintf(stderr,
+               _("%s: initiated streaming\n"),
+               progname);
+
+   while (!time_to_abort)
+   {
+       int         r;
+       int         bytes_left;
+       int         bytes_written;
+       int64       now;
+       int         hdr_len;
+
+       if (copybuf != NULL)
+       {
+           PQfreemem(copybuf);
+           copybuf = NULL;
+       }
+
+       /*
+        * Potentially send a status message to the master
+        */
+       now = feGetCurrentTimestamp();
+
+       if (outfd != -1 &&
+           feTimestampDifferenceExceeds(output_last_fsync, now,
+                                        fsync_interval))
+       {
+           if (!OutputFsync(now))
+               goto error;
+       }
+
+       if (standby_message_timeout > 0 &&
+           feTimestampDifferenceExceeds(last_status, now,
+                                        standby_message_timeout))
+       {
+           /* Time to send feedback! */
+           if (!sendFeedback(conn, now, true, false))
+               goto error;
+
+           last_status = now;
+       }
+
+       r = PQgetCopyData(conn, &copybuf, 1);
+       if (r == 0)
+       {
+           /*
+            * In async mode, and no data available. We block on reading but
+            * not more than the specified timeout, so that we can send a
+            * response back to the client.
+            */
+           fd_set      input_mask;
+           int64       message_target = 0;
+           int64       fsync_target = 0;
+           struct timeval timeout;
+           struct timeval *timeoutptr;
+
+           FD_ZERO(&input_mask);
+           FD_SET(PQsocket(conn), &input_mask);
+
+           /* Compute when we need to wakeup to send a keepalive message. */
+           if (standby_message_timeout)
+               message_target = last_status + (standby_message_timeout - 1) *
+                   ((int64) 1000);
+
+           /* Compute when we need to wakeup to fsync the output file. */
+           if (fsync_interval > 0 && output_unsynced)
+               fsync_target = output_last_fsync + (fsync_interval - 1) *
+                   ((int64) 1000);
+
+           /* Now compute when to wakeup. */
+           if (message_target > 0 || fsync_target > 0)
+           {
+               int64       targettime;
+               long        secs;
+               int         usecs;
+
+               targettime = message_target;
+
+               if (fsync_target > 0 && fsync_target < targettime)
+                   targettime = fsync_target;
+
+               feTimestampDifference(now,
+                                     targettime,
+                                     &secs,
+                                     &usecs);
+               if (secs <= 0)
+                   timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+               else
+                   timeout.tv_sec = secs;
+               timeout.tv_usec = usecs;
+               timeoutptr = &timeout;
+           }
+
+           r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+           if (r == 0 || (r < 0 && errno == EINTR))
+           {
+               /*
+                * Got a timeout or signal. Continue the loop and either
+                * deliver a status packet to the server or just go back into
+                * blocking.
+                */
+               continue;
+           }
+           else if (r < 0)
+           {
+               fprintf(stderr, _("%s: select() failed: %s\n"),
+                       progname, strerror(errno));
+               goto error;
+           }
+
+           /* Else there is actually data on the socket */
+           if (PQconsumeInput(conn) == 0)
+           {
+               fprintf(stderr,
+                       _("%s: could not receive data from WAL stream: %s"),
+                       progname, PQerrorMessage(conn));
+               goto error;
+           }
+           continue;
+       }
+
+       /* End of copy stream */
+       if (r == -1)
+           break;
+
+       /* Failure while reading the copy stream */
+       if (r == -2)
+       {
+           fprintf(stderr, _("%s: could not read COPY data: %s"),
+                   progname, PQerrorMessage(conn));
+           goto error;
+       }
+
+       /* Check the message type. */
+       if (copybuf[0] == 'k')
+       {
+           int         pos;
+           bool        replyRequested;
+           XLogRecPtr  walEnd;
+
+           /*
+            * Parse the keepalive message, enclosed in the CopyData message.
+            * We just check if the server requested a reply, and ignore the
+            * rest.
+            */
+           pos = 1;            /* skip msgtype 'k' */
+           walEnd = fe_recvint64(&copybuf[pos]);
+           output_written_lsn = Max(walEnd, output_written_lsn);
+
+           pos += 8;           /* read walEnd */
+
+           pos += 8;           /* skip sendTime */
+
+           if (r < pos + 1)
+           {
+               fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                       progname, r);
+               goto error;
+           }
+           replyRequested = copybuf[pos];
+
+           /* If the server requested an immediate reply, send one. */
+           if (replyRequested)
+           {
+               /* fsync data, so we send a recent flush pointer */
+               if (!OutputFsync(now))
+                   goto error;
+
+               now = feGetCurrentTimestamp();
+               if (!sendFeedback(conn, now, true, false))
+                   goto error;
+               last_status = now;
+           }
+           continue;
+       }
+       else if (copybuf[0] != 'w')
+       {
+           fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+                   progname, copybuf[0]);
+           goto error;
+       }
+
+
+       /*
+        * Read the header of the XLogData message, enclosed in the CopyData
+        * message. We only need the WAL location field (dataStart), the rest
+        * of the header is ignored.
+        */
+       hdr_len = 1;            /* msgtype 'w' */
+       hdr_len += 8;           /* dataStart */
+       hdr_len += 8;           /* walEnd */
+       hdr_len += 8;           /* sendTime */
+       if (r < hdr_len + 1)
+       {
+           fprintf(stderr, _("%s: streaming header too small: %d\n"),
+                   progname, r);
+           goto error;
+       }
+
+       /* Extract WAL location for this block */
+       {
+           XLogRecPtr  temp = fe_recvint64(&copybuf[1]);
+
+           output_written_lsn = Max(temp, output_written_lsn);
+       }
+
+       /* redirect output to stdout */
+       if (outfd == -1 && strcmp(outfile, "-") == 0)
+       {
+           outfd = fileno(stdout);
+       }
+
+       /* got SIGHUP, close output file */
+       if (outfd != -1 && output_reopen)
+       {
+           now = feGetCurrentTimestamp();
+           if (!OutputFsync(now))
+               goto error;
+           close(outfd);
+           outfd = -1;
+           output_reopen = false;
+       }
+
+       if (outfd == -1)
+       {
+
+           outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
+                        S_IRUSR | S_IWUSR);
+           if (outfd == -1)
+           {
+               fprintf(stderr,
+                       _("%s: could not open log file \"%s\": %s\n"),
+                       progname, outfile, strerror(errno));
+               goto error;
+           }
+       }
+
+       bytes_left = r - hdr_len;
+       bytes_written = 0;
+
+       /* signal that a fsync is needed */
+       output_unsynced = true;
+
+       while (bytes_left)
+       {
+           int         ret;
+
+           ret = write(outfd,
+                       copybuf + hdr_len + bytes_written,
+                       bytes_left);
+
+           if (ret < 0)
+           {
+               fprintf(stderr,
+                 _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+                       progname, bytes_left, outfile,
+                       strerror(errno));
+               goto error;
+           }
+
+           /* Write was successful, advance our position */
+           bytes_written += ret;
+           bytes_left -= ret;
+       }
+
+       if (write(outfd, "\n", 1) != 1)
+       {
+           fprintf(stderr,
+                 _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+                   progname, 1, outfile,
+                   strerror(errno));
+           goto error;
+       }
+   }
+
+   res = PQgetResult(conn);
+   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+   {
+       fprintf(stderr,
+               _("%s: unexpected termination of replication stream: %s"),
+               progname, PQresultErrorMessage(res));
+       goto error;
+   }
+   PQclear(res);
+
+   if (copybuf != NULL)
+       PQfreemem(copybuf);
+
+   if (outfd != -1 && strcmp(outfile, "-") != 0)
+   {
+       int64 t = feGetCurrentTimestamp();
+
+       /* no need to jump to error on failure here, we're finishing anyway */
+       OutputFsync(t);
+
+       if (close(outfd) != 0)
+           fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+                   progname, outfile, strerror(errno));
+   }
+   outfd = -1;
+error:
+   destroyPQExpBuffer(query);
+   PQfinish(conn);
+   conn = NULL;
+}
+
+/*
+ * Unfortunately we can't do sensible signal handling on windows...
+ */
+#ifndef WIN32
+
+/*
+ * When sigint is called, just tell the system to exit at the next possible
+ * moment.
+ */
+static void
+sigint_handler(int signum)
+{
+   time_to_abort = true;
+}
+
+/*
+ * Trigger the output file to be reopened.
+ */
+static void
+sighup_handler(int signum)
+{
+   output_reopen = true;
+}
+#endif
+
+
+int
+main(int argc, char **argv)
+{
+   PGresult   *res;
+   static struct option long_options[] = {
+/* general options */
+       {"file", required_argument, NULL, 'f'},
+       {"no-loop", no_argument, NULL, 'n'},
+       {"verbose", no_argument, NULL, 'v'},
+       {"version", no_argument, NULL, 'V'},
+       {"help", no_argument, NULL, '?'},
+/* connnection options */
+       {"dbname", required_argument, NULL, 'd'},
+       {"host", required_argument, NULL, 'h'},
+       {"port", required_argument, NULL, 'p'},
+       {"username", required_argument, NULL, 'U'},
+       {"no-password", no_argument, NULL, 'w'},
+       {"password", no_argument, NULL, 'W'},
+/* replication options */
+       {"option", required_argument, NULL, 'o'},
+       {"plugin", required_argument, NULL, 'P'},
+       {"status-interval", required_argument, NULL, 's'},
+       {"fsync-interval", required_argument, NULL, 'F'},
+       {"slot", required_argument, NULL, 'S'},
+       {"startpos", required_argument, NULL, 'I'},
+/* action */
+       {"create", no_argument, NULL, 1},
+       {"start", no_argument, NULL, 2},
+       {"drop", no_argument, NULL, 3},
+       {NULL, 0, NULL, 0}
+   };
+   int         c;
+   int         option_index;
+   uint32      hi,
+               lo;
+
+   progname = get_progname(argv[0]);
+   set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
+
+   if (argc > 1)
+   {
+       if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+       {
+           usage();
+           exit(0);
+       }
+       else if (strcmp(argv[1], "-V") == 0 ||
+                strcmp(argv[1], "--version") == 0)
+       {
+           puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
+           exit(0);
+       }
+   }
+
+   while ((c = getopt_long(argc, argv, "f:F:nvd:h:o:p:U:wWP:s:S:",
+                           long_options, &option_index)) != -1)
+   {
+       switch (c)
+       {
+/* general options */
+           case 'f':
+               outfile = pg_strdup(optarg);
+               break;
+           case 'n':
+               noloop = 1;
+               break;
+           case 'v':
+               verbose++;
+               break;
+/* connnection options */
+           case 'd':
+               dbname = pg_strdup(optarg);
+               break;
+           case 'h':
+               dbhost = pg_strdup(optarg);
+               break;
+           case 'p':
+               if (atoi(optarg) <= 0)
+               {
+                   fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+                           progname, optarg);
+                   exit(1);
+               }
+               dbport = pg_strdup(optarg);
+               break;
+           case 'U':
+               dbuser = pg_strdup(optarg);
+               break;
+           case 'w':
+               dbgetpassword = -1;
+               break;
+           case 'W':
+               dbgetpassword = 1;
+               break;
+/* replication options */
+           case 'o':
+               {
+                   char *data = pg_strdup(optarg);
+                   char *val = strchr(data, '=');
+
+                   if (val != NULL)
+                   {
+                       /* remove =; separate data from val */
+                       *val = '\0';
+                       val++;
+                   }
+
+                   noptions += 1;
+                   options = pg_realloc(options, sizeof(char*) * noptions * 2);
+
+                   options[(noptions - 1) * 2] = data;
+                   options[(noptions - 1) * 2 + 1] = val;
+               }
+
+               break;
+           case 'P':
+               plugin = pg_strdup(optarg);
+               break;
+           case 's':
+               standby_message_timeout = atoi(optarg) * 1000;
+               if (standby_message_timeout < 0)
+               {
+                   fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
+                           progname, optarg);
+                   exit(1);
+               }
+               break;
+           case 'F':
+               fsync_interval = atoi(optarg) * 1000;
+               if (fsync_interval < 0)
+               {
+                   fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+                           progname, optarg);
+                   exit(1);
+               }
+               break;
+           case 'S':
+               replication_slot = pg_strdup(optarg);
+               break;
+           case 'I':
+               if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+               {
+                   fprintf(stderr,
+                           _("%s: could not parse start position \"%s\"\n"),
+                           progname, optarg);
+                   exit(1);
+               }
+               startpos = ((uint64) hi) << 32 | lo;
+               break;
+/* action */
+           case 1:
+               do_create_slot = true;
+               break;
+           case 2:
+               do_start_slot = true;
+               break;
+           case 3:
+               do_drop_slot = true;
+               break;
+
+           default:
+
+               /*
+                * getopt_long already emitted a complaint
+                */
+               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                       progname);
+               exit(1);
+       }
+   }
+
+   /*
+    * Any non-option arguments?
+    */
+   if (optind < argc)
+   {
+       fprintf(stderr,
+               _("%s: too many command-line arguments (first is \"%s\")\n"),
+               progname, argv[optind]);
+       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+               progname);
+       exit(1);
+   }
+
+   /*
+    * Required arguments
+    */
+   if (replication_slot == NULL)
+   {
+       fprintf(stderr, _("%s: no slot specified\n"), progname);
+       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+               progname);
+       exit(1);
+   }
+
+   if (do_start_slot && outfile == NULL)
+   {
+       fprintf(stderr, _("%s: no target file specified\n"), progname);
+       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+               progname);
+       exit(1);
+   }
+
+   if (!do_drop_slot && dbname == NULL)
+   {
+       fprintf(stderr, _("%s: no database specified\n"), progname);
+       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+               progname);
+       exit(1);
+   }
+
+   if (!do_drop_slot && !do_create_slot && !do_start_slot)
+   {
+       fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
+       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+               progname);
+       exit(1);
+   }
+
+   if (do_drop_slot && (do_create_slot || do_start_slot))
+   {
+       fprintf(stderr, _("%s: --stop cannot be combined with --init or --start\n"), progname);
+       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+               progname);
+       exit(1);
+   }
+
+   if (startpos && (do_create_slot || do_drop_slot))
+   {
+       fprintf(stderr, _("%s: --startpos cannot be combined with --init or --stop\n"), progname);
+       fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+               progname);
+       exit(1);
+   }
+
+#ifndef WIN32
+   pqsignal(SIGINT, sigint_handler);
+   pqsignal(SIGHUP, sighup_handler);
+#endif
+
+   /*
+    * don't really need this but it actually helps to get more precise error
+    * messages about authentication, required GUCs and such without starting
+    * to loop around connection attempts lateron.
+    */
+   {
+       conn = GetConnection();
+       if (!conn)
+           /* Error message already written in GetConnection() */
+           exit(1);
+
+       /*
+        * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+        * position.
+        */
+       res = PQexec(conn, "IDENTIFY_SYSTEM");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+           fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                   progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+           disconnect_and_exit(1);
+       }
+
+       if (PQntuples(res) != 1 || PQnfields(res) < 4)
+       {
+           fprintf(stderr,
+                   _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+                   progname, PQntuples(res), PQnfields(res), 1, 4);
+           disconnect_and_exit(1);
+       }
+       PQclear(res);
+   }
+
+
+   /*
+    * stop a replication slot
+    */
+   if (do_drop_slot)
+   {
+       char        query[256];
+
+       if (verbose)
+           fprintf(stderr,
+                   _("%s: freeing replication slot \"%s\"\n"),
+                   progname, replication_slot);
+
+       snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
+                replication_slot);
+       res = PQexec(conn, query);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+           fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                   progname, query, PQerrorMessage(conn));
+           disconnect_and_exit(1);
+       }
+
+       if (PQntuples(res) != 0 || PQnfields(res) != 0)
+       {
+           fprintf(stderr,
+                   _("%s: could not stop logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"),
+                   progname, PQntuples(res), PQnfields(res), 0, 0);
+           disconnect_and_exit(1);
+       }
+
+       PQclear(res);
+       disconnect_and_exit(0);
+   }
+
+   /*
+    * init a replication slot
+    */
+   if (do_create_slot)
+   {
+       char        query[256];
+
+       if (verbose)
+           fprintf(stderr,
+                   _("%s: initializing replication slot \"%s\"\n"),
+                   progname, replication_slot);
+
+       snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+                replication_slot, plugin);
+
+       res = PQexec(conn, query);
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+           fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+                   progname, query, PQerrorMessage(conn));
+           disconnect_and_exit(1);
+       }
+
+       if (PQntuples(res) != 1 || PQnfields(res) != 4)
+       {
+           fprintf(stderr,
+                   _("%s: could not init logical rep: got %d rows and %d fields, expected %d rows and %d fields\n"),
+                   progname, PQntuples(res), PQnfields(res), 1, 4);
+           disconnect_and_exit(1);
+       }
+
+       if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+       {
+           fprintf(stderr,
+                   _("%s: could not parse log location \"%s\"\n"),
+                   progname, PQgetvalue(res, 0, 1));
+           disconnect_and_exit(1);
+       }
+       startpos = ((uint64) hi) << 32 | lo;
+
+       replication_slot = strdup(PQgetvalue(res, 0, 0));
+       PQclear(res);
+   }
+
+
+   if (!do_start_slot)
+       disconnect_and_exit(0);
+
+   while (true)
+   {
+       StreamLog();
+       if (time_to_abort)
+       {
+           /*
+            * We've been Ctrl-C'ed. That's not an error, so exit without an
+            * errorcode.
+            */
+           disconnect_and_exit(0);
+       }
+       else if (noloop)
+       {
+           fprintf(stderr, _("%s: disconnected.\n"), progname);
+           exit(1);
+       }
+       else
+       {
+           fprintf(stderr,
+           /* translator: check source for value for %d */
+                   _("%s: disconnected. Waiting %d seconds to try again.\n"),
+                   progname, RECONNECT_SLEEP_TIME);
+           pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
+       }
+   }
+}
index febe3d1a2b7063917332d9008b9d27f442e55e52..55f3d7f367ec8f81f09bc34f5d5e4911797ac95c 100644 (file)
  *       src/bin/pg_basebackup/receivelog.c
  *-------------------------------------------------------------------------
  */
+
 #include "postgres_fe.h"
 
 #include <sys/stat.h>
-#include <sys/time.h>
-#include <sys/types.h>
 #include <unistd.h>
-/* for ntohl/htonl */
-#include <netinet/in.h>
-#include <arpa/inet.h>
-
-#include "libpq-fe.h"
-#include "access/xlog_internal.h"
 
+/* local includes */
 #include "receivelog.h"
 #include "streamutil.h"
 
+#include "libpq-fe.h"
+#include "access/xlog_internal.h"
+
 
 /* fd and filename for currently open WAL file */
 static int walfile = -1;
@@ -194,63 +191,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
 }
 
 
-/*
- * Local version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The protocol always uses integer timestamps, regardless of
- * server setting.
- */
-static int64
-localGetCurrentTimestamp(void)
-{
-   int64       result;
-   struct timeval tp;
-
-   gettimeofday(&tp, NULL);
-
-   result = (int64) tp.tv_sec -
-       ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-
-   result = (result * USECS_PER_SEC) + tp.tv_usec;
-
-   return result;
-}
-
-/*
- * Local version of TimestampDifference(), since we are not linked with
- * backend code.
- */
-static void
-localTimestampDifference(int64 start_time, int64 stop_time,
-                        long *secs, int *microsecs)
-{
-   int64       diff = stop_time - start_time;
-
-   if (diff <= 0)
-   {
-       *secs = 0;
-       *microsecs = 0;
-   }
-   else
-   {
-       *secs = (long) (diff / USECS_PER_SEC);
-       *microsecs = (int) (diff % USECS_PER_SEC);
-   }
-}
-
-/*
- * Local version of TimestampDifferenceExceeds(), since we are not
- * linked with backend code.
- */
-static bool
-localTimestampDifferenceExceeds(int64 start_time,
-                               int64 stop_time,
-                               int msec)
-{
-   int64       diff = stop_time - start_time;
-
-   return (diff >= msec * INT64CONST(1000));
-}
-
 /*
  * Check if a timeline history file exists.
  */
@@ -370,47 +310,6 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
    return true;
 }
 
-/*
- * Converts an int64 to network byte order.
- */
-static void
-sendint64(int64 i, char *buf)
-{
-   uint32      n32;
-
-   /* High order half first, since we're doing MSB-first */
-   n32 = (uint32) (i >> 32);
-   n32 = htonl(n32);
-   memcpy(&buf[0], &n32, 4);
-
-   /* Now the low order half */
-   n32 = (uint32) i;
-   n32 = htonl(n32);
-   memcpy(&buf[4], &n32, 4);
-}
-
-/*
- * Converts an int64 from network byte order to native format.
- */
-static int64
-recvint64(char *buf)
-{
-   int64       result;
-   uint32      h32;
-   uint32      l32;
-
-   memcpy(&h32, buf, 4);
-   memcpy(&l32, buf + 4, 4);
-   h32 = ntohl(h32);
-   l32 = ntohl(l32);
-
-   result = h32;
-   result <<= 32;
-   result |= l32;
-
-   return result;
-}
-
 /*
  * Send a Standby Status Update message to server.
  */
@@ -422,16 +321,16 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
 
    replybuf[len] = 'r';
    len += 1;
-   sendint64(blockpos, &replybuf[len]);        /* write */
+   fe_sendint64(blockpos, &replybuf[len]);     /* write */
    len += 8;
    if (reportFlushPosition)
-       sendint64(lastFlushPosition, &replybuf[len]);       /* flush */
+       fe_sendint64(lastFlushPosition, &replybuf[len]);        /* flush */
    else
-       sendint64(InvalidXLogRecPtr, &replybuf[len]);       /* flush */
+       fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);        /* flush */
    len += 8;
-   sendint64(InvalidXLogRecPtr, &replybuf[len]);       /* apply */
+   fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);        /* apply */
    len += 8;
-   sendint64(now, &replybuf[len]);     /* sendTime */
+   fe_sendint64(now, &replybuf[len]);      /* sendTime */
    len += 8;
    replybuf[len] = replyRequested ? 1 : 0;     /* replyRequested */
    len += 1;
@@ -864,9 +763,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        /*
         * Potentially send a status message to the master
         */
-       now = localGetCurrentTimestamp();
+       now = feGetCurrentTimestamp();
        if (still_sending && standby_message_timeout > 0 &&
-           localTimestampDifferenceExceeds(last_status, now,
+           feTimestampDifferenceExceeds(last_status, now,
                                            standby_message_timeout))
        {
            /* Time to send feedback! */
@@ -895,10 +794,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                int         usecs;
 
                targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-               localTimestampDifference(now,
-                                        targettime,
-                                        &secs,
-                                        &usecs);
+               feTimestampDifference(now,
+                                     targettime,
+                                     &secs,
+                                     &usecs);
                if (secs <= 0)
                    timeout.tv_sec = 1; /* Always sleep at least 1 sec */
                else
@@ -1002,7 +901,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            /* If the server requested an immediate reply, send one. */
            if (replyRequested && still_sending)
            {
-               now = localGetCurrentTimestamp();
+               now = feGetCurrentTimestamp();
                if (!sendFeedback(conn, blockpos, now, false))
                    goto error;
                last_status = now;
@@ -1032,7 +931,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                        progname, r);
                goto error;
            }
-           blockpos = recvint64(&copybuf[1]);
+           blockpos = fe_recvint64(&copybuf[1]);
 
            /* Extract WAL location for this block */
            xlogoff = blockpos % XLOG_SEG_SIZE;
index 7c983cd604a42a78d8a75ca4843439e3fa9344e7..f4789a580ae75df4b97fe40be89ec3fe4817ba70 100644 (file)
@@ -1,3 +1,5 @@
+#include "libpq-fe.h"
+
 #include "access/xlogdefs.h"
 
 /*
index 041076ff1d73976b9baf1f7a40dd3c87298d4a67..e440dc4e244d58e85a7136a4b2edefcc9738684d 100644 (file)
  */
 
 #include "postgres_fe.h"
-#include "streamutil.h"
 
 #include <stdio.h>
 #include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+/* for ntohl/htonl */
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+/* local includes */
+#include "receivelog.h"
+#include "streamutil.h"
+
+#include "common/fe_memutils.h"
+#include "datatype/timestamp.h"
 
 const char *progname;
 char      *connection_string = NULL;
@@ -23,6 +36,7 @@ char     *dbhost = NULL;
 char      *dbuser = NULL;
 char      *dbport = NULL;
 char      *replication_slot = NULL;
+char      *dbname = NULL;
 int            dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
 static char *dbpassword = NULL;
 PGconn    *conn = NULL;
@@ -87,10 +101,10 @@ GetConnection(void)
    }
 
    keywords[i] = "dbname";
-   values[i] = "replication";
+   values[i] = dbname == NULL ? "replication" : dbname;
    i++;
    keywords[i] = "replication";
-   values[i] = "true";
+   values[i] = dbname == NULL ? "true" : "database";
    i++;
    keywords[i] = "fallback_application_name";
    values[i] = progname;
@@ -212,3 +226,102 @@ GetConnection(void)
 
    return tmpconn;
 }
+
+
+/*
+ * Frontend version of GetCurrentTimestamp(), since we are not linked with
+ * backend code. The protocol always uses integer timestamps, regardless of
+ * server setting.
+ */
+int64
+feGetCurrentTimestamp(void)
+{
+   int64       result;
+   struct timeval tp;
+
+   gettimeofday(&tp, NULL);
+
+   result = (int64) tp.tv_sec -
+       ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+   result = (result * USECS_PER_SEC) + tp.tv_usec;
+
+   return result;
+}
+
+/*
+ * Frontend version of TimestampDifference(), since we are not linked with
+ * backend code.
+ */
+void
+feTimestampDifference(int64 start_time, int64 stop_time,
+                        long *secs, int *microsecs)
+{
+   int64       diff = stop_time - start_time;
+
+   if (diff <= 0)
+   {
+       *secs = 0;
+       *microsecs = 0;
+   }
+   else
+   {
+       *secs = (long) (diff / USECS_PER_SEC);
+       *microsecs = (int) (diff % USECS_PER_SEC);
+   }
+}
+
+/*
+ * Frontend version of TimestampDifferenceExceeds(), since we are not
+ * linked with backend code.
+ */
+bool
+feTimestampDifferenceExceeds(int64 start_time,
+                               int64 stop_time,
+                               int msec)
+{
+   int64       diff = stop_time - start_time;
+
+   return (diff >= msec * INT64CONST(1000));
+}
+
+/*
+ * Converts an int64 to network byte order.
+ */
+void
+fe_sendint64(int64 i, char *buf)
+{
+   uint32      n32;
+
+   /* High order half first, since we're doing MSB-first */
+   n32 = (uint32) (i >> 32);
+   n32 = htonl(n32);
+   memcpy(&buf[0], &n32, 4);
+
+   /* Now the low order half */
+   n32 = (uint32) i;
+   n32 = htonl(n32);
+   memcpy(&buf[4], &n32, 4);
+}
+
+/*
+ * Converts an int64 from network byte order to native format.
+ */
+int64
+fe_recvint64(char *buf)
+{
+   int64       result;
+   uint32      h32;
+   uint32      l32;
+
+   memcpy(&h32, buf, 4);
+   memcpy(&l32, buf + 4, 4);
+   h32 = ntohl(h32);
+   l32 = ntohl(l32);
+
+   result = h32;
+   result <<= 32;
+   result |= l32;
+
+   return result;
+}
index 7c7d0228897780dae2f12328501401e327050f52..d0f3799d1e34282e5e12de3ea725f5c6b203fba2 100644 (file)
@@ -5,6 +5,7 @@ extern char *connection_string;
 extern char *dbhost;
 extern char *dbuser;
 extern char *dbport;
+extern char *dbname;
 extern int dbgetpassword;
 extern char *replication_slot;
 
@@ -12,3 +13,12 @@ extern char *replication_slot;
 extern PGconn *conn;
 
 extern PGconn *GetConnection(void);
+
+extern int64 feGetCurrentTimestamp(void);
+extern void feTimestampDifference(int64 start_time, int64 stop_time,
+                                    long *secs, int *microsecs);
+
+extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time,
+                                           int msec);
+extern void fe_sendint64(int64 i, char *buf);
+extern int64 fe_recvint64(char *buf);