pg_upgrade: Parallelize incompatible polymorphics check.
authorNathan Bossart <nathan@postgresql.org>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
committerNathan Bossart <nathan@postgresql.org>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
This commit makes use of the new task framework in pg_upgrade to
parallelize the check for usage of incompatible polymorphic
functions, i.e., those with arguments of type anyarray/anyelement
rather than the newer anycompatible variants.  This step will now
process multiple databases concurrently when pg_upgrade's --jobs
option is provided a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13

src/bin/pg_upgrade/check.c

index 28c4ddbca371613201696564020437e2e5c716cb..c5fa1a5bedf6b965b350a0cd81b13fd94844f215 100644 (file)
@@ -1413,6 +1413,40 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
                check_ok();
 }
 
+/*
+ * Callback function for processing results of query for
+ * check_for_incompatible_polymorphics()'s UpgradeTask.  If the query returned
+ * any rows (i.e., the check failed), write the details to the report file.
+ */
+static void
+process_incompat_polymorphics(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_objkind = PQfnumber(res, "objkind");
+       int                     i_objname = PQfnumber(res, "objname");
+
+       AssertVariableIsOfType(&process_incompat_polymorphics,
+                                                  UpgradeTaskProcessCB);
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (report->file == NULL &&
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", report->path);
+               if (!db_used)
+               {
+                       fprintf(report->file, "In database: %s\n", dbinfo->db_name);
+                       db_used = true;
+               }
+
+               fprintf(report->file, "  %s: %s\n",
+                               PQgetvalue(res, rowno, i_objkind),
+                               PQgetvalue(res, rowno, i_objname));
+       }
+}
+
 /*
  *     check_for_incompatible_polymorphics()
  *
@@ -1422,14 +1456,15 @@ check_for_user_defined_postfix_ops(ClusterInfo *cluster)
 static void
 check_for_incompatible_polymorphics(ClusterInfo *cluster)
 {
-       PGresult   *res;
-       FILE       *script = NULL;
-       char            output_path[MAXPGPATH];
        PQExpBufferData old_polymorphics;
+       UpgradeTask *task = upgrade_task_create();
+       UpgradeTaskReport report;
+       char       *query;
 
        prep_status("Checking for incompatible polymorphic functions");
 
-       snprintf(output_path, sizeof(output_path), "%s/%s",
+       report.file = NULL;
+       snprintf(report.path, sizeof(report.path), "%s/%s",
                         log_opts.basedir,
                         "incompatible_polymorphics.txt");
 
@@ -1453,80 +1488,51 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
                                                         ", 'array_positions(anyarray,anyelement)'"
                                                         ", 'width_bucket(anyelement,anyarray)'");
 
-       for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               bool            db_used = false;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-               int                     ntups;
-               int                     i_objkind,
-                                       i_objname;
-
-               /*
-                * The query below hardcodes FirstNormalObjectId as 16384 rather than
-                * interpolating that C #define into the query because, if that
-                * #define is ever changed, the cutoff we want to use is the value
-                * used by pre-version 14 servers, not that of some future version.
-                */
-               res = executeQueryOrDie(conn,
-               /* Aggregate transition functions */
-                                                               "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-                                                               "FROM pg_proc AS p "
-                                                               "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
-                                                               "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
-                                                               "WHERE p.oid >= 16384 "
-                                                               "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-               /* Aggregate final functions */
-                                                               "UNION ALL "
-                                                               "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
-                                                               "FROM pg_proc AS p "
-                                                               "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
-                                                               "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
-                                                               "WHERE p.oid >= 16384 "
-                                                               "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
-
-               /* Operators */
-                                                               "UNION ALL "
-                                                               "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
-                                                               "FROM pg_operator AS op "
-                                                               "WHERE op.oid >= 16384 "
-                                                               "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
-                                                               "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[]);",
-                                                               old_polymorphics.data,
-                                                               old_polymorphics.data,
-                                                               old_polymorphics.data);
-
-               ntups = PQntuples(res);
-
-               i_objkind = PQfnumber(res, "objkind");
-               i_objname = PQfnumber(res, "objname");
-
-               for (int rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL &&
-                               (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               fprintf(script, "In database: %s\n", active_db->db_name);
-                               db_used = true;
-                       }
-
-                       fprintf(script, "  %s: %s\n",
-                                       PQgetvalue(res, rowno, i_objkind),
-                                       PQgetvalue(res, rowno, i_objname));
-               }
+       /*
+        * The query below hardcodes FirstNormalObjectId as 16384 rather than
+        * interpolating that C #define into the query because, if that #define is
+        * ever changed, the cutoff we want to use is the value used by
+        * pre-version 14 servers, not that of some future version.
+        */
 
-               PQclear(res);
-               PQfinish(conn);
-       }
+       /* Aggregate transition functions */
+       query = psprintf("SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+                                        "FROM pg_proc AS p "
+                                        "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+                                        "JOIN pg_proc AS transfn ON transfn.oid=a.aggtransfn "
+                                        "WHERE p.oid >= 16384 "
+                                        "AND a.aggtransfn = ANY(ARRAY[%s]::regprocedure[]) "
+                                        "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+       /* Aggregate final functions */
+                                        "UNION ALL "
+                                        "SELECT 'aggregate' AS objkind, p.oid::regprocedure::text AS objname "
+                                        "FROM pg_proc AS p "
+                                        "JOIN pg_aggregate AS a ON a.aggfnoid=p.oid "
+                                        "JOIN pg_proc AS finalfn ON finalfn.oid=a.aggfinalfn "
+                                        "WHERE p.oid >= 16384 "
+                                        "AND a.aggfinalfn = ANY(ARRAY[%s]::regprocedure[]) "
+                                        "AND a.aggtranstype = ANY(ARRAY['anyarray', 'anyelement']::regtype[]) "
+
+       /* Operators */
+                                        "UNION ALL "
+                                        "SELECT 'operator' AS objkind, op.oid::regoperator::text AS objname "
+                                        "FROM pg_operator AS op "
+                                        "WHERE op.oid >= 16384 "
+                                        "AND oprcode = ANY(ARRAY[%s]::regprocedure[]) "
+                                        "AND oprleft = ANY(ARRAY['anyarray', 'anyelement']::regtype[])",
+                                        old_polymorphics.data,
+                                        old_polymorphics.data,
+                                        old_polymorphics.data);
+
+       upgrade_task_add_step(task, query, process_incompat_polymorphics,
+                                                 true, &report);
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
 
-       if (script)
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                pg_log(PG_REPORT, "fatal");
                pg_fatal("Your installation contains user-defined objects that refer to internal\n"
                                 "polymorphic functions with arguments of type \"anyarray\" or \"anyelement\".\n"
@@ -1534,12 +1540,13 @@ check_for_incompatible_polymorphics(ClusterInfo *cluster)
                                 "afterwards, changing them to refer to the new corresponding functions with\n"
                                 "arguments of type \"anycompatiblearray\" and \"anycompatible\".\n"
                                 "A list of the problematic objects is in the file:\n"
-                                "    %s", output_path);
+                                "    %s", report.path);
        }
        else
                check_ok();
 
        termPQExpBuffer(&old_polymorphics);
+       pg_free(query);
 }
 
 /*