pg_upgrade: Parallelize retrieving extension updates.
authorNathan Bossart <nathan@postgresql.org>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
committerNathan Bossart <nathan@postgresql.org>
Mon, 16 Sep 2024 21:10:33 +0000 (16:10 -0500)
This commit makes use of the new task framework in pg_upgrade to
parallelize retrieving the set of extensions that should be updated
with the ALTER EXTENSION command after upgrade.  This step will now
process multiple databases concurrently when pg_upgrade's --jobs
option is provided a value greater than 1.

Reviewed-by: Daniel Gustafsson, Ilya Gladyshev
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13

src/bin/pg_upgrade/version.c

index 2de6dffccdab4d7ab039b7e3da46c3604e4c6615..5084b088057e9346b52dd7d7c664a6c81e2f9a0d 100644 (file)
@@ -139,6 +139,41 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
                check_ok();
 }
 
+/*
+ * Callback function for processing results of query for
+ * report_extension_updates()'s UpgradeTask.  If the query returned any rows,
+ * write the details to the report file.
+ */
+static void
+process_extension_updates(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+       bool            db_used = false;
+       int                     ntups = PQntuples(res);
+       int                     i_name = PQfnumber(res, "name");
+       UpgradeTaskReport *report = (UpgradeTaskReport *) arg;
+
+       AssertVariableIsOfType(&process_extension_updates, UpgradeTaskProcessCB);
+
+       for (int rowno = 0; rowno < ntups; rowno++)
+       {
+               if (report->file == NULL &&
+                       (report->file = fopen_priv(report->path, "w")) == NULL)
+                       pg_fatal("could not open file \"%s\": %m", report->path);
+               if (!db_used)
+               {
+                       PQExpBufferData connectbuf;
+
+                       initPQExpBuffer(&connectbuf);
+                       appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+                       fputs(connectbuf.data, report->file);
+                       termPQExpBuffer(&connectbuf);
+                       db_used = true;
+               }
+               fprintf(report->file, "ALTER EXTENSION %s UPDATE;\n",
+                               quote_identifier(PQgetvalue(res, rowno, i_name)));
+       }
+}
+
 /*
  * report_extension_updates()
  *     Report extensions that should be updated.
@@ -146,57 +181,26 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
 void
 report_extension_updates(ClusterInfo *cluster)
 {
-       int                     dbnum;
-       FILE       *script = NULL;
-       char       *output_path = "update_extensions.sql";
+       UpgradeTaskReport report;
+       UpgradeTask *task = upgrade_task_create();
+       const char *query = "SELECT name "
+               "FROM pg_available_extensions "
+               "WHERE installed_version != default_version";
 
        prep_status("Checking for extension updates");
 
-       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-       {
-               PGresult   *res;
-               bool            db_used = false;
-               int                     ntups;
-               int                     rowno;
-               int                     i_name;
-               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
-               PGconn     *conn = connectToServer(cluster, active_db->db_name);
-
-               /* find extensions needing updates */
-               res = executeQueryOrDie(conn,
-                                                               "SELECT name "
-                                                               "FROM pg_available_extensions "
-                                                               "WHERE installed_version != default_version"
-                       );
+       report.file = NULL;
+       strcpy(report.path, "update_extensions.sql");
 
-               ntups = PQntuples(res);
-               i_name = PQfnumber(res, "name");
-               for (rowno = 0; rowno < ntups; rowno++)
-               {
-                       if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-                               pg_fatal("could not open file \"%s\": %m", output_path);
-                       if (!db_used)
-                       {
-                               PQExpBufferData connectbuf;
+       upgrade_task_add_step(task, query, process_extension_updates,
+                                                 true, &report);
 
-                               initPQExpBuffer(&connectbuf);
-                               appendPsqlMetaConnect(&connectbuf, active_db->db_name);
-                               fputs(connectbuf.data, script);
-                               termPQExpBuffer(&connectbuf);
-                               db_used = true;
-                       }
-                       fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
-                                       quote_identifier(PQgetvalue(res, rowno, i_name)));
-               }
+       upgrade_task_run(task, cluster);
+       upgrade_task_free(task);
 
-               PQclear(res);
-
-               PQfinish(conn);
-       }
-
-       if (script)
+       if (report.file)
        {
-               fclose(script);
+               fclose(report.file);
                report_status(PG_REPORT, "notice");
                pg_log(PG_REPORT, "\n"
                           "Your installation contains extensions that should be updated\n"
@@ -204,7 +208,7 @@ report_extension_updates(ClusterInfo *cluster)
                           "    %s\n"
                           "when executed by psql by the database superuser will update\n"
                           "these extensions.",
-                          output_path);
+                          report.path);
        }
        else
                check_ok();