Fix crash in WAL sender when starting physical replication
authorMichael Paquier <michael@paquier.xyz>
Mon, 8 Jun 2020 01:12:24 +0000 (10:12 +0900)
committerMichael Paquier <michael@paquier.xyz>
Mon, 8 Jun 2020 01:12:24 +0000 (10:12 +0900)
Since database connections can be used with WAL senders in 9.4, it is
possible to use physical replication.  This commit fixes a crash when
starting physical replication with a WAL sender using a database
connection, caused by the refactoring done in 850196b.

There have been discussions about forbidding the use of physical
replication in a database connection, but this is left for later,
taking care only of the crash new to 13.

While on it, add a test to check for a failure when attempting logical
replication if the WAL sender does not have a database connection.  This
part is extracted from a larger patch by Kyotaro Horiguchi.

Reported-by: Vladimir Sitnikov
Author: Michael Paquier, Kyotaro Horiguchi
Reviewed-by: Kyotaro Horiguchi, Álvaro Herrera
Discussion: https://postgr.es/m/CAB=Je-GOWMj1PTPkeUhjqQp-4W3=nW-pXe2Hjax6rJFffB5_Aw@mail.gmail.com
Backpatch-through: 13

src/backend/access/transam/xlogreader.c
src/backend/replication/walsender.c
src/include/access/xlogreader.h
src/test/recovery/t/006_logical_decoding.pl

index 5995798b585a5e86bc90ab12e5170b9e9c1240f6..cb76be4f4696f9db7f86636baa81aca94d7672c0 100644 (file)
@@ -44,6 +44,8 @@ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
                                                        XLogRecPtr recptr);
 static void ResetDecoder(XLogReaderState *state);
+static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
+                                                          int segsize, const char *waldir);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -210,7 +212,7 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
 /*
  * Initialize the passed segment structs.
  */
-void
+static void
 WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
                                   int segsize, const char *waldir)
 {
index 2364cbfc61b568d85fcb8c551b03324c47ed34f1..e2477c47e0a164ecbeacaafd53a2807bb2b87fe2 100644 (file)
@@ -130,13 +130,11 @@ bool              log_replication_commands = false;
 bool           wake_wal_senders = false;
 
 /*
- * Physical walsender does not use xlogreader to read WAL, but it does use a
- * fake one to keep state.  Logical walsender uses a proper xlogreader.  Both
- * keep the 'xlogreader' pointer to the right one, for the sake of common
- * routines.
+ * xlogreader used for replication.  Note that a WAL sender doing physical
+ * replication does not need xlogreader to read WAL, but it needs one to
+ * keep a state of its work.
  */
-static XLogReaderState fake_xlogreader;
-static XLogReaderState *xlogreader;
+static XLogReaderState *xlogreader = NULL;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -285,20 +283,6 @@ InitWalSender(void)
 
        /* Initialize empty timestamp buffer for lag tracking. */
        lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
-
-       /*
-        * Prepare physical walsender's fake xlogreader struct.  Logical walsender
-        * does this later.
-        */
-       if (!am_db_walsender)
-       {
-               xlogreader = &fake_xlogreader;
-               xlogreader->routine =
-                       *XL_ROUTINE(.segment_open = WalSndSegmentOpen,
-                                               .segment_close = wal_segment_close);
-               WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
-                                                  wal_segment_size, NULL);
-       }
 }
 
 /*
@@ -594,6 +578,18 @@ StartReplication(StartReplicationCmd *cmd)
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
 
+       /* create xlogreader for physical replication */
+       xlogreader =
+               XLogReaderAllocate(wal_segment_size, NULL,
+                                                  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
+                                                                         .segment_close = wal_segment_close),
+                                                  NULL);
+
+       if (!xlogreader)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OUT_OF_MEMORY),
+                                errmsg("out of memory")));
+
        /*
         * We assume here that we're logging enough information in the WAL for
         * log-shipping, since this is checked in PostmasterMain().
@@ -1643,6 +1639,8 @@ exec_replication_command(const char *cmd_string)
                                        StartReplication(cmd);
                                else
                                        StartLogicalReplication(cmd);
+
+                               Assert(xlogreader != NULL);
                                break;
                        }
 
index d930fe957dfdcf74c617315820d517fba54d3869..b0f2a6ed43a991f0a97dbb8937298892c107c3f6 100644 (file)
@@ -262,10 +262,6 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
 /* Free an XLogReader */
 extern void XLogReaderFree(XLogReaderState *state);
 
-/* Initialize supporting structures */
-extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
-                                                          int segsize, const char *waldir);
-
 /* Position the XLogReader to given record */
 extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
 #ifdef FRONTEND
index ee05535b1c20506fb42d00fec3cf5cff24195816..78229a7b92bcd02abdab5673e43cc7facce0c83f 100644 (file)
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 13;
+use Test::More tests => 14;
 use Config;
 
 # Initialize master node
@@ -36,6 +36,15 @@ ok( $stderr =~
          m/replication slot "test_slot" was not created in this database/,
        "Logical decoding correctly fails to start");
 
+# Check case of walsender not using a database connection.  Logical
+# decoding should not be allowed.
+($result, $stdout, $stderr) = $node_master->psql(
+       'template1',
+       qq[START_REPLICATION SLOT s1 LOGICAL 0/1],
+       replication => 'true');
+ok($stderr =~ /ERROR:  logical decoding requires a database connection/,
+       "Logical decoding fails on non-database connection");
+
 $node_master->safe_psql('postgres',
        qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
 );