#include "utils/memutils.h"
#include "utils/syscache.h"
+/*
+ * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
+ * command.
+ *
+ * Each option occupies its own bit, so several of them can be OR-ed together
+ * into a single bits32 mask and tested with IsSet().
+ */
+#define SUBOPT_CONNECT 0x00000001
+#define SUBOPT_ENABLED 0x00000002
+#define SUBOPT_CREATE_SLOT 0x00000004
+#define SUBOPT_SLOT_NAME 0x00000008
+#define SUBOPT_COPY_DATA 0x00000010
+#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
+#define SUBOPT_REFRESH 0x00000040
+#define SUBOPT_BINARY 0x00000080
+#define SUBOPT_STREAMING 0x00000100
+
+/*
+ * Check if 'val' has all of the 'bits' set (not merely some of them).
+ *
+ * Note that 'bits' is evaluated twice, so it must not have side effects.
+ */
+#define IsSet(val, bits) (((val) & (bits)) == (bits))
+
+/*
+ * Structure to hold a bitmap representing the user-provided CREATE/ALTER
+ * SUBSCRIPTION command options and the parsed/default values of each of them.
+ *
+ * Callers are expected to zero the struct before handing it to
+ * parse_subscription_options(), which assigns defaults only to the boolean
+ * options it was told are supported.
+ */
+typedef struct SubOpts
+{
+ bits32 specified_opts; /* SUBOPT_* bits for options the user explicitly gave */
+ char *slot_name; /* NULL if not given or given as "none" */
+ char *synchronous_commit; /* value as given by the user, else NULL */
+ bool connect; /* defaults to true when supported */
+ bool enabled; /* defaults to true; forced to false by connect = false */
+ bool create_slot; /* defaults to true; forced to false by connect = false */
+ bool copy_data; /* defaults to true; forced to false by connect = false */
+ bool refresh; /* defaults to true when supported */
+ bool binary; /* defaults to false */
+ bool streaming; /* defaults to false */
+} SubOpts;
+
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
*
* Since not all options can be specified in both commands, this function
- * will report an error on options if the target output pointer is NULL to
- * accommodate that.
+ * will report an error for any option not included in 'supported_opts', as
+ * well as when mutually exclusive options are specified.
+ *
+ * Caller is expected to have cleared 'opts'.
*/
static void
-parse_subscription_options(List *options,
- bool *connect,
- bool *enabled_given, bool *enabled,
- bool *create_slot,
- bool *slot_name_given, char **slot_name,
- bool *copy_data,
- char **synchronous_commit,
- bool *refresh,
- bool *binary_given, bool *binary,
- bool *streaming_given, bool *streaming)
+parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts)
{
ListCell *lc;
- bool connect_given = false;
- bool create_slot_given = false;
- bool copy_data_given = false;
- bool refresh_given = false;
-
- /* If connect is specified, the others also need to be. */
- Assert(!connect || (enabled && create_slot && copy_data));
- if (connect)
- *connect = true;
- if (enabled)
- {
- *enabled_given = false;
- *enabled = true;
- }
- if (create_slot)
- *create_slot = true;
- if (slot_name)
- {
- *slot_name_given = false;
- *slot_name = NULL;
- }
- if (copy_data)
- *copy_data = true;
- if (synchronous_commit)
- *synchronous_commit = NULL;
- if (refresh)
- *refresh = true;
- if (binary)
- {
- *binary_given = false;
- *binary = false;
- }
- if (streaming)
- {
- *streaming_given = false;
- *streaming = false;
- }
+ /* caller must expect some option */
+ Assert(supported_opts != 0);
+
+ /* If connect option is supported, these others also need to be. */
+ Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
+ IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+ SUBOPT_COPY_DATA));
+
+ /* Set default values for the boolean supported options. */
+ if (IsSet(supported_opts, SUBOPT_CONNECT))
+ opts->connect = true;
+ if (IsSet(supported_opts, SUBOPT_ENABLED))
+ opts->enabled = true;
+ if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
+ opts->create_slot = true;
+ if (IsSet(supported_opts, SUBOPT_COPY_DATA))
+ opts->copy_data = true;
+ if (IsSet(supported_opts, SUBOPT_REFRESH))
+ opts->refresh = true;
+ if (IsSet(supported_opts, SUBOPT_BINARY))
+ opts->binary = false;
+ if (IsSet(supported_opts, SUBOPT_STREAMING))
+ opts->streaming = false;
/* Parse options */
- foreach(lc, options)
+ foreach(lc, stmt_options)
{
DefElem *defel = (DefElem *) lfirst(lc);
- if (strcmp(defel->defname, "connect") == 0 && connect)
+ if (IsSet(supported_opts, SUBOPT_CONNECT) &&
+ strcmp(defel->defname, "connect") == 0)
{
- if (connect_given)
+ if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- connect_given = true;
- *connect = defGetBoolean(defel);
+ opts->specified_opts |= SUBOPT_CONNECT;
+ opts->connect = defGetBoolean(defel);
}
- else if (strcmp(defel->defname, "enabled") == 0 && enabled)
+ else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
+ strcmp(defel->defname, "enabled") == 0)
{
- if (*enabled_given)
+ if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- *enabled_given = true;
- *enabled = defGetBoolean(defel);
+ opts->specified_opts |= SUBOPT_ENABLED;
+ opts->enabled = defGetBoolean(defel);
}
- else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
+ else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+ strcmp(defel->defname, "create_slot") == 0)
{
- if (create_slot_given)
+ if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- create_slot_given = true;
- *create_slot = defGetBoolean(defel);
+ opts->specified_opts |= SUBOPT_CREATE_SLOT;
+ opts->create_slot = defGetBoolean(defel);
}
- else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
+ else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+ strcmp(defel->defname, "slot_name") == 0)
{
- if (*slot_name_given)
+ if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- *slot_name_given = true;
- *slot_name = defGetString(defel);
+ opts->specified_opts |= SUBOPT_SLOT_NAME;
+ opts->slot_name = defGetString(defel);
/* Setting slot_name = NONE is treated as no slot name. */
- if (strcmp(*slot_name, "none") == 0)
- *slot_name = NULL;
+ if (strcmp(opts->slot_name, "none") == 0)
+ opts->slot_name = NULL;
}
- else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
+ else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+ strcmp(defel->defname, "copy_data") == 0)
{
- if (copy_data_given)
+ if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- copy_data_given = true;
- *copy_data = defGetBoolean(defel);
+ opts->specified_opts |= SUBOPT_COPY_DATA;
+ opts->copy_data = defGetBoolean(defel);
}
- else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
- synchronous_commit)
+ else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
+ strcmp(defel->defname, "synchronous_commit") == 0)
{
- if (*synchronous_commit)
+ if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- *synchronous_commit = defGetString(defel);
+ opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
+ opts->synchronous_commit = defGetString(defel);
/* Test if the given value is valid for synchronous_commit GUC. */
- (void) set_config_option("synchronous_commit", *synchronous_commit,
+ (void) set_config_option("synchronous_commit", opts->synchronous_commit,
PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
false, 0, false);
}
- else if (strcmp(defel->defname, "refresh") == 0 && refresh)
+ else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
+ strcmp(defel->defname, "refresh") == 0)
{
- if (refresh_given)
+ if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- refresh_given = true;
- *refresh = defGetBoolean(defel);
+ opts->specified_opts |= SUBOPT_REFRESH;
+ opts->refresh = defGetBoolean(defel);
}
- else if (strcmp(defel->defname, "binary") == 0 && binary)
+ else if (IsSet(supported_opts, SUBOPT_BINARY) &&
+ strcmp(defel->defname, "binary") == 0)
{
- if (*binary_given)
+ if (IsSet(opts->specified_opts, SUBOPT_BINARY))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- *binary_given = true;
- *binary = defGetBoolean(defel);
+ opts->specified_opts |= SUBOPT_BINARY;
+ opts->binary = defGetBoolean(defel);
}
- else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+ else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
+ strcmp(defel->defname, "streaming") == 0)
{
- if (*streaming_given)
+ if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
- *streaming_given = true;
- *streaming = defGetBoolean(defel);
+ opts->specified_opts |= SUBOPT_STREAMING;
+ opts->streaming = defGetBoolean(defel);
}
else
ereport(ERROR,
* We've been explicitly asked to not connect, that requires some
* additional processing.
*/
- if (connect && !*connect)
+ if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
{
/* Check for incompatible options from the user. */
- if (enabled && *enabled_given && *enabled)
+ if (opts->enabled &&
+ IsSet(supported_opts, SUBOPT_ENABLED) &&
+ IsSet(opts->specified_opts, SUBOPT_ENABLED))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
/*- translator: both %s are strings of the form "option = value" */
errmsg("%s and %s are mutually exclusive options",
"connect = false", "enabled = true")));
- if (create_slot && create_slot_given && *create_slot)
+ if (opts->create_slot &&
+ IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+ IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s and %s are mutually exclusive options",
"connect = false", "create_slot = true")));
- if (copy_data && copy_data_given && *copy_data)
+ if (opts->copy_data &&
+ IsSet(supported_opts, SUBOPT_COPY_DATA) &&
+ IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s and %s are mutually exclusive options",
"connect = false", "copy_data = true")));
/* Change the defaults of other options. */
- *enabled = false;
- *create_slot = false;
- *copy_data = false;
+ opts->enabled = false;
+ opts->create_slot = false;
+ opts->copy_data = false;
}
/*
* Do additional checking for disallowed combination when slot_name = NONE
* was used.
*/
- if (slot_name && *slot_name_given && !*slot_name)
+ if (!opts->slot_name &&
+ IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
+ IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
{
- if (enabled && *enabled_given && *enabled)
+ if (opts->enabled &&
+ IsSet(supported_opts, SUBOPT_ENABLED) &&
+ IsSet(opts->specified_opts, SUBOPT_ENABLED))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
/*- translator: both %s are strings of the form "option = value" */
errmsg("%s and %s are mutually exclusive options",
"slot_name = NONE", "enabled = true")));
- if (create_slot && create_slot_given && *create_slot)
+ if (opts->create_slot &&
+ IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+ IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
+ /*- translator: both %s are strings of the form "option = value" */
errmsg("%s and %s are mutually exclusive options",
"slot_name = NONE", "create_slot = true")));
- if (enabled && !*enabled_given && *enabled)
+ if (opts->enabled &&
+ IsSet(supported_opts, SUBOPT_ENABLED) &&
+ !IsSet(opts->specified_opts, SUBOPT_ENABLED))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
/*- translator: both %s are strings of the form "option = value" */
errmsg("subscription with %s must also set %s",
"slot_name = NONE", "enabled = false")));
- if (create_slot && !create_slot_given && *create_slot)
+ if (opts->create_slot &&
+ IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
+ !IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
+ /*- translator: both %s are strings of the form "option = value" */
errmsg("subscription with %s must also set %s",
"slot_name = NONE", "create_slot = false")));
}
Datum values[Natts_pg_subscription];
Oid owner = GetUserId();
HeapTuple tup;
- bool connect;
- bool enabled_given;
- bool enabled;
- bool copy_data;
- bool streaming;
- bool streaming_given;
- char *synchronous_commit;
char *conninfo;
- char *slotname;
- bool slotname_given;
- bool binary;
- bool binary_given;
char originname[NAMEDATALEN];
- bool create_slot;
List *publications;
+ bits32 supported_opts;
+ SubOpts opts = {0};
/*
* Parse and check options.
*
* Connection and publication should not be specified here.
*/
- parse_subscription_options(stmt->options,
- &connect,
- &enabled_given, &enabled,
- &create_slot,
- &slotname_given, &slotname,
- &copy_data,
- &synchronous_commit,
- NULL, /* no "refresh" */
- &binary_given, &binary,
- &streaming_given, &streaming);
+ supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
+ SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
+ SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+ SUBOPT_STREAMING);
+ parse_subscription_options(stmt->options, supported_opts, &opts);
/*
* Since creating a replication slot is not transactional, rolling back
* the transaction leaves the created replication slot.  So we cannot run
* CREATE SUBSCRIPTION inside a transaction block if creating a
* replication slot.
*/
- if (create_slot)
+ if (opts.create_slot)
PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
if (!superuser())
stmt->subname)));
}
- if (!slotname_given && slotname == NULL)
- slotname = stmt->subname;
+ if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
+ opts.slot_name == NULL)
+ opts.slot_name = stmt->subname;
/* The default for synchronous_commit of subscriptions is off. */
- if (synchronous_commit == NULL)
- synchronous_commit = "off";
+ if (opts.synchronous_commit == NULL)
+ opts.synchronous_commit = "off";
conninfo = stmt->conninfo;
publications = stmt->publication;
values[Anum_pg_subscription_subname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
- values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
- values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
- values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
+ values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
+ values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
+ values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
- if (slotname)
+ if (opts.slot_name)
values[Anum_pg_subscription_subslotname - 1] =
- DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
else
nulls[Anum_pg_subscription_subslotname - 1] = true;
values[Anum_pg_subscription_subsynccommit - 1] =
- CStringGetTextDatum(synchronous_commit);
+ CStringGetTextDatum(opts.synchronous_commit);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publications);
* Connect to remote side to execute requested commands and fetch table
* info.
*/
- if (connect)
+ if (opts.connect)
{
char *err;
WalReceiverConn *wrconn;
* Set sync state based on if we were asked to do data copy or
* not.
*/
- table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+ table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/*
* Get the table list from publisher and build local table status
* won't use the initial snapshot for anything, so no need to
* export it.
*/
- if (create_slot)
+ if (opts.create_slot)
{
- Assert(slotname);
+ Assert(opts.slot_name);
- walrcv_create_slot(wrconn, slotname, false,
+ walrcv_create_slot(wrconn, opts.slot_name, false,
CRS_NOEXPORT_SNAPSHOT, NULL);
ereport(NOTICE,
(errmsg("created replication slot \"%s\" on publisher",
- slotname)));
+ opts.slot_name)));
}
}
PG_FINALLY();
table_close(rel, RowExclusiveLock);
- if (enabled)
+ if (opts.enabled)
ApplyLauncherWakeupAtCommit();
ObjectAddressSet(myself, SubscriptionRelationId, subid);
bool update_tuple = false;
Subscription *sub;
Form_pg_subscription form;
+ bits32 supported_opts;
+ SubOpts opts = {0};
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
{
case ALTER_SUBSCRIPTION_OPTIONS:
{
- char *slotname;
- bool slotname_given;
- char *synchronous_commit;
- bool binary_given;
- bool binary;
- bool streaming_given;
- bool streaming;
-
- parse_subscription_options(stmt->options,
- NULL, /* no "connect" */
- NULL, NULL, /* no "enabled" */
- NULL, /* no "create_slot" */
- &slotname_given, &slotname,
- NULL, /* no "copy_data" */
- &synchronous_commit,
- NULL, /* no "refresh" */
- &binary_given, &binary,
- &streaming_given, &streaming);
-
- if (slotname_given)
+ supported_opts = (SUBOPT_SLOT_NAME |
+ SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+ SUBOPT_STREAMING);
+
+ parse_subscription_options(stmt->options, supported_opts, &opts);
+
+ if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
{
- if (sub->enabled && !slotname)
+ if (sub->enabled && !opts.slot_name)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot set %s for enabled subscription",
"slot_name = NONE")));
- if (slotname)
+ if (opts.slot_name)
values[Anum_pg_subscription_subslotname - 1] =
- DirectFunctionCall1(namein, CStringGetDatum(slotname));
+ DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
else
nulls[Anum_pg_subscription_subslotname - 1] = true;
replaces[Anum_pg_subscription_subslotname - 1] = true;
}
- if (synchronous_commit)
+ if (opts.synchronous_commit)
{
values[Anum_pg_subscription_subsynccommit - 1] =
- CStringGetTextDatum(synchronous_commit);
+ CStringGetTextDatum(opts.synchronous_commit);
replaces[Anum_pg_subscription_subsynccommit - 1] = true;
}
- if (binary_given)
+ if (IsSet(opts.specified_opts, SUBOPT_BINARY))
{
values[Anum_pg_subscription_subbinary - 1] =
- BoolGetDatum(binary);
+ BoolGetDatum(opts.binary);
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
- if (streaming_given)
+ if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
values[Anum_pg_subscription_substream - 1] =
- BoolGetDatum(streaming);
+ BoolGetDatum(opts.streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
}
case ALTER_SUBSCRIPTION_ENABLED:
{
- bool enabled,
- enabled_given;
-
- parse_subscription_options(stmt->options,
- NULL, /* no "connect" */
- &enabled_given, &enabled,
- NULL, /* no "create_slot" */
- NULL, NULL, /* no "slot_name" */
- NULL, /* no "copy_data" */
- NULL, /* no "synchronous_commit" */
- NULL, /* no "refresh" */
- NULL, NULL, /* no "binary" */
- NULL, NULL); /* no streaming */
- Assert(enabled_given);
-
- if (!sub->slotname && enabled)
+ parse_subscription_options(stmt->options, SUBOPT_ENABLED, &opts);
+ Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
+
+ if (!sub->slotname && opts.enabled)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name")));
values[Anum_pg_subscription_subenabled - 1] =
- BoolGetDatum(enabled);
+ BoolGetDatum(opts.enabled);
replaces[Anum_pg_subscription_subenabled - 1] = true;
- if (enabled)
+ if (opts.enabled)
ApplyLauncherWakeupAtCommit();
update_tuple = true;
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
{
- bool copy_data;
- bool refresh;
-
- parse_subscription_options(stmt->options,
- NULL, /* no "connect" */
- NULL, NULL, /* no "enabled" */
- NULL, /* no "create_slot" */
- NULL, NULL, /* no "slot_name" */
- &copy_data,
- NULL, /* no "synchronous_commit" */
- &refresh,
- NULL, NULL, /* no "binary" */
- NULL, NULL); /* no "streaming" */
+ supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+ parse_subscription_options(stmt->options, supported_opts, &opts);
+
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
replaces[Anum_pg_subscription_subpublications - 1] = true;
update_tuple = true;
/* Refresh if user asked us to. */
- if (refresh)
+ if (opts.refresh)
{
if (!sub->enabled)
ereport(ERROR,
/* Make sure refresh sees the new list of publications. */
sub->publications = stmt->publication;
- AlterSubscription_refresh(sub, copy_data);
+ AlterSubscription_refresh(sub, opts.copy_data);
}
break;
case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
{
- bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
- bool copy_data = false;
- bool refresh;
List *publist;
+ bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
- parse_subscription_options(stmt->options,
- NULL, /* no "connect" */
- NULL, NULL, /* no "enabled" */
- NULL, /* no "create_slot" */
- NULL, NULL, /* no "slot_name" */
- isadd ? &copy_data : NULL, /* for drop, no
- * "copy_data" */
- NULL, /* no "synchronous_commit" */
- &refresh,
- NULL, NULL, /* no "binary" */
- NULL, NULL); /* no "streaming" */
+ supported_opts = SUBOPT_REFRESH;
+ if (isadd)
+ supported_opts |= SUBOPT_COPY_DATA;
- publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+ parse_subscription_options(stmt->options, supported_opts, &opts);
+ publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publist);
replaces[Anum_pg_subscription_subpublications - 1] = true;
update_tuple = true;
/* Refresh if user asked us to. */
- if (refresh)
+ if (opts.refresh)
{
if (!sub->enabled)
ereport(ERROR,
/* Only refresh the added/dropped list of publications. */
sub->publications = stmt->publication;
- AlterSubscription_refresh(sub, copy_data);
+ AlterSubscription_refresh(sub, opts.copy_data);
}
break;
case ALTER_SUBSCRIPTION_REFRESH:
{
- bool copy_data;
-
if (!sub->enabled)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
- parse_subscription_options(stmt->options,
- NULL, /* no "connect" */
- NULL, NULL, /* no "enabled" */
- NULL, /* no "create_slot" */
- NULL, NULL, /* no "slot_name" */
- &copy_data,
- NULL, /* no "synchronous_commit" */
- NULL, /* no "refresh" */
- NULL, NULL, /* no "binary" */
- NULL, NULL); /* no "streaming" */
+ parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts);
PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
- AlterSubscription_refresh(sub, copy_data);
+ AlterSubscription_refresh(sub, opts.copy_data);
break;
}