pg_upgrade: Parallelize retrieving relation information.
authorNathan Bossart <nathan@postgresql.org>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
committerNathan Bossart <nathan@postgresql.org>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
This commit makes use of the new task framework in pg_upgrade to
parallelize retrieving relation and logical slot information.  This
step will now process multiple databases concurrently when
pg_upgrade's --jobs option is provided a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13

src/bin/pg_upgrade/info.c

index d3c1e8918d7560cf82e68eb4e31e696e860ab8f0..5883acc79c0045b76a53893d92c5f3f5e0711f66 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "access/transam.h"
 #include "catalog/pg_class_d.h"
+#include "pqexpbuffer.h"
 #include "pg_upgrade.h"
 
 static void create_rel_filename_map(const char *old_data, const char *new_data,
@@ -22,12 +23,14 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(void);
+static void process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
+static char *get_old_cluster_logical_slot_infos_query(void);
+static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg);
 
 
 /*
@@ -276,7 +279,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster)
 {
-       int                     dbnum;
+       UpgradeTask *task = upgrade_task_create();
+       char       *rel_infos_query = NULL;
+       char       *logical_slot_infos_query = NULL;
 
        if (cluster->dbarr.dbs != NULL)
                free_db_and_rel_infos(&cluster->dbarr);
@@ -284,15 +289,37 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
        get_template0_info(cluster);
        get_db_infos(cluster);
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       rel_infos_query = get_rel_infos_query();
+       upgrade_task_add_step(task,
+                                                 rel_infos_query,
+                                                 process_rel_infos,
+                                                 true, NULL);
+
+       /*
+        * Logical slots are only carried over to the new cluster when the old
+        * cluster is on PG17 or newer.  This is because before that the logical
+        * slots are not saved at shutdown, so there is no guarantee that the
+        * latest confirmed_flush_lsn is saved to disk which can lead to data
+        * loss. It is still not guaranteed for manually created slots in PG17, so
+        * subsequent checks done in check_old_cluster_for_valid_slots() would
+        * raise a FATAL error if such slots are included.
+        */
+       if (cluster == &old_cluster &&
+               GET_MAJOR_VERSION(cluster->major_version) > 1600)
        {
-               DbInfo     *pDbInfo = &cluster->dbarr.dbs[dbnum];
+               logical_slot_infos_query = get_old_cluster_logical_slot_infos_query();
+               upgrade_task_add_step(task,
+                                                         logical_slot_infos_query,
+                                                         process_old_cluster_logical_slot_infos,
+                                                         true, NULL);
+       }
 
-               get_rel_infos(cluster, pDbInfo);
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
 
-               if (cluster == &old_cluster)
-                       get_old_cluster_logical_slot_infos(pDbInfo);
-       }
+       pg_free(rel_infos_query);
+       if (logical_slot_infos_query)
+               pg_free(logical_slot_infos_query);
 
        if (cluster == &old_cluster)
                pg_log(PG_VERBOSE, "\nsource databases:");
@@ -431,40 +458,21 @@ get_db_infos(ClusterInfo *cluster)
 
 
 /*
- * get_rel_infos()
+ * get_rel_infos_query()
  *
- * gets the relinfos for all the user tables and indexes of the database
- * referred to by "dbinfo".
+ * Returns the query for retrieving the relation information for all the user
+ * tables and indexes in the database, for use by get_db_rel_and_slot_infos()'s
+ * UpgradeTask.
  *
- * Note: the resulting RelInfo array is assumed to be sorted by OID.
- * This allows later processing to match up old and new databases efficiently.
+ * Note: the result is assumed to be sorted by OID.  This allows later
+ * processing to match up old and new databases efficiently.
  */
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(void)
 {
-       PGconn     *conn = connectToServer(cluster,
-                                                                          dbinfo->db_name);
-       PGresult   *res;
-       RelInfo    *relinfos;
-       int                     ntups;
-       int                     relnum;
-       int                     num_rels = 0;
-       char       *nspname = NULL;
-       char       *relname = NULL;
-       char       *tablespace = NULL;
-       int                     i_spclocation,
-                               i_nspname,
-                               i_relname,
-                               i_reloid,
-                               i_indtable,
-                               i_toastheap,
-                               i_relfilenumber,
-                               i_reltablespace;
-       char            query[QUERY_ALLOC];
-       char       *last_namespace = NULL,
-                          *last_tablespace = NULL;
+       PQExpBufferData query;
 
-       query[0] = '\0';                        /* initialize query string to empty */
+       initPQExpBuffer(&query);
 
        /*
         * Create a CTE that collects OIDs of regular user tables and matviews,
@@ -476,34 +484,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * output, so we have to copy that system table.  It's easiest to do that
         * by treating it as a user table.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "WITH regular_heap (reloid, indtable, toastheap) AS ( "
-                        "  SELECT c.oid, 0::oid, 0::oid "
-                        "  FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
-                        "         ON c.relnamespace = n.oid "
-                        "  WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", "
-                        CppAsString2(RELKIND_MATVIEW) ") AND "
+       appendPQExpBuffer(&query,
+                                         "WITH regular_heap (reloid, indtable, toastheap) AS ( "
+                                         "  SELECT c.oid, 0::oid, 0::oid "
+                                         "  FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
+                                         "         ON c.relnamespace = n.oid "
+                                         "  WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", "
+                                         CppAsString2(RELKIND_MATVIEW) ") AND "
        /* exclude possible orphaned temp tables */
-                        "    ((n.nspname !~ '^pg_temp_' AND "
-                        "      n.nspname !~ '^pg_toast_temp_' AND "
-                        "      n.nspname NOT IN ('pg_catalog', 'information_schema', "
-                        "                        'binary_upgrade', 'pg_toast') AND "
-                        "      c.oid >= %u::pg_catalog.oid) OR "
-                        "     (n.nspname = 'pg_catalog' AND "
-                        "      relname IN ('pg_largeobject') ))), ",
-                        FirstNormalObjectId);
+                                         "    ((n.nspname !~ '^pg_temp_' AND "
+                                         "      n.nspname !~ '^pg_toast_temp_' AND "
+                                         "      n.nspname NOT IN ('pg_catalog', 'information_schema', "
+                                         "                        'binary_upgrade', 'pg_toast') AND "
+                                         "      c.oid >= %u::pg_catalog.oid) OR "
+                                         "     (n.nspname = 'pg_catalog' AND "
+                                         "      relname IN ('pg_largeobject') ))), ",
+                                         FirstNormalObjectId);
 
        /*
         * Add a CTE that collects OIDs of toast tables belonging to the tables
         * selected by the regular_heap CTE.  (We have to do this separately
         * because the namespace-name rules above don't work for toast tables.)
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "  toast_heap (reloid, indtable, toastheap) AS ( "
-                        "  SELECT c.reltoastrelid, 0::oid, c.oid "
-                        "  FROM regular_heap JOIN pg_catalog.pg_class c "
-                        "      ON regular_heap.reloid = c.oid "
-                        "  WHERE c.reltoastrelid != 0), ");
+       appendPQExpBufferStr(&query,
+                                                "  toast_heap (reloid, indtable, toastheap) AS ( "
+                                                "  SELECT c.reltoastrelid, 0::oid, c.oid "
+                                                "  FROM regular_heap JOIN pg_catalog.pg_class c "
+                                                "      ON regular_heap.reloid = c.oid "
+                                                "  WHERE c.reltoastrelid != 0), ");
 
        /*
         * Add a CTE that collects OIDs of all valid indexes on the previously
@@ -511,53 +519,68 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
         * Testing indisready is necessary in 9.2, and harmless in earlier/later
         * versions.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "  all_index (reloid, indtable, toastheap) AS ( "
-                        "  SELECT indexrelid, indrelid, 0::oid "
-                        "  FROM pg_catalog.pg_index "
-                        "  WHERE indisvalid AND indisready "
-                        "    AND indrelid IN "
-                        "        (SELECT reloid FROM regular_heap "
-                        "         UNION ALL "
-                        "         SELECT reloid FROM toast_heap)) ");
+       appendPQExpBufferStr(&query,
+                                                "  all_index (reloid, indtable, toastheap) AS ( "
+                                                "  SELECT indexrelid, indrelid, 0::oid "
+                                                "  FROM pg_catalog.pg_index "
+                                                "  WHERE indisvalid AND indisready "
+                                                "    AND indrelid IN "
+                                                "        (SELECT reloid FROM regular_heap "
+                                                "         UNION ALL "
+                                                "         SELECT reloid FROM toast_heap)) ");
 
        /*
         * And now we can write the query that retrieves the data we want for each
         * heap and index relation.  Make sure result is sorted by OID.
         */
-       snprintf(query + strlen(query), sizeof(query) - strlen(query),
-                        "SELECT all_rels.*, n.nspname, c.relname, "
-                        "  c.relfilenode, c.reltablespace, "
-                        "  pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
-                        "FROM (SELECT * FROM regular_heap "
-                        "      UNION ALL "
-                        "      SELECT * FROM toast_heap "
-                        "      UNION ALL "
-                        "      SELECT * FROM all_index) all_rels "
-                        "  JOIN pg_catalog.pg_class c "
-                        "      ON all_rels.reloid = c.oid "
-                        "  JOIN pg_catalog.pg_namespace n "
-                        "     ON c.relnamespace = n.oid "
-                        "  LEFT OUTER JOIN pg_catalog.pg_tablespace t "
-                        "     ON c.reltablespace = t.oid "
-                        "ORDER BY 1;");
-
-       res = executeQueryOrDie(conn, "%s", query);
-
-       ntups = PQntuples(res);
+       appendPQExpBufferStr(&query,
+                                                "SELECT all_rels.*, n.nspname, c.relname, "
+                                                "  c.relfilenode, c.reltablespace, "
+                                                "  pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
+                                                "FROM (SELECT * FROM regular_heap "
+                                                "      UNION ALL "
+                                                "      SELECT * FROM toast_heap "
+                                                "      UNION ALL "
+                                                "      SELECT * FROM all_index) all_rels "
+                                                "  JOIN pg_catalog.pg_class c "
+                                                "      ON all_rels.reloid = c.oid "
+                                                "  JOIN pg_catalog.pg_namespace n "
+                                                "     ON c.relnamespace = n.oid "
+                                                "  LEFT OUTER JOIN pg_catalog.pg_tablespace t "
+                                                "     ON c.reltablespace = t.oid "
+                                                "ORDER BY 1");
+
+       return query.data;
+}
 
-       relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+/*
+ * Callback function for processing results of the query returned by
+ * get_rel_infos_query(), which is used for get_db_rel_and_slot_infos()'s
+ * UpgradeTask.  This function stores the relation information for later use.
+ */
+static void
+process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       int                     ntups = PQntuples(res);
+       RelInfo    *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+       int                     i_reloid = PQfnumber(res, "reloid");
+       int                     i_indtable = PQfnumber(res, "indtable");
+       int                     i_toastheap = PQfnumber(res, "toastheap");
+       int                     i_nspname = PQfnumber(res, "nspname");
+       int                     i_relname = PQfnumber(res, "relname");
+       int                     i_relfilenumber = PQfnumber(res, "relfilenode");
+       int                     i_reltablespace = PQfnumber(res, "reltablespace");
+       int                     i_spclocation = PQfnumber(res, "spclocation");
+       int                     num_rels = 0;
+       char       *nspname = NULL;
+       char       *relname = NULL;
+       char       *tablespace = NULL;
+       char       *last_namespace = NULL;
+       char       *last_tablespace = NULL;
 
-       i_reloid = PQfnumber(res, "reloid");
-       i_indtable = PQfnumber(res, "indtable");
-       i_toastheap = PQfnumber(res, "toastheap");
-       i_nspname = PQfnumber(res, "nspname");
-       i_relname = PQfnumber(res, "relname");
-       i_relfilenumber = PQfnumber(res, "relfilenode");
-       i_reltablespace = PQfnumber(res, "reltablespace");
-       i_spclocation = PQfnumber(res, "spclocation");
+       AssertVariableIsOfType(&process_rel_infos, UpgradeTaskProcessCB);
 
-       for (relnum = 0; relnum < ntups; relnum++)
+       for (int relnum = 0; relnum < ntups; relnum++)
        {
                RelInfo    *curr = &relinfos[num_rels++];
 
@@ -610,44 +633,22 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
                        /* A zero reltablespace oid indicates the database tablespace. */
                        curr->tablespace = dbinfo->db_tablespace;
        }
-       PQclear(res);
-
-       PQfinish(conn);
 
        dbinfo->rel_arr.rels = relinfos;
        dbinfo->rel_arr.nrels = num_rels;
 }
 
 /*
- * get_old_cluster_logical_slot_infos()
- *
- * Gets the LogicalSlotInfos for all the logical replication slots of the
- * database referred to by "dbinfo". The status of each logical slot is gotten
- * here, but they are used at the checking phase. See
- * check_old_cluster_for_valid_slots().
+ * get_old_cluster_logical_slot_infos_query()
  *
- * Note: This function will not do anything if the old cluster is pre-PG17.
- * This is because before that the logical slots are not saved at shutdown, so
- * there is no guarantee that the latest confirmed_flush_lsn is saved to disk
- * which can lead to data loss. It is still not guaranteed for manually created
- * slots in PG17, so subsequent checks done in
- * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
- * are included.
+ * Returns the query for retrieving the logical slot information for all the
+ * logical replication slots in the database, for use by
+ * get_db_rel_and_slot_infos()'s UpgradeTask.  The status of each logical slot
+ * is checked in check_old_cluster_for_valid_slots().
  */
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(void)
 {
-       PGconn     *conn;
-       PGresult   *res;
-       LogicalSlotInfo *slotinfos = NULL;
-       int                     num_slots;
-
-       /* Logical slots can be migrated since PG17. */
-       if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
-               return;
-
-       conn = connectToServer(&old_cluster, dbinfo->db_name);
-
        /*
         * Fetch the logical replication slot information. The check whether the
         * slot is considered caught up is done by an upgrade function. This
@@ -665,18 +666,32 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
         * started and stopped several times causing any temporary slots to be
         * removed.
         */
-       res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-                                                       "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
-                                                       "FROM pg_catalog.pg_replication_slots "
-                                                       "WHERE slot_type = 'logical' AND "
-                                                       "database = current_database() AND "
-                                                       "temporary IS FALSE;",
-                                                       user_opts.live_check ? "FALSE" :
-                                                       "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
-                                                       "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-                                                       "END)");
-
-       num_slots = PQntuples(res);
+       return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+                                       "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+                                       "FROM pg_catalog.pg_replication_slots "
+                                       "WHERE slot_type = 'logical' AND "
+                                       "database = current_database() AND "
+                                       "temporary IS FALSE;",
+                                       user_opts.live_check ? "FALSE" :
+                                       "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+                                       "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+                                       "END)");
+}
+
+/*
+ * Callback function for processing results of the query returned by
+ * get_old_cluster_logical_slot_infos_query(), which is used for
+ * get_db_rel_and_slot_infos()'s UpgradeTask.  This function stores the logical
+ * slot information for later use.
+ */
+static void
+process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       LogicalSlotInfo *slotinfos = NULL;
+       int                     num_slots = PQntuples(res);
+
+       AssertVariableIsOfType(&process_old_cluster_logical_slot_infos,
+                                                  UpgradeTaskProcessCB);
 
        if (num_slots)
        {
@@ -709,9 +724,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
                }
        }
 
-       PQclear(res);
-       PQfinish(conn);
-
        dbinfo->slot_arr.slots = slotinfos;
        dbinfo->slot_arr.nslots = num_slots;
 }