pg_upgrade: Parallelize subscription check.
author Nathan Bossart <nathan@postgresql.org>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
committer Nathan Bossart <nathan@postgresql.org>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
This commit makes use of the new task framework in pg_upgrade to
parallelize the part of check_old_cluster_subscription_state() that
verifies each of the subscribed tables is in the 'i' (initialize)
or 'r' (ready) state.  This check will now process multiple
databases concurrently when pg_upgrade's --jobs option is provided
a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13

src/bin/pg_upgrade/check.c

index 96adea41e9a36441f71f8919e7fd215e2579f3f6..f8160e01408f82fa96c558ee686839f3adbc304d 100644 (file)
@@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void)
        check_ok();
 }
 
+/*
+ * UpgradeTask callback for check_old_cluster_subscription_state(): each row of
+ * "res" is a table in "dbinfo" whose sync state is neither 'i' nor 'r'.  Such
+ * rows are written to the report in "arg"; the caller fails if it was opened.
+ */
+static void
+process_old_sub_state_check(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg; /* supplied by caller */
+       int                     ntup = PQntuples(res); /* one row per offending table */
+       int                     i_srsubstate = PQfnumber(res, "srsubstate"); /* columns found by name */
+       int                     i_subname = PQfnumber(res, "subname");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+
+       AssertVariableIsOfType(&process_old_sub_state_check, UpgradeTaskProcessCB);
+
+       for (int i = 0; i < ntup; i++)
+       {
+               if (report->file == NULL && /* open the report only once a row exists */
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", report->path);
+
+               fprintf(report->file, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
+                               PQgetvalue(res, i, i_srsubstate),
+                               dbinfo->db_name,
+                               PQgetvalue(res, i, i_subname),
+                               PQgetvalue(res, i, i_nspname),
+                               PQgetvalue(res, i, i_relname));
+       }
+}
+
 /*
  * check_old_cluster_subscription_state()
  *
@@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void)
 static void
 check_old_cluster_subscription_state(void)
 {
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
+       UpgradeTask *task = upgrade_task_create();
+       UpgradeTaskReport report;
+       const char *query;
+       PGresult   *res;
+       PGconn     *conn;
        int                     ntup;
 
        prep_status("Checking for subscription state");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       report.file = NULL;
+       snprintf(report.path, sizeof(report.path), "%s/%s",
                         log_opts.basedir,
                         "subs_invalid.txt");
-       for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               DbInfo     *active_db = &old_cluster.dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(&old_cluster, active_db->db_name);
-
-               /* We need to check for pg_replication_origin only once. */
-               if (dbnum == 0)
-               {
-                       /*
-                        * Check that all the subscriptions have their respective
-                        * replication origin.
-                        */
-                       res = executeQueryOrDie(conn,
-                                                                       "SELECT d.datname, s.subname "
-                                                                       "FROM pg_catalog.pg_subscription s "
-                                                                       "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
-                                                                       "       ON o.roname = 'pg_' || s.oid "
-                                                                       "INNER JOIN pg_catalog.pg_database d "
-                                                                       "       ON d.oid = s.subdbid "
-                                                                       "WHERE o.roname IS NULL;");
-
-                       ntup = PQntuples(res);
-                       for (int i = 0; i < ntup; i++)
-                       {
-                               if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                                       pg_fatal("could not open file \"%s\": %m", output_path);
-                               fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
-                                               PQgetvalue(res, i, 0),
-                                               PQgetvalue(res, i, 1));
-                       }
-                       PQclear(res);
-               }
-
-               /*
-                * We don't allow upgrade if there is a risk of dangling slot or
-                * origin corresponding to initial sync after upgrade.
-                *
-                * A slot/origin not created yet refers to the 'i' (initialize) state,
-                * while 'r' (ready) state refers to a slot/origin created previously
-                * but already dropped. These states are supported for pg_upgrade. The
-                * other states listed below are not supported:
-                *
-                * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
-                * would retain a replication slot, which could not be dropped by the
-                * sync worker spawned after the upgrade because the subscription ID
-                * used for the slot name won't match anymore.
-                *
-                * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
-                * would retain the replication origin when there is a failure in
-                * tablesync worker immediately after dropping the replication slot in
-                * the publisher.
-                *
-                * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
-                * a relation upgraded while in this state would expect an origin ID
-                * with the OID of the subscription used before the upgrade, causing
-                * it to fail.
-                *
-                * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
-                * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
-                * so we need not allow these states.
-                */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
-                                                               "FROM pg_catalog.pg_subscription_rel r "
-                                                               "LEFT JOIN pg_catalog.pg_subscription s"
-                                                               "       ON r.srsubid = s.oid "
-                                                               "LEFT JOIN pg_catalog.pg_class c"
-                                                               "       ON r.srrelid = c.oid "
-                                                               "LEFT JOIN pg_catalog.pg_namespace n"
-                                                               "       ON c.relnamespace = n.oid "
-                                                               "WHERE r.srsubstate NOT IN ('i', 'r') "
-                                                               "ORDER BY s.subname");
-
-               ntup = PQntuples(res);
-               for (int i = 0; i < ntup; i++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-
-                       fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
-                                       PQgetvalue(res, i, 0),
-                                       active_db->db_name,
-                                       PQgetvalue(res, i, 1),
-                                       PQgetvalue(res, i, 2),
-                                       PQgetvalue(res, i, 3));
-               }
 
-               PQclear(res);
-               PQfinish(conn);
+       /*
+        * Check that all the subscriptions have their respective replication
+        * origin.  This check only needs to run once.
+        */
+       conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
+       res = executeQueryOrDie(conn,
+                                                       "SELECT d.datname, s.subname "
+                                                       "FROM pg_catalog.pg_subscription s "
+                                                       "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
+                                                       "       ON o.roname = 'pg_' || s.oid "
+                                                       "INNER JOIN pg_catalog.pg_database d "
+                                                       "       ON d.oid = s.subdbid "
+                                                       "WHERE o.roname IS NULL;");
+       ntup = PQntuples(res);
+       for (int i = 0; i < ntup; i++)
+       {
+               if (report.file == NULL &&
+                       (report.file = fopen_priv(report.path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", report.path);
+               fprintf(report.file, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
+                               PQgetvalue(res, i, 0),
+                               PQgetvalue(res, i, 1));
        }
+       PQclear(res);
+       PQfinish(conn);
 
-       if (script)
+       /*
+        * We don't allow upgrade if there is a risk of dangling slot or origin
+        * corresponding to initial sync after upgrade.
+        *
+        * A slot/origin not created yet refers to the 'i' (initialize) state,
+        * while 'r' (ready) state refers to a slot/origin created previously but
+        * already dropped. These states are supported for pg_upgrade. The other
+        * states listed below are not supported:
+        *
+        * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
+        * retain a replication slot, which could not be dropped by the sync
+        * worker spawned after the upgrade because the subscription ID used for
+        * the slot name won't match anymore.
+        *
+        * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
+        * retain the replication origin when there is a failure in tablesync
+        * worker immediately after dropping the replication slot in the
+        * publisher.
+        *
+        * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
+        * relation upgraded while in this state would expect an origin ID with
+        * the OID of the subscription used before the upgrade, causing it to
+        * fail.
+        *
+        * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
+        * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so we
+        * need not allow these states.
+        */
+       query = "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+               "FROM pg_catalog.pg_subscription_rel r "
+               "LEFT JOIN pg_catalog.pg_subscription s"
+               "   ON r.srsubid = s.oid "
+               "LEFT JOIN pg_catalog.pg_class c"
+               "   ON r.srrelid = c.oid "
+               "LEFT JOIN pg_catalog.pg_namespace n"
+               "   ON c.relnamespace = n.oid "
+               "WHERE r.srsubstate NOT IN ('i', 'r') "
+               "ORDER BY s.subname";
+
+       upgrade_task_add_step(task, query, process_old_sub_state_check,
+                                                 true, &report);
+
+       upgrade_task_run(task, &old_cluster);
+       upgrade_task_free(task);
+
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n"
                                 "You can allow the initial sync to finish for all relations and then restart the upgrade.\n"
                                 "A list of the problematic subscriptions is in the file:\n"
-                                "    %s", output_path);
+                                "    %s", report.path);
        }
        else
                check_ok();