From 7c030783a5bd07cadffc2a1018bc33119a4c7505 Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Wed, 4 Jan 2017 19:02:07 +0000 Subject: [PATCH] =?utf8?q?Add=20pg=5Frecvlogical=20--endpos=3DLSN?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Allow pg_recvlogical to specify an ending LSN, complementing the existing --startpos=LSN option. Craig Ringer, reviewed by Euler Taveira and Naoki Okano --- doc/src/sgml/ref/pg_recvlogical.sgml | 34 ++++++ src/bin/pg_basebackup/pg_recvlogical.c | 145 ++++++++++++++++++++++--- 2 files changed, 164 insertions(+), 15 deletions(-) diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml index b35881f2b9..d066ce8701 100644 --- a/doc/src/sgml/ref/pg_recvlogical.sgml +++ b/doc/src/sgml/ref/pg_recvlogical.sgml @@ -38,6 +38,14 @@ PostgreSQL documentation constraints as , plus those for logical replication (see ). + + + pg_recvlogical has no equivalent to the logical decoding + SQL interface's peek and get modes. It sends replay confirmations for + data lazily as it receives it and on clean exit. To examine pending data on + a slot without consuming it, use + pg_logical_slot_peek_changes. + @@ -154,6 +162,32 @@ PostgreSQL documentation + + + + + + In --start mode, automatically stop replication + and exit with normal exit status 0 when receiving reaches the + specified LSN. If specified when not in --start + mode, an error is raised. + + + + If there's a record with LSN exactly equal to lsn, + the record will be output. + + + + The --endpos option is not aware of transaction + boundaries and may truncate output partway through a transaction. + Any partially output transaction will not be consumed and will be + replayed again when the slot is next read from. Individual messages + are never truncated.
+ + + + diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 49ed2abe55..658e2ba91f 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -40,6 +40,7 @@ static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 10 * 1000; /* 10 sec = default */ static XLogRecPtr startpos = InvalidXLogRecPtr; +static XLogRecPtr endpos = InvalidXLogRecPtr; static bool do_create_slot = false; static bool slot_exists_ok = false; static bool do_start_slot = false; @@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; static void usage(void); static void StreamLogicalLog(void); static void disconnect_and_exit(int code); +static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now); +static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, + bool keepalive, XLogRecPtr lsn); static void usage(void) @@ -81,6 +85,7 @@ usage(void) " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000)); printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n")); printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n")); + printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -o, --option=NAME[=VALUE]\n" " pass option NAME with optional value VALUE to the\n" @@ -281,6 +286,7 @@ StreamLogicalLog(void) int bytes_written; int64 now; int hdr_len; + XLogRecPtr cur_record_lsn = InvalidXLogRecPtr; if (copybuf != NULL) { @@ -454,6 +460,7 @@ StreamLogicalLog(void) int pos; bool replyRequested; XLogRecPtr walEnd; + bool endposReached = false; /* * Parse the keepalive message, enclosed in the CopyData message. @@ -476,18 +483,32 @@ StreamLogicalLog(void) } replyRequested = copybuf[pos]; - /* If the server requested an immediate reply, send one. 
*/ - if (replyRequested) + if (endpos != InvalidXLogRecPtr && walEnd >= endpos) { - /* fsync data, so we send a recent flush pointer */ - if (!OutputFsync(now)) - goto error; + /* + * If there's nothing to read on the socket until a keepalive + * we know that the server has nothing to send us; and if + * walEnd has passed endpos, we know nothing else can have + * committed before endpos. So we can bail out now. + */ + endposReached = true; + } - now = feGetCurrentTimestamp(); - if (!sendFeedback(conn, now, true, false)) + /* Send a reply, if necessary */ + if (replyRequested || endposReached) + { + if (!flushAndSendFeedback(conn, &now)) goto error; last_status = now; } + + if (endposReached) + { + prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr); + time_to_abort = true; + break; + } + continue; } else if (copybuf[0] != 'w') @@ -497,7 +518,6 @@ StreamLogicalLog(void) goto error; } - /* * Read the header of the XLogData message, enclosed in the CopyData * message. We only need the WAL location field (dataStart), the rest @@ -515,12 +535,23 @@ StreamLogicalLog(void) } /* Extract WAL location for this block */ - { - XLogRecPtr temp = fe_recvint64(&copybuf[1]); + cur_record_lsn = fe_recvint64(&copybuf[1]); - output_written_lsn = Max(temp, output_written_lsn); + if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos) + { + /* + * We've read past our endpoint, so prepare to go away being + * cautious about what happens to our output data.
+ */ + if (!flushAndSendFeedback(conn, &now)) + goto error; + prepareToTerminate(conn, endpos, false, cur_record_lsn); + time_to_abort = true; + break; } + output_written_lsn = Max(cur_record_lsn, output_written_lsn); + bytes_left = r - hdr_len; bytes_written = 0; @@ -557,10 +588,29 @@ StreamLogicalLog(void) strerror(errno)); goto error; } + + if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos) + { + /* endpos was exactly the record we just processed, we're done */ + if (!flushAndSendFeedback(conn, &now)) + goto error; + prepareToTerminate(conn, endpos, false, cur_record_lsn); + time_to_abort = true; + break; + } } res = PQgetResult(conn); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + if (PQresultStatus(res) == PGRES_COPY_OUT) + { + /* + * We're doing a client-initiated clean exit and have sent CopyDone to + * the server. We've already sent replay confirmation and fsync'd so + * we can just clean up the connection now. + */ + goto error; + } + else if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, _("%s: unexpected termination of replication stream: %s"), @@ -638,6 +688,7 @@ main(int argc, char **argv) {"password", no_argument, NULL, 'W'}, /* replication options */ {"startpos", required_argument, NULL, 'I'}, + {"endpos", required_argument, NULL, 'E'}, {"option", required_argument, NULL, 'o'}, {"plugin", required_argument, NULL, 'P'}, {"status-interval", required_argument, NULL, 's'}, @@ -673,7 +724,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:", + while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:", long_options, &option_index)) != -1) { switch (c) @@ -733,6 +784,16 @@ main(int argc, char **argv) } startpos = ((uint64) hi) << 32 | lo; break; + case 'E': + if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse end position \"%s\"\n"), + progname, optarg); + exit(1); + } + endpos = ((uint64) hi) << 32 | lo; + break; case 'o': { char 
*data = pg_strdup(optarg); @@ -857,6 +918,16 @@ main(int argc, char **argv) exit(1); } + if (endpos != InvalidXLogRecPtr && !do_start_slot) + { + fprintf(stderr, + _("%s: --endpos may only be specified with --start\n"), + progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + #ifndef WIN32 pqsignal(SIGINT, sigint_handler); pqsignal(SIGHUP, sighup_handler); @@ -923,8 +994,8 @@ main(int argc, char **argv) if (time_to_abort) { /* - * We've been Ctrl-C'ed. That's not an error, so exit without an - * errorcode. + * We've been Ctrl-C'ed or reached an exit limit condition. That's + * not an error, so exit without an errorcode. */ disconnect_and_exit(0); } @@ -943,3 +1014,47 @@ main(int argc, char **argv) } } } + +/* + * Fsync our output data, and send a feedback message to the server. Returns + * true if successful, false otherwise. + * + * If successful, *now is updated to the current timestamp just before sending + * feedback. + */ +static bool +flushAndSendFeedback(PGconn *conn, TimestampTz *now) +{ + /* flush data to disk, so that we send a recent flush pointer */ + if (!OutputFsync(*now)) + return false; + *now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, *now, true, false)) + return false; + + return true; +} + +/* + * Try to inform the server about our upcoming demise, but don't wait around or + * retry on failure. + */ +static void +prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn) +{ + (void) PQputCopyEnd(conn, NULL); + (void) PQflush(conn); + + if (verbose) + { + if (keepalive) + fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n", + progname, + (uint32) (endpos >> 32), (uint32) endpos); + else + fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n", + progname, (uint32) (endpos >> 32), (uint32) (endpos), + (uint32) (lsn >> 32), (uint32) lsn); + + } +} -- 2.39.5