worker_spi: Expand set of options to start workers
authorMichael Paquier <michael@paquier.xyz>
Thu, 5 Oct 2023 03:22:28 +0000 (12:22 +0900)
committerMichael Paquier <michael@paquier.xyz>
Thu, 5 Oct 2023 03:22:28 +0000 (12:22 +0900)
A couple of new options are added to this module to provide more control
on the ways bgworkers are started:
- A new GUC called worker_spi.role to control which role to use by
default when starting a worker.
- worker_spi_launch() gains three arguments: a role OID, a database OID
and flags (currently only BGWORKER_BYPASS_ALLOWCONN).  By default, the
role OID and the database OID are InvalidOid, in which case the worker
would use the related GUCs.

Workers loaded by shared_preload_libraries use the default values
provided by the GUCs, with flags at 0.  The options are given to the
main bgworker routine through bgw_extra.  A test case is tweaked to
start two dynamic workers with databases and roles defined by the caller
of worker_spi_launch().

These additions will have the advantage of expanding the tests for
bgworkers, for at least two cases:
- BGWORKER_BYPASS_ALLOWCONN has no coverage in the core tree.
- A new bgworker flag is under discussion, and this eases the
integration of new tests.

Reviewed-by: Bertrand Drouvot
Discussion: https://postgr.es/m/bcc36259-7850-4882-97ef-d6b905d2fc51@gmail.com

src/test/modules/worker_spi/t/001_worker_spi.pl
src/test/modules/worker_spi/worker_spi--1.0.sql
src/test/modules/worker_spi/worker_spi.c

index f3ef3955540f5e7a51844f2fd84d3b9ca6aa1237..4b46b1336b1076fc641a8e35bc4b2092bcaa1b1b 100644 (file)
@@ -20,7 +20,9 @@ $node->safe_psql('postgres', 'CREATE EXTENSION worker_spi;');
 # This consists in making sure that a table name "counted" is created
 # on a new schema whose name includes the index defined in input argument
 # of worker_spi_launch().
-# By default, dynamic bgworkers connect to the "postgres" database.
+# By default, dynamic bgworkers connect to the "postgres" database with
+# an undefined role, falling back to the GUC defaults (or InvalidOid for
+# worker_spi_launch).
 my $result =
   $node->safe_psql('postgres', 'SELECT worker_spi_launch(4) IS NOT NULL;');
 is($result, 't', "dynamic bgworker launched");
@@ -44,8 +46,7 @@ $result = $node->poll_query_until(
        'postgres',
        qq[SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';],
        qq[WorkerSpiMain]);
-is($result, 1,
-       'dynamic bgworker has reported "WorkerSpiMain" as wait event');
+is($result, 1, 'dynamic bgworker has reported "WorkerSpiMain" as wait event');
 
 # Check the wait event used by the dynamic bgworker appears in pg_wait_events
 $result = $node->safe_psql('postgres',
@@ -58,6 +59,7 @@ note "testing bgworkers loaded with shared_preload_libraries";
 # Create the database first so as the workers can connect to it when
 # the library is loaded.
 $node->safe_psql('postgres', q(CREATE DATABASE mydb;));
+$node->safe_psql('postgres', q(CREATE ROLE myrole SUPERUSER LOGIN;));
 $node->safe_psql('mydb', 'CREATE EXTENSION worker_spi;');
 
 # Now load the module as a shared library.
@@ -80,16 +82,25 @@ ok( $node->poll_query_until(
 
 # Ask worker_spi to launch dynamic bgworkers with the library loaded, then
 # check their existence.  Use IDs that do not overlap with the schemas created
-# by the previous workers.
-my $worker1_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(10);');
-my $worker2_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(11);');
+# by the previous workers.  These ones use a new role, on different databases.
+my $myrole_id = $node->safe_psql('mydb',
+       "SELECT oid FROM pg_roles where rolname = 'myrole';");
+my $mydb_id = $node->safe_psql('mydb',
+       "SELECT oid FROM pg_database where datname = 'mydb';");
+my $postgresdb_id = $node->safe_psql('mydb',
+       "SELECT oid FROM pg_database where datname = 'postgres';");
+my $worker1_pid = $node->safe_psql('mydb',
+       "SELECT worker_spi_launch(10, $mydb_id, $myrole_id);");
+my $worker2_pid = $node->safe_psql('mydb',
+       "SELECT worker_spi_launch(11, $postgresdb_id, $myrole_id);");
 
 ok( $node->poll_query_until(
                'mydb',
-               qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity
+               qq[SELECT datname, usename, wait_event FROM pg_stat_activity
             WHERE backend_type = 'worker_spi dynamic' AND
-            pid IN ($worker1_pid, $worker2_pid) GROUP BY datname, wait_event;],
-               'mydb|2|WorkerSpiMain'),
+            pid IN ($worker1_pid, $worker2_pid) ORDER BY datname;],
+               qq[mydb|myrole|WorkerSpiMain
+postgres|myrole|WorkerSpiMain]),
        'dynamic bgworkers all launched'
 ) or die "Timed out while waiting for dynamic bgworkers to be launched";
 
index e9d5b07373a8bdaeaab083758b693e7b93792c08..84deb6199f6352cbc26f56a8f9439372c9fbaee1 100644 (file)
@@ -3,7 +3,11 @@
 -- complain if script is sourced in psql, rather than via CREATE EXTENSION
 \echo Use "CREATE EXTENSION worker_spi" to load this file. \quit
 
-CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
+-- In the default case, dboid and roleoid fall back to their respective GUCs.
+CREATE FUNCTION worker_spi_launch(index int4,
+  dboid oid DEFAULT 0,
+  roleoid oid DEFAULT 0,
+  flags text[] DEFAULT '{}')
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME'
 LANGUAGE C;
index 2e3114990e40668b02cee1c386c31e18e034342c..5d81cf45639a1dce0939331b3631616c95386338 100644 (file)
 
 /* these headers are used by this particular worker's code */
 #include "access/xact.h"
+#include "commands/dbcommands.h"
 #include "executor/spi.h"
 #include "fmgr.h"
 #include "lib/stringinfo.h"
 #include "pgstat.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/snapmgr.h"
 #include "tcop/utility.h"
@@ -52,6 +54,7 @@ PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
 static int     worker_spi_naptime = 10;
 static int     worker_spi_total_workers = 2;
 static char *worker_spi_database = NULL;
+static char *worker_spi_role = NULL;
 
 /* value cached, fetched from shared memory */
 static uint32 worker_spi_wait_event_main = 0;
@@ -138,12 +141,24 @@ worker_spi_main(Datum main_arg)
        worktable  *table;
        StringInfoData buf;
        char            name[20];
+       Oid                     dboid;
+       Oid                     roleoid;
+       char       *p;
+       bits32          flags = 0;
 
        table = palloc(sizeof(worktable));
        sprintf(name, "schema%d", index);
        table->schema = pstrdup(name);
        table->name = pstrdup("counted");
 
+       /* fetch database and role OIDs, these are set for a dynamic worker */
+       p = MyBgworkerEntry->bgw_extra;
+       memcpy(&dboid, p, sizeof(Oid));
+       p += sizeof(Oid);
+       memcpy(&roleoid, p, sizeof(Oid));
+       p += sizeof(Oid);
+       memcpy(&flags, p, sizeof(bits32));
+
        /* Establish signal handlers before unblocking signals. */
        pqsignal(SIGHUP, SignalHandlerForConfigReload);
        pqsignal(SIGTERM, die);
@@ -152,7 +167,11 @@ worker_spi_main(Datum main_arg)
        BackgroundWorkerUnblockSignals();
 
        /* Connect to our database */
-       BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
+       if (OidIsValid(dboid))
+               BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
+       else
+               BackgroundWorkerInitializeConnection(worker_spi_database,
+                                                                                        worker_spi_role, flags);
 
        elog(LOG, "%s initialized with %s.%s",
                 MyBgworkerEntry->bgw_name, table->schema, table->name);
@@ -316,6 +335,15 @@ _PG_init(void)
                                                           0,
                                                           NULL, NULL, NULL);
 
+       DefineCustomStringVariable("worker_spi.role",
+                                                          "Role to connect with.",
+                                                          NULL,
+                                                          &worker_spi_role,
+                                                          NULL,
+                                                          PGC_SIGHUP,
+                                                          0,
+                                                          NULL, NULL, NULL);
+
        if (!process_shared_preload_libraries_in_progress)
                return;
 
@@ -346,6 +374,10 @@ _PG_init(void)
 
        /*
         * Now fill in worker-specific data, and do the actual registrations.
+        *
+        * bgw_extra can optionally include a dabatase OID, a role OID and a set
+        * of flags.  This is left empty here to fallback to the related GUCs at
+        * startup (0 for the bgworker flags).
         */
        for (int i = 1; i <= worker_spi_total_workers; i++)
        {
@@ -364,10 +396,18 @@ Datum
 worker_spi_launch(PG_FUNCTION_ARGS)
 {
        int32           i = PG_GETARG_INT32(0);
+       Oid                     dboid = PG_GETARG_OID(1);
+       Oid                     roleoid = PG_GETARG_OID(2);
        BackgroundWorker worker;
        BackgroundWorkerHandle *handle;
        BgwHandleStatus status;
        pid_t           pid;
+       char       *p;
+       bits32          flags = 0;
+       ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
+       Size            ndim;
+       int                     nelems;
+       Datum      *datum_flags;
 
        memset(&worker, 0, sizeof(worker));
        worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -382,6 +422,54 @@ worker_spi_launch(PG_FUNCTION_ARGS)
        /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
        worker.bgw_notify_pid = MyProcPid;
 
+       /* extract flags, if any */
+       ndim = ARR_NDIM(arr);
+       if (ndim > 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("flags array must be one-dimensional")));
+
+       if (array_contains_nulls(arr))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("flags array must not contain nulls")));
+
+       Assert(ARR_ELEMTYPE(arr) == TEXTOID);
+       deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
+
+       for (i = 0; i < nelems; i++)
+       {
+               char       *optname = TextDatumGetCString(datum_flags[i]);
+
+               if (strcmp(optname, "ALLOWCONN") == 0)
+                       flags |= BGWORKER_BYPASS_ALLOWCONN;
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("incorrect flag value found in array")));
+       }
+
+       /*
+        * Register database and role to use for the worker started in bgw_extra.
+        * If none have been provided, this will fall back to the GUCs at startup.
+        */
+       if (!OidIsValid(dboid))
+               dboid = get_database_oid(worker_spi_database, false);
+
+       /*
+        * worker_spi_role is NULL by default, so this gives to worker_spi_main()
+        * an invalid OID in this case.
+        */
+       if (!OidIsValid(roleoid) && worker_spi_role)
+               roleoid = get_role_oid(worker_spi_role, false);
+
+       p = worker.bgw_extra;
+       memcpy(p, &dboid, sizeof(Oid));
+       p += sizeof(Oid);
+       memcpy(p, &roleoid, sizeof(Oid));
+       p += sizeof(Oid);
+       memcpy(p, &flags, sizeof(bits32));
+
        if (!RegisterDynamicBackgroundWorker(&worker, &handle))
                PG_RETURN_NULL();