In successful pg_recvlogical, end PGRES_COPY_OUT cleanly.
author Noah Misch <noah@leadboat.com>
Thu, 14 May 2020 03:42:09 +0000 (20:42 -0700)
committer Noah Misch <noah@leadboat.com>
Thu, 14 May 2020 03:42:09 +0000 (20:42 -0700)
pg_recvlogical merely called PQfinish(), so the backend sent messages
after the disconnect.  When that caused EPIPE in internal_flush(),
before a LogicalConfirmReceivedLocation(), the next pg_recvlogical would
repeat already-acknowledged records.  Whether or not the defect causes
EPIPE, post-disconnect messages could contain an ErrorResponse that the
user should see.  One properly ends PGRES_COPY_OUT by repeating
PQgetCopyData() until it returns a negative value.  Augment one of the
tests to cover the case of WAL past --endpos.  Back-patch to v10, where
commit 7c030783a5bd07cadffc2a1018bc33119a4c7505 first appeared.  Before
that commit, pg_recvlogical never reached PGRES_COPY_OUT.

Reported by Thomas Munro.

Discussion: https://postgr.es/m/CAEepm=1MzM2Z_xNe4foGwZ1a+MO_2S9oYDq3M5D11=JDU_+0Nw@mail.gmail.com

src/bin/pg_basebackup/pg_recvlogical.c
src/test/recovery/t/006_logical_decoding.pl

index 0da32771184bec27bdef6033f038135e845cd715..1304b277bc3aa8a16358bc2e01653325c10fd57a 100644 (file)
@@ -580,14 +580,40 @@ StreamLogicalLog(void)
        res = PQgetResult(conn);
        if (PQresultStatus(res) == PGRES_COPY_OUT)
        {
+               PQclear(res);
+
                /*
                 * We're doing a client-initiated clean exit and have sent CopyDone to
-                * the server. We've already sent replay confirmation and fsync'd so
-                * we can just clean up the connection now.
+                * the server. Drain any messages, so we don't miss a last-minute
+                * ErrorResponse. The walsender stops generating XLogData records once
+                * it sees CopyDone, so expect this to finish quickly. After CopyDone,
+                * it's too late for sendFeedback(), even if this were to take a long
+                * time. Hence, use synchronous-mode PQgetCopyData().
                 */
-               goto error;
+               while (1)
+               {
+                       int                     r;
+
+                       if (copybuf != NULL)
+                       {
+                               PQfreemem(copybuf);
+                               copybuf = NULL;
+                       }
+                       r = PQgetCopyData(conn, &copybuf, 0);
+                       if (r == -1)
+                               break;
+                       if (r == -2)
+                       {
+                               pg_log_error("could not read COPY data: %s",
+                                                        PQerrorMessage(conn));
+                               time_to_abort = false;  /* unclean exit */
+                               goto error;
+                       }
+               }
+
+               res = PQgetResult(conn);
        }
-       else if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                pg_log_error("unexpected termination of replication stream: %s",
                                         PQresultErrorMessage(res));
index 0bcd1d0b59f2b3145829e7a555ae3caa5515385e..1334bf6a6018db5355ef3ed6fedf8e83779d9cab 100644 (file)
@@ -71,6 +71,11 @@ my $endpos = $node_master->safe_psql('postgres',
 );
 print "waiting to replay $endpos\n";
 
+# Insert some rows after $endpos, which we won't read.
+$node_master->safe_psql('postgres',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
+);
+
 my $stdout_recv = $node_master->pg_recvlogical_upto(
        'postgres', 'test_slot', $endpos, 180,
        'include-xids'     => '0',
@@ -89,7 +94,7 @@ $stdout_recv = $node_master->pg_recvlogical_upto(
        'skip-empty-xacts' => '1');
 chomp($stdout_recv);
 is($stdout_recv, '',
-       'pg_recvlogical acknowledged changes, nothing pending on slot');
+       'pg_recvlogical acknowledged changes');
 
 $node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');