worker_spi: Expand set of options to start workers
authorMichael Paquier <michael@paquier.xyz>
Thu, 5 Oct 2023 03:22:28 +0000 (12:22 +0900)
committerMichael Paquier <michael@paquier.xyz>
Thu, 5 Oct 2023 03:22:28 +0000 (12:22 +0900)
A couple of new options are added to this module to provide more control
on the ways bgworkers are started:
- A new GUC called worker_spi.role to control which role to use by
default when starting a worker.
- worker_spi_launch() gains three arguments: a role OID, a database OID
and flags (currently only BGWORKER_BYPASS_ALLOWCONN).  By default, the
role OID and the database OID are InvalidOid, in which case the worker
would use the related GUCs.

Workers loaded by shared_preload_libraries use the default values
provided by the GUCs, with flags at 0.  The options are given to the
main bgworker routine through bgw_extra.  A test case is tweaked to
start two dynamic workers with databases and roles defined by the caller
of worker_spi_launch().

These additions will have the advantage of expanding the tests for
bgworkers, for at least two cases:
- BGWORKER_BYPASS_ALLOWCONN has no coverage in the core tree.
- A new bgworker flag is under discussion, and this eases the
integration of new tests.

Reviewed-by: Bertrand Drouvot
Discussion: https://postgr.es/m/bcc36259-7850-4882-97ef-d6b905d2fc51@gmail.com

src/test/modules/worker_spi/t/001_worker_spi.pl
src/test/modules/worker_spi/worker_spi--1.0.sql
src/test/modules/worker_spi/worker_spi.c

index f3ef3955540f5e7a51844f2fd84d3b9ca6aa1237..4b46b1336b1076fc641a8e35bc4b2092bcaa1b1b 100644 (file)
@@ -20,7 +20,9 @@ $node->safe_psql('postgres', 'CREATE EXTENSION worker_spi;');
 # This consists in making sure that a table name "counted" is created
 # on a new schema whose name includes the index defined in input argument
 # of worker_spi_launch().
-# By default, dynamic bgworkers connect to the "postgres" database.
+# By default, dynamic bgworkers connect to the "postgres" database with
+# an undefined role, falling back to the GUC defaults (or InvalidOid for
+# worker_spi_launch).
 my $result =
   $node->safe_psql('postgres', 'SELECT worker_spi_launch(4) IS NOT NULL;');
 is($result, 't', "dynamic bgworker launched");
@@ -44,8 +46,7 @@ $result = $node->poll_query_until(
    'postgres',
    qq[SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';],
    qq[WorkerSpiMain]);
-is($result, 1,
-   'dynamic bgworker has reported "WorkerSpiMain" as wait event');
+is($result, 1, 'dynamic bgworker has reported "WorkerSpiMain" as wait event');
 
 # Check the wait event used by the dynamic bgworker appears in pg_wait_events
 $result = $node->safe_psql('postgres',
@@ -58,6 +59,7 @@ note "testing bgworkers loaded with shared_preload_libraries";
 # Create the database first so as the workers can connect to it when
 # the library is loaded.
 $node->safe_psql('postgres', q(CREATE DATABASE mydb;));
+$node->safe_psql('postgres', q(CREATE ROLE myrole SUPERUSER LOGIN;));
 $node->safe_psql('mydb', 'CREATE EXTENSION worker_spi;');
 
 # Now load the module as a shared library.
@@ -80,16 +82,25 @@ ok( $node->poll_query_until(
 
 # Ask worker_spi to launch dynamic bgworkers with the library loaded, then
 # check their existence.  Use IDs that do not overlap with the schemas created
-# by the previous workers.
-my $worker1_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(10);');
-my $worker2_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(11);');
+# by the previous workers.  These ones use a new role, on different databases.
+my $myrole_id = $node->safe_psql('mydb',
+   "SELECT oid FROM pg_roles where rolname = 'myrole';");
+my $mydb_id = $node->safe_psql('mydb',
+   "SELECT oid FROM pg_database where datname = 'mydb';");
+my $postgresdb_id = $node->safe_psql('mydb',
+   "SELECT oid FROM pg_database where datname = 'postgres';");
+my $worker1_pid = $node->safe_psql('mydb',
+   "SELECT worker_spi_launch(10, $mydb_id, $myrole_id);");
+my $worker2_pid = $node->safe_psql('mydb',
+   "SELECT worker_spi_launch(11, $postgresdb_id, $myrole_id);");
 
 ok( $node->poll_query_until(
        'mydb',
-       qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity
+       qq[SELECT datname, usename, wait_event FROM pg_stat_activity
             WHERE backend_type = 'worker_spi dynamic' AND
-            pid IN ($worker1_pid, $worker2_pid) GROUP BY datname, wait_event;],
-       'mydb|2|WorkerSpiMain'),
+            pid IN ($worker1_pid, $worker2_pid) ORDER BY datname;],
+       qq[mydb|myrole|WorkerSpiMain
+postgres|myrole|WorkerSpiMain]),
    'dynamic bgworkers all launched'
 ) or die "Timed out while waiting for dynamic bgworkers to be launched";
 
index e9d5b07373a8bdaeaab083758b693e7b93792c08..84deb6199f6352cbc26f56a8f9439372c9fbaee1 100644 (file)
@@ -3,7 +3,11 @@
 -- complain if script is sourced in psql, rather than via CREATE EXTENSION
 \echo Use "CREATE EXTENSION worker_spi" to load this file. \quit
 
-CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
+-- In the default case, dboid and roleoid fall back to their respective GUCs.
+CREATE FUNCTION worker_spi_launch(index int4,
+  dboid oid DEFAULT 0,
+  roleoid oid DEFAULT 0,
+  flags text[] DEFAULT '{}')
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME'
 LANGUAGE C;
index 2e3114990e40668b02cee1c386c31e18e034342c..5d81cf45639a1dce0939331b3631616c95386338 100644 (file)
 
 /* these headers are used by this particular worker's code */
 #include "access/xact.h"
+#include "commands/dbcommands.h"
 #include "executor/spi.h"
 #include "fmgr.h"
 #include "lib/stringinfo.h"
 #include "pgstat.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/snapmgr.h"
 #include "tcop/utility.h"
@@ -52,6 +54,7 @@ PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
 static int worker_spi_naptime = 10;
 static int worker_spi_total_workers = 2;
 static char *worker_spi_database = NULL;
+static char *worker_spi_role = NULL;
 
 /* value cached, fetched from shared memory */
 static uint32 worker_spi_wait_event_main = 0;
@@ -138,12 +141,24 @@ worker_spi_main(Datum main_arg)
    worktable  *table;
    StringInfoData buf;
    char        name[20];
+   Oid         dboid;
+   Oid         roleoid;
+   char       *p;
+   bits32      flags = 0;
 
    table = palloc(sizeof(worktable));
    sprintf(name, "schema%d", index);
    table->schema = pstrdup(name);
    table->name = pstrdup("counted");
 
+   /* fetch database and role OIDs, these are set for a dynamic worker */
+   p = MyBgworkerEntry->bgw_extra;
+   memcpy(&dboid, p, sizeof(Oid));
+   p += sizeof(Oid);
+   memcpy(&roleoid, p, sizeof(Oid));
+   p += sizeof(Oid);
+   memcpy(&flags, p, sizeof(bits32));
+
    /* Establish signal handlers before unblocking signals. */
    pqsignal(SIGHUP, SignalHandlerForConfigReload);
    pqsignal(SIGTERM, die);
@@ -152,7 +167,11 @@ worker_spi_main(Datum main_arg)
    BackgroundWorkerUnblockSignals();
 
    /* Connect to our database */
-   BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
+   if (OidIsValid(dboid))
+       BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
+   else
+       BackgroundWorkerInitializeConnection(worker_spi_database,
+                                            worker_spi_role, flags);
 
    elog(LOG, "%s initialized with %s.%s",
         MyBgworkerEntry->bgw_name, table->schema, table->name);
@@ -316,6 +335,15 @@ _PG_init(void)
                               0,
                               NULL, NULL, NULL);
 
+   DefineCustomStringVariable("worker_spi.role",
+                              "Role to connect with.",
+                              NULL,
+                              &worker_spi_role,
+                              NULL,
+                              PGC_SIGHUP,
+                              0,
+                              NULL, NULL, NULL);
+
    if (!process_shared_preload_libraries_in_progress)
        return;
 
@@ -346,6 +374,10 @@ _PG_init(void)
 
    /*
     * Now fill in worker-specific data, and do the actual registrations.
+    *
+    * bgw_extra can optionally include a dabatase OID, a role OID and a set
+    * of flags.  This is left empty here to fallback to the related GUCs at
+    * startup (0 for the bgworker flags).
     */
    for (int i = 1; i <= worker_spi_total_workers; i++)
    {
@@ -364,10 +396,18 @@ Datum
 worker_spi_launch(PG_FUNCTION_ARGS)
 {
    int32       i = PG_GETARG_INT32(0);
+   Oid         dboid = PG_GETARG_OID(1);
+   Oid         roleoid = PG_GETARG_OID(2);
    BackgroundWorker worker;
    BackgroundWorkerHandle *handle;
    BgwHandleStatus status;
    pid_t       pid;
+   char       *p;
+   bits32      flags = 0;
+   ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
+   Size        ndim;
+   int         nelems;
+   Datum      *datum_flags;
 
    memset(&worker, 0, sizeof(worker));
    worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -382,6 +422,54 @@ worker_spi_launch(PG_FUNCTION_ARGS)
    /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
    worker.bgw_notify_pid = MyProcPid;
 
+   /* extract flags, if any */
+   ndim = ARR_NDIM(arr);
+   if (ndim > 1)
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("flags array must be one-dimensional")));
+
+   if (array_contains_nulls(arr))
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("flags array must not contain nulls")));
+
+   Assert(ARR_ELEMTYPE(arr) == TEXTOID);
+   deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
+
+   for (i = 0; i < nelems; i++)
+   {
+       char       *optname = TextDatumGetCString(datum_flags[i]);
+
+       if (strcmp(optname, "ALLOWCONN") == 0)
+           flags |= BGWORKER_BYPASS_ALLOWCONN;
+       else
+           ereport(ERROR,
+                   (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                    errmsg("incorrect flag value found in array")));
+   }
+
+   /*
+    * Register database and role to use for the worker started in bgw_extra.
+    * If none have been provided, this will fall back to the GUCs at startup.
+    */
+   if (!OidIsValid(dboid))
+       dboid = get_database_oid(worker_spi_database, false);
+
+   /*
+    * worker_spi_role is NULL by default, so this gives to worker_spi_main()
+    * an invalid OID in this case.
+    */
+   if (!OidIsValid(roleoid) && worker_spi_role)
+       roleoid = get_role_oid(worker_spi_role, false);
+
+   p = worker.bgw_extra;
+   memcpy(p, &dboid, sizeof(Oid));
+   p += sizeof(Oid);
+   memcpy(p, &roleoid, sizeof(Oid));
+   p += sizeof(Oid);
+   memcpy(p, &flags, sizeof(bits32));
+
    if (!RegisterDynamicBackgroundWorker(&worker, &handle))
        PG_RETURN_NULL();