Drop pre-existing subscriptions from the converted subscriber.
author: Amit Kapila <akapila@postgresql.org>
Tue, 2 Jul 2024 06:06:21 +0000 (11:36 +0530)
committer: Amit Kapila <akapila@postgresql.org>
Tue, 2 Jul 2024 06:06:21 +0000 (11:36 +0530)
Subscriptions that already existed on the standby are not needed on the
subscriber newly formed by pg_createsubscriber. The apply workers
corresponding to these subscriptions could connect to other publisher nodes
and either fetch unwanted data or raise ERRORs while connecting to such nodes.

Author: Kuroda Hayato
Reviewed-by: Amit Kapila, Shlok Kyal, Vignesh C
Backpatch-through: 17
Discussion: https://postgr.es/m/OSBPR01MB25526A30A1FBF863ACCDDA3AF5C92@OSBPR01MB2552.jpnprd01.prod.outlook.com

src/bin/pg_basebackup/pg_createsubscriber.c
src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

index fb57737f7cd1fcca9b8f1f779f847d086f6064ca..21dd50f8089e06bca8cef146e2d04825a44eb038 100644 (file)
@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
                                  const char *slot_name);
 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
 static void start_standby_server(const struct CreateSubscriberOptions *opt,
-                                bool restricted_access);
+                                bool restricted_access,
+                                bool restrict_logical_worker);
 static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
                                  const struct CreateSubscriberOptions *opt);
@@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
                                     const char *lsn);
 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void check_and_drop_existing_subscriptions(PGconn *conn,
+                                                 const struct LogicalRepInfo *dbinfo);
+static void drop_existing_subscriptions(PGconn *conn, const char *subname,
+                                       const char *dbname);
 
 #define    USEC_PER_SEC    1000000
 #define    WAIT_INTERVAL   1       /* 1 second */
@@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
        exit(1);
 }
 
+/*
+ * Drop one pre-existing subscription from the newly created subscriber.
+ *
+ * This is to avoid duplicate subscriptions on the primary (publisher node)
+ * and the newly created subscriber. The subscription is disabled and
+ * detached from its slot (slot_name = NONE) before it is dropped: we
+ * shouldn't drop the associated slot as that would be used by the publisher
+ * node.
+ *
+ * conn    - open connection on which the commands are executed
+ * subname - raw (unquoted) subscription name; also used in log messages
+ * dbname  - database name, used only in the log message
+ *
+ * In dry-run mode the action is logged but nothing is executed.
+ */
+static void
+drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
+{
+   PQExpBuffer query = createPQExpBuffer();
+   char       *subname_esc;
+
+   Assert(conn != NULL);
+
+   /*
+    * The name comes straight from the catalog, so it may be mixed-case or
+    * contain special characters. Quote it as an identifier before splicing
+    * it into the commands. A failure is handled like the query failure
+    * below: log, then disconnect_database(conn, true).
+    */
+   subname_esc = PQescapeIdentifier(conn, subname, strlen(subname));
+   if (subname_esc == NULL)
+   {
+       pg_log_error("could not escape subscription name \"%s\": %s",
+                    subname, PQerrorMessage(conn));
+       disconnect_database(conn, true);
+   }
+
+   /*
+    * Construct a query string. These commands are allowed to be executed
+    * within a transaction.
+    */
+   appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
+                     subname_esc);
+   appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
+                     subname_esc);
+   appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname_esc);
+
+   /* The quoted copy now lives in the query buffer; release libpq's copy. */
+   PQfreemem(subname_esc);
+
+   pg_log_info("dropping subscription \"%s\" on database \"%s\"",
+               subname, dbname);
+
+   if (!dry_run)
+   {
+       PGresult   *res = PQexec(conn, query->data);
+
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+           pg_log_error("could not drop subscription \"%s\": %s",
+                        subname, PQresultErrorMessage(res));
+           disconnect_database(conn, true);
+       }
+
+       PQclear(res);
+   }
+
+   destroyPQExpBuffer(query);
+}
+
+/*
+ * Retrieve and drop the pre-existing subscriptions.
+ *
+ * Queries pg_subscription over conn for every subscription that belongs to
+ * the database named dbinfo->dbname and passes each one to
+ * drop_existing_subscriptions().
+ *
+ * conn   - open connection on which the catalog query is executed
+ * dbinfo - supplies the database name to filter on
+ */
+static void
+check_and_drop_existing_subscriptions(PGconn *conn,
+                                     const struct LogicalRepInfo *dbinfo)
+{
+   PQExpBuffer query = createPQExpBuffer();
+   char       *dbname;
+   PGresult   *res;
+   int         nsubs;
+
+   Assert(conn != NULL);
+
+   /*
+    * Escape the database name as a string literal. A failure is handled
+    * like the query failure below: log, then disconnect_database(conn,
+    * true).
+    */
+   dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
+   if (dbname == NULL)
+   {
+       pg_log_error("could not escape database name \"%s\": %s",
+                    dbinfo->dbname, PQerrorMessage(conn));
+       disconnect_database(conn, true);
+   }
+
+   appendPQExpBuffer(query,
+                     "SELECT s.subname FROM pg_catalog.pg_subscription s "
+                     "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+                     "WHERE d.datname = %s",
+                     dbname);
+
+   /* The literal has been copied into the query; release libpq's copy. */
+   PQfreemem(dbname);
+
+   res = PQexec(conn, query->data);
+
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+   {
+       pg_log_error("could not obtain pre-existing subscriptions: %s",
+                    PQresultErrorMessage(res));
+       disconnect_database(conn, true);
+   }
+
+   /* Column 0 of each row is the subscription name. */
+   nsubs = PQntuples(res);
+   for (int i = 0; i < nsubs; i++)
+       drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
+                                   dbinfo->dbname);
+
+   PQclear(res);
+   destroyPQExpBuffer(query);
+}
+
 /*
  * Create the subscriptions, adjust the initial location for logical
  * replication and enable the subscriptions. That's the last step for logical
@@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
        /* Connect to subscriber. */
        conn = connect_database(dbinfo[i].subconninfo, true);
 
+       /*
+        * We don't need the pre-existing subscriptions on the newly formed
+        * subscriber. They can connect to other publisher nodes and either
+        * get some unwarranted data or can lead to ERRORs in connecting to
+        * such nodes.
+        */
+       check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
+
        /*
         * Since the publication was created before the consistent LSN, it is
         * available on the subscriber when the physical replica is promoted.
@@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
 }
 
 static void
-start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
+start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
+                    bool restrict_logical_worker)
 {
    PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    int         rc;
@@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
    if (opt->config_file != NULL)
        appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
                          opt->config_file);
+
+   /* Prevent logical replication workers from starting, if requested */
+   if (restrict_logical_worker)
+       appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
+
    pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
    rc = system(pg_ctl_cmd->data);
    pg_ctl_status(pg_ctl_cmd->data, rc);
@@ -2067,7 +2167,7 @@ main(int argc, char **argv)
     * transformation steps.
     */
    pg_log_info("starting the standby with command-line options");
-   start_standby_server(&opt, true);
+   start_standby_server(&opt, true, false);
 
    /* Check if the standby server is ready for logical replication */
    check_subscriber(dbinfo);
@@ -2098,10 +2198,11 @@ main(int argc, char **argv)
 
    /*
     * Start subscriber so the recovery parameters will take effect. Wait
-    * until accepting connections.
+    * until accepting connections. We don't want to start logical replication
+    * during setup.
     */
    pg_log_info("starting the subscriber");
-   start_standby_server(&opt, true);
+   start_standby_server(&opt, true, true);
 
    /* Waiting the subscriber to be promoted */
    wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
index 1241bf6c6a7126e7f115f9c392f884eef7199ced..80002c5a17f434fd55b049977e64502ca3d8d266 100644 (file)
@@ -298,6 +298,13 @@ my $result = $node_s->safe_psql('postgres',
    "SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
 );
 is($result, 'failover_slot', 'failover slot is synced');
+
+# Create subscription to test its removal
+my $dummy_sub = 'regress_sub_dummy';
+$node_p->safe_psql($db1,
+   "CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
+);
+$node_p->wait_for_replay_catchup($node_s);
 $node_s->stop;
 
 # dry run mode on node S
@@ -372,6 +379,13 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Confirm the pre-existing subscription has been removed
+$result = $node_s->safe_psql(
+   'postgres', qq(
+   SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
+));
+is($result, qq(0), 'pre-existing subscription was dropped');
+
 # Get subscription names
 $result = $node_s->safe_psql(
    'postgres', qq(