Refactor function parse_subscription_options.
authorAmit Kapila <akapila@postgresql.org>
Tue, 6 Jul 2021 02:16:50 +0000 (07:46 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 6 Jul 2021 02:16:50 +0000 (07:46 +0530)
Instead of using multiple parameters in parse_subscription_options
function signature, use the struct SubOpts that encapsulate all the
subscription options and their values. It will be useful for future work
where we need to add other options in the subscription. Also, use bitmaps
to pass the supported and retrieve the specified options much like the way
it is done in the commit a3dc926009.

Author: Bharath Rupireddy
Reviewed-By: Peter Smith, Amit Kapila, Alvaro Herrera
Discussion: https://postgr.es/m/CALj2ACXtoQczfNsDQWobypVvHbX2DtgEHn8DawS0eGFwuo72kw@mail.gmail.com

src/backend/commands/subscriptioncmds.c
src/tools/pgindent/typedefs.list

index b862e59f1da8a42aa73d5a16ef27dd403b1dba97..eb88d877a5032dabd6f90a6ea5f04147650a2eac 100644 (file)
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+/*
+ * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
+ * command.
+ */
+#define SUBOPT_CONNECT             0x00000001
+#define SUBOPT_ENABLED             0x00000002
+#define SUBOPT_CREATE_SLOT         0x00000004
+#define SUBOPT_SLOT_NAME           0x00000008
+#define SUBOPT_COPY_DATA           0x00000010
+#define SUBOPT_SYNCHRONOUS_COMMIT  0x00000020
+#define SUBOPT_REFRESH             0x00000040
+#define SUBOPT_BINARY              0x00000080
+#define SUBOPT_STREAMING           0x00000100
+
+/* check if the 'val' has 'bits' set */
+#define IsSet(val, bits)  (((val) & (bits)) == (bits))
+
+/*
+ * Structure to hold a bitmap representing the user-provided CREATE/ALTER
+ * SUBSCRIPTION command options and the parsed/default values of each of them.
+ */
+typedef struct SubOpts
+{
+   bits32      specified_opts;
+   char       *slot_name;
+   char       *synchronous_commit;
+   bool        connect;
+   bool        enabled;
+   bool        create_slot;
+   bool        copy_data;
+   bool        refresh;
+   bool        binary;
+   bool        streaming;
+} SubOpts;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@@ -56,164 +91,151 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
  *
  * Since not all options can be specified in both commands, this function
- * will report an error on options if the target output pointer is NULL to
- * accommodate that.
+ * will report an error if mutually exclusive options are specified.
+ *
+ * Caller is expected to have cleared 'opts'.
  */
 static void
-parse_subscription_options(List *options,
-                          bool *connect,
-                          bool *enabled_given, bool *enabled,
-                          bool *create_slot,
-                          bool *slot_name_given, char **slot_name,
-                          bool *copy_data,
-                          char **synchronous_commit,
-                          bool *refresh,
-                          bool *binary_given, bool *binary,
-                          bool *streaming_given, bool *streaming)
+parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts)
 {
    ListCell   *lc;
-   bool        connect_given = false;
-   bool        create_slot_given = false;
-   bool        copy_data_given = false;
-   bool        refresh_given = false;
-
-   /* If connect is specified, the others also need to be. */
-   Assert(!connect || (enabled && create_slot && copy_data));
 
-   if (connect)
-       *connect = true;
-   if (enabled)
-   {
-       *enabled_given = false;
-       *enabled = true;
-   }
-   if (create_slot)
-       *create_slot = true;
-   if (slot_name)
-   {
-       *slot_name_given = false;
-       *slot_name = NULL;
-   }
-   if (copy_data)
-       *copy_data = true;
-   if (synchronous_commit)
-       *synchronous_commit = NULL;
-   if (refresh)
-       *refresh = true;
-   if (binary)
-   {
-       *binary_given = false;
-       *binary = false;
-   }
-   if (streaming)
-   {
-       *streaming_given = false;
-       *streaming = false;
-   }
+   /* caller must expect some option */
+   Assert(supported_opts != 0);
+
+   /* If connect option is supported, these others also need to be. */
+   Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
+          IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+                SUBOPT_COPY_DATA));
+
+   /* Set default values for the boolean supported options. */
+   if (IsSet(supported_opts, SUBOPT_CONNECT))
+       opts->connect = true;
+   if (IsSet(supported_opts, SUBOPT_ENABLED))
+       opts->enabled = true;
+   if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
+       opts->create_slot = true;
+   if (IsSet(supported_opts, SUBOPT_COPY_DATA))
+       opts->copy_data = true;
+   if (IsSet(supported_opts, SUBOPT_REFRESH))
+       opts->refresh = true;
+   if (IsSet(supported_opts, SUBOPT_BINARY))
+       opts->binary = false;
+   if (IsSet(supported_opts, SUBOPT_STREAMING))
+       opts->streaming = false;
 
    /* Parse options */
-   foreach(lc, options)
+   foreach(lc, stmt_options)
    {
        DefElem    *defel = (DefElem *) lfirst(lc);
 
-       if (strcmp(defel->defname, "connect") == 0 && connect)
+       if (IsSet(supported_opts, SUBOPT_CONNECT) &&
+           strcmp(defel->defname, "connect") == 0)
        {
-           if (connect_given)
+           if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           connect_given = true;
-           *connect = defGetBoolean(defel);
+           opts->specified_opts |= SUBOPT_CONNECT;
+           opts->connect = defGetBoolean(defel);
        }
-       else if (strcmp(defel->defname, "enabled") == 0 && enabled)
+       else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
+                strcmp(defel->defname, "enabled") == 0)
        {
-           if (*enabled_given)
+           if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           *enabled_given = true;
-           *enabled = defGetBoolean(defel);
+           opts->specified_opts |= SUBOPT_ENABLED;
+           opts->enabled = defGetBoolean(defel);
        }
-       else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
+       else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+                strcmp(defel->defname, "create_slot") == 0)
        {
-           if (create_slot_given)
+           if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           create_slot_given = true;
-           *create_slot = defGetBoolean(defel);
+           opts->specified_opts |= SUBOPT_CREATE_SLOT;
+           opts->create_slot = defGetBoolean(defel);
        }
-       else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
+       else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+                strcmp(defel->defname, "slot_name") == 0)
        {
-           if (*slot_name_given)
+           if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           *slot_name_given = true;
-           *slot_name = defGetString(defel);
+           opts->specified_opts |= SUBOPT_SLOT_NAME;
+           opts->slot_name = defGetString(defel);
 
            /* Setting slot_name = NONE is treated as no slot name. */
-           if (strcmp(*slot_name, "none") == 0)
-               *slot_name = NULL;
+           if (strcmp(opts->slot_name, "none") == 0)
+               opts->slot_name = NULL;
        }
-       else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
+       else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+                strcmp(defel->defname, "copy_data") == 0)
        {
-           if (copy_data_given)
+           if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           copy_data_given = true;
-           *copy_data = defGetBoolean(defel);
+           opts->specified_opts |= SUBOPT_COPY_DATA;
+           opts->copy_data = defGetBoolean(defel);
        }
-       else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
-                synchronous_commit)
+       else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
+                strcmp(defel->defname, "synchronous_commit") == 0)
        {
-           if (*synchronous_commit)
+           if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           *synchronous_commit = defGetString(defel);
+           opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+           opts->synchronous_commit = defGetString(defel);
 
            /* Test if the given value is valid for synchronous_commit GUC. */
-           (void) set_config_option("synchronous_commit", *synchronous_commit,
+           (void) set_config_option("synchronous_commit", opts->synchronous_commit,
                                     PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
                                     false, 0, false);
        }
-       else if (strcmp(defel->defname, "refresh") == 0 && refresh)
+       else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
+                strcmp(defel->defname, "refresh") == 0)
        {
-           if (refresh_given)
+           if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           refresh_given = true;
-           *refresh = defGetBoolean(defel);
+           opts->specified_opts |= SUBOPT_REFRESH;
+           opts->refresh = defGetBoolean(defel);
        }
-       else if (strcmp(defel->defname, "binary") == 0 && binary)
+       else if (IsSet(supported_opts, SUBOPT_BINARY) &&
+                strcmp(defel->defname, "binary") == 0)
        {
-           if (*binary_given)
+           if (IsSet(opts->specified_opts, SUBOPT_BINARY))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           *binary_given = true;
-           *binary = defGetBoolean(defel);
+           opts->specified_opts |= SUBOPT_BINARY;
+           opts->binary = defGetBoolean(defel);
        }
-       else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+       else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
+                strcmp(defel->defname, "streaming") == 0)
        {
-           if (*streaming_given)
+           if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
 
-           *streaming_given = true;
-           *streaming = defGetBoolean(defel);
+           opts->specified_opts |= SUBOPT_STREAMING;
+           opts->streaming = defGetBoolean(defel);
        }
        else
            ereport(ERROR,
@@ -225,63 +247,81 @@ parse_subscription_options(List *options,
     * We've been explicitly asked to not connect, that requires some
     * additional processing.
     */
-   if (connect && !*connect)
+   if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
    {
        /* Check for incompatible options from the user. */
-       if (enabled && *enabled_given && *enabled)
+       if (opts->enabled &&
+           IsSet(supported_opts, SUBOPT_ENABLED) &&
+           IsSet(opts->specified_opts, SUBOPT_ENABLED))
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
            /*- translator: both %s are strings of the form "option = value" */
                     errmsg("%s and %s are mutually exclusive options",
                            "connect = false", "enabled = true")));
 
-       if (create_slot && create_slot_given && *create_slot)
+       if (opts->create_slot &&
+           IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+           IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("%s and %s are mutually exclusive options",
                            "connect = false", "create_slot = true")));
 
-       if (copy_data && copy_data_given && *copy_data)
+       if (opts->copy_data &&
+           IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+           IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("%s and %s are mutually exclusive options",
                            "connect = false", "copy_data = true")));
 
        /* Change the defaults of other options. */
-       *enabled = false;
-       *create_slot = false;
-       *copy_data = false;
+       opts->enabled = false;
+       opts->create_slot = false;
+       opts->copy_data = false;
    }
 
    /*
     * Do additional checking for disallowed combination when slot_name = NONE
     * was used.
     */
-   if (slot_name && *slot_name_given && !*slot_name)
+   if (!opts->slot_name &&
+       IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+       IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
    {
-       if (enabled && *enabled_given && *enabled)
+       if (opts->enabled &&
+           IsSet(supported_opts, SUBOPT_ENABLED) &&
+           IsSet(opts->specified_opts, SUBOPT_ENABLED))
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
            /*- translator: both %s are strings of the form "option = value" */
                     errmsg("%s and %s are mutually exclusive options",
                            "slot_name = NONE", "enabled = true")));
 
-       if (create_slot && create_slot_given && *create_slot)
+       if (opts->create_slot &&
+           IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+           IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
+           /*- translator: both %s are strings of the form "option = value" */
                     errmsg("%s and %s are mutually exclusive options",
                            "slot_name = NONE", "create_slot = true")));
 
-       if (enabled && !*enabled_given && *enabled)
+       if (opts->enabled &&
+           IsSet(supported_opts, SUBOPT_ENABLED) &&
+           !IsSet(opts->specified_opts, SUBOPT_ENABLED))
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
            /*- translator: both %s are strings of the form "option = value" */
                     errmsg("subscription with %s must also set %s",
                            "slot_name = NONE", "enabled = false")));
 
-       if (create_slot && !create_slot_given && *create_slot)
+       if (opts->create_slot &&
+           IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+           !IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
+           /*- translator: both %s are strings of the form "option = value" */
                     errmsg("subscription with %s must also set %s",
                            "slot_name = NONE", "create_slot = false")));
    }
@@ -331,37 +371,22 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
    Datum       values[Natts_pg_subscription];
    Oid         owner = GetUserId();
    HeapTuple   tup;
-   bool        connect;
-   bool        enabled_given;
-   bool        enabled;
-   bool        copy_data;
-   bool        streaming;
-   bool        streaming_given;
-   char       *synchronous_commit;
    char       *conninfo;
-   char       *slotname;
-   bool        slotname_given;
-   bool        binary;
-   bool        binary_given;
    char        originname[NAMEDATALEN];
-   bool        create_slot;
    List       *publications;
+   bits32      supported_opts;
+   SubOpts     opts = {0};
 
    /*
     * Parse and check options.
     *
     * Connection and publication should not be specified here.
     */
-   parse_subscription_options(stmt->options,
-                              &connect,
-                              &enabled_given, &enabled,
-                              &create_slot,
-                              &slotname_given, &slotname,
-                              &copy_data,
-                              &synchronous_commit,
-                              NULL,    /* no "refresh" */
-                              &binary_given, &binary,
-                              &streaming_given, &streaming);
+   supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+                     SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
+                     SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+                     SUBOPT_STREAMING);
+   parse_subscription_options(stmt->options, supported_opts, &opts);
 
    /*
     * Since creating a replication slot is not transactional, rolling back
@@ -369,7 +394,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
     * CREATE SUBSCRIPTION inside a transaction block if creating a
     * replication slot.
     */
-   if (create_slot)
+   if (opts.create_slot)
        PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
    if (!superuser())
@@ -399,12 +424,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                        stmt->subname)));
    }
 
-   if (!slotname_given && slotname == NULL)
-       slotname = stmt->subname;
+   if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
+       opts.slot_name == NULL)
+       opts.slot_name = stmt->subname;
 
    /* The default for synchronous_commit of subscriptions is off. */
-   if (synchronous_commit == NULL)
-       synchronous_commit = "off";
+   if (opts.synchronous_commit == NULL)
+       opts.synchronous_commit = "off";
 
    conninfo = stmt->conninfo;
    publications = stmt->publication;
@@ -426,18 +452,18 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
    values[Anum_pg_subscription_subname - 1] =
        DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
    values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
-   values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
-   values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
-   values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
+   values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
+   values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
+   values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
    values[Anum_pg_subscription_subconninfo - 1] =
        CStringGetTextDatum(conninfo);
-   if (slotname)
+   if (opts.slot_name)
        values[Anum_pg_subscription_subslotname - 1] =
-           DirectFunctionCall1(namein, CStringGetDatum(slotname));
+           DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
    else
        nulls[Anum_pg_subscription_subslotname - 1] = true;
    values[Anum_pg_subscription_subsynccommit - 1] =
-       CStringGetTextDatum(synchronous_commit);
+       CStringGetTextDatum(opts.synchronous_commit);
    values[Anum_pg_subscription_subpublications - 1] =
        publicationListToArray(publications);
 
@@ -456,7 +482,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
     * Connect to remote side to execute requested commands and fetch table
     * info.
     */
-   if (connect)
+   if (opts.connect)
    {
        char       *err;
        WalReceiverConn *wrconn;
@@ -477,7 +503,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
             * Set sync state based on if we were asked to do data copy or
             * not.
             */
-           table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+           table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 
            /*
             * Get the table list from publisher and build local table status
@@ -504,15 +530,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
             * won't use the initial snapshot for anything, so no need to
             * export it.
             */
-           if (create_slot)
+           if (opts.create_slot)
            {
-               Assert(slotname);
+               Assert(opts.slot_name);
 
-               walrcv_create_slot(wrconn, slotname, false,
+               walrcv_create_slot(wrconn, opts.slot_name, false,
                                   CRS_NOEXPORT_SNAPSHOT, NULL);
                ereport(NOTICE,
                        (errmsg("created replication slot \"%s\" on publisher",
-                               slotname)));
+                               opts.slot_name)));
            }
        }
        PG_FINALLY();
@@ -529,7 +555,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
    table_close(rel, RowExclusiveLock);
 
-   if (enabled)
+   if (opts.enabled)
        ApplyLauncherWakeupAtCommit();
 
    ObjectAddressSet(myself, SubscriptionRelationId, subid);
@@ -764,6 +790,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
    bool        update_tuple = false;
    Subscription *sub;
    Form_pg_subscription form;
+   bits32      supported_opts;
+   SubOpts     opts = {0};
 
    rel = table_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -799,59 +827,46 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
    {
        case ALTER_SUBSCRIPTION_OPTIONS:
            {
-               char       *slotname;
-               bool        slotname_given;
-               char       *synchronous_commit;
-               bool        binary_given;
-               bool        binary;
-               bool        streaming_given;
-               bool        streaming;
-
-               parse_subscription_options(stmt->options,
-                                          NULL,    /* no "connect" */
-                                          NULL, NULL,  /* no "enabled" */
-                                          NULL,    /* no "create_slot" */
-                                          &slotname_given, &slotname,
-                                          NULL,    /* no "copy_data" */
-                                          &synchronous_commit,
-                                          NULL,    /* no "refresh" */
-                                          &binary_given, &binary,
-                                          &streaming_given, &streaming);
-
-               if (slotname_given)
+               supported_opts = (SUBOPT_SLOT_NAME |
+                                 SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+                                 SUBOPT_STREAMING);
+
+               parse_subscription_options(stmt->options, supported_opts, &opts);
+
+               if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
                {
-                   if (sub->enabled && !slotname)
+                   if (sub->enabled && !opts.slot_name)
                        ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("cannot set %s for enabled subscription",
                                        "slot_name = NONE")));
 
-                   if (slotname)
+                   if (opts.slot_name)
                        values[Anum_pg_subscription_subslotname - 1] =
-                           DirectFunctionCall1(namein, CStringGetDatum(slotname));
+                           DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
                    else
                        nulls[Anum_pg_subscription_subslotname - 1] = true;
                    replaces[Anum_pg_subscription_subslotname - 1] = true;
                }
 
-               if (synchronous_commit)
+               if (opts.synchronous_commit)
                {
                    values[Anum_pg_subscription_subsynccommit - 1] =
-                       CStringGetTextDatum(synchronous_commit);
+                       CStringGetTextDatum(opts.synchronous_commit);
                    replaces[Anum_pg_subscription_subsynccommit - 1] = true;
                }
 
-               if (binary_given)
+               if (IsSet(opts.specified_opts, SUBOPT_BINARY))
                {
                    values[Anum_pg_subscription_subbinary - 1] =
-                       BoolGetDatum(binary);
+                       BoolGetDatum(opts.binary);
                    replaces[Anum_pg_subscription_subbinary - 1] = true;
                }
 
-               if (streaming_given)
+               if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
                {
                    values[Anum_pg_subscription_substream - 1] =
-                       BoolGetDatum(streaming);
+                       BoolGetDatum(opts.streaming);
                    replaces[Anum_pg_subscription_substream - 1] = true;
                }
 
@@ -861,31 +876,19 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
        case ALTER_SUBSCRIPTION_ENABLED:
            {
-               bool        enabled,
-                           enabled_given;
-
-               parse_subscription_options(stmt->options,
-                                          NULL,    /* no "connect" */
-                                          &enabled_given, &enabled,
-                                          NULL,    /* no "create_slot" */
-                                          NULL, NULL,  /* no "slot_name" */
-                                          NULL,    /* no "copy_data" */
-                                          NULL,    /* no "synchronous_commit" */
-                                          NULL,    /* no "refresh" */
-                                          NULL, NULL,  /* no "binary" */
-                                          NULL, NULL); /* no streaming */
-               Assert(enabled_given);
-
-               if (!sub->slotname && enabled)
+               parse_subscription_options(stmt->options, SUBOPT_ENABLED, &opts);
+               Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
+
+               if (!sub->slotname && opts.enabled)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("cannot enable subscription that does not have a slot name")));
 
                values[Anum_pg_subscription_subenabled - 1] =
-                   BoolGetDatum(enabled);
+                   BoolGetDatum(opts.enabled);
                replaces[Anum_pg_subscription_subenabled - 1] = true;
 
-               if (enabled)
+               if (opts.enabled)
                    ApplyLauncherWakeupAtCommit();
 
                update_tuple = true;
@@ -906,19 +909,9 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
        case ALTER_SUBSCRIPTION_SET_PUBLICATION:
            {
-               bool        copy_data;
-               bool        refresh;
-
-               parse_subscription_options(stmt->options,
-                                          NULL,    /* no "connect" */
-                                          NULL, NULL,  /* no "enabled" */
-                                          NULL,    /* no "create_slot" */
-                                          NULL, NULL,  /* no "slot_name" */
-                                          &copy_data,
-                                          NULL,    /* no "synchronous_commit" */
-                                          &refresh,
-                                          NULL, NULL,  /* no "binary" */
-                                          NULL, NULL); /* no "streaming" */
+               supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+               parse_subscription_options(stmt->options, supported_opts, &opts);
+
                values[Anum_pg_subscription_subpublications - 1] =
                    publicationListToArray(stmt->publication);
                replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -926,7 +919,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                update_tuple = true;
 
                /* Refresh if user asked us to. */
-               if (refresh)
+               if (opts.refresh)
                {
                    if (!sub->enabled)
                        ereport(ERROR,
@@ -939,7 +932,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                    /* Make sure refresh sees the new list of publications. */
                    sub->publications = stmt->publication;
 
-                   AlterSubscription_refresh(sub, copy_data);
+                   AlterSubscription_refresh(sub, opts.copy_data);
                }
 
                break;
@@ -948,25 +941,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
        case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
        case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
            {
-               bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
-               bool        copy_data = false;
-               bool        refresh;
                List       *publist;
+               bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
-               parse_subscription_options(stmt->options,
-                                          NULL,    /* no "connect" */
-                                          NULL, NULL,  /* no "enabled" */
-                                          NULL,    /* no "create_slot" */
-                                          NULL, NULL,  /* no "slot_name" */
-                                          isadd ? &copy_data : NULL,   /* for drop, no
-                                                                        * "copy_data" */
-                                          NULL,    /* no "synchronous_commit" */
-                                          &refresh,
-                                          NULL, NULL,  /* no "binary" */
-                                          NULL, NULL); /* no "streaming" */
+               supported_opts = SUBOPT_REFRESH;
+               if (isadd)
+                   supported_opts |= SUBOPT_COPY_DATA;
 
-               publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+               parse_subscription_options(stmt->options, supported_opts, &opts);
 
+               publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
                values[Anum_pg_subscription_subpublications - 1] =
                    publicationListToArray(publist);
                replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -974,7 +958,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                update_tuple = true;
 
                /* Refresh if user asked us to. */
-               if (refresh)
+               if (opts.refresh)
                {
                    if (!sub->enabled)
                        ereport(ERROR,
@@ -987,7 +971,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                    /* Only refresh the added/dropped list of publications. */
                    sub->publications = stmt->publication;
 
-                   AlterSubscription_refresh(sub, copy_data);
+                   AlterSubscription_refresh(sub, opts.copy_data);
                }
 
                break;
@@ -995,27 +979,16 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 
        case ALTER_SUBSCRIPTION_REFRESH:
            {
-               bool        copy_data;
-
                if (!sub->enabled)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
-               parse_subscription_options(stmt->options,
-                                          NULL,    /* no "connect" */
-                                          NULL, NULL,  /* no "enabled" */
-                                          NULL,    /* no "create_slot" */
-                                          NULL, NULL,  /* no "slot_name" */
-                                          &copy_data,
-                                          NULL,    /* no "synchronous_commit" */
-                                          NULL,    /* no "refresh" */
-                                          NULL, NULL,  /* no "binary" */
-                                          NULL, NULL); /* no "streaming" */
+               parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts);
 
                PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-               AlterSubscription_refresh(sub, copy_data);
+               AlterSubscription_refresh(sub, opts.copy_data);
 
                break;
            }
index 64c06cf95235951db5688b38338a71436cff0502..a72d53a272f1d3c552eb646dd9303f53da033f59 100644 (file)
@@ -2511,6 +2511,7 @@ StringInfoData
 StripnullState
 SubLink
 SubLinkType
+SubOpts
 SubPlan
 SubPlanState
 SubRemoveRels