<synopsis>
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DROP PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUBLICATION [ WITH ( <replaceable class="parameter">refresh_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
<para>
Commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> and
- <command>ALTER SUBSCRIPTION ... SET PUBLICATION ...</command> with refresh
+ <command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command> with refresh
option as true cannot be executed inside a transaction block.
</para>
</refsect1>
<varlistentry>
<term><literal>SET PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
+ <term><literal>ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
+ <term><literal>DROP PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
<listitem>
<para>
- Changes list of subscribed publications. See
- <xref linkend="sql-createsubscription"/> for more information.
- By default this command will also act like <literal>REFRESH
- PUBLICATION</literal>.
+ Changes the list of subscribed publications. <literal>SET</literal>
+ replaces the entire list of publications with a new list,
+ <literal>ADD</literal> adds additional publications to the list, and
+ <literal>DROP</literal> removes publications from the list of
+ publications. See <xref linkend="sql-createsubscription"/> for more
+ information. By default, this command will also act like
+ <literal>REFRESH PUBLICATION</literal>, except that in case of
+ <literal>ADD</literal> or <literal>DROP</literal>, only the added or
+ dropped publications are refreshed.
</para>
<para>
#include "utils/syscache.h"
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_duplicates_in_publist(List *publist, Datum *datums);
+static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
{
ArrayType *arr;
Datum *datums;
- int j = 0;
- ListCell *cell;
MemoryContext memcxt;
MemoryContext oldcxt;
datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
- foreach(cell, publist)
- {
- char *name = strVal(lfirst(cell));
- ListCell *pcell;
-
- /* Check for duplicates. */
- foreach(pcell, publist)
- {
- char *pname = strVal(lfirst(pcell));
-
- if (pcell == cell)
- break;
-
- if (strcmp(name, pname) == 0)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("publication name \"%s\" used more than once",
- pname)));
- }
-
- datums[j++] = CStringGetTextDatum(name);
- }
+ check_duplicates_in_publist(publist, datums);
MemoryContextSwitchTo(oldcxt);
update_tuple = true;
break;
- case ALTER_SUBSCRIPTION_PUBLICATION:
+ case ALTER_SUBSCRIPTION_SET_PUBLICATION:
{
bool copy_data;
bool refresh;
break;
}
+		case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+		case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
+			{
+				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+
+				/*
+				 * Must be initialized: for DROP we pass NULL for "copy_data"
+				 * to the option parser below, so nothing else assigns it,
+				 * yet it is still handed to AlterSubscription_refresh().
+				 */
+				bool		copy_data = false;
+				bool		refresh;
+				List	   *publist;
+
+				/*
+				 * Compute the resulting publication list.  This errors out
+				 * on duplicates, on adding a publication that is already
+				 * present, on dropping one that is absent, and on an empty
+				 * result.
+				 */
+				publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+
+				parse_subscription_options(stmt->options,
+										   NULL,	/* no "connect" */
+										   NULL, NULL,	/* no "enabled" */
+										   NULL,	/* no "create_slot" */
+										   NULL, NULL,	/* no "slot_name" */
+										   isadd ? &copy_data : NULL,	/* for drop, no
+																		 * "copy_data" */
+										   NULL,	/* no "synchronous_commit" */
+										   &refresh,
+										   NULL, NULL,	/* no "binary" */
+										   NULL, NULL); /* no "streaming" */
+
+				values[Anum_pg_subscription_subpublications - 1] =
+					publicationListToArray(publist);
+				replaces[Anum_pg_subscription_subpublications - 1] = true;
+
+				update_tuple = true;
+
+				/* Refresh if user asked us to. */
+				if (refresh)
+				{
+					/* The hint names the command variant the user actually ran. */
+					if (!sub->enabled)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
+								 errhint("Use ALTER SUBSCRIPTION ... %s PUBLICATION ... WITH (refresh = false).",
+										 isadd ? "ADD" : "DROP")));
+
+					PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
+					/* Only refresh the added/dropped list of publications. */
+					sub->publications = stmt->publication;
+
+					AlterSubscription_refresh(sub, copy_data);
+				}
+
+				break;
+			}
+
case ALTER_SUBSCRIPTION_REFRESH:
{
bool copy_data;
errhint("Use %s to disassociate the subscription from the slot.",
"ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
}
+
+/*
+ * Verify that no publication name occurs twice in publist, raising an error
+ * for the first repeated name encountered.
+ *
+ * When datums is not NULL, each name is additionally stored there as a text
+ * datum, in list order; the caller must supply an array with room for one
+ * entry per list element.
+ */
+static void
+check_duplicates_in_publist(List *publist, Datum *datums)
+{
+	int			nstored = 0;
+	ListCell   *outer;
+
+	foreach(outer, publist)
+	{
+		char	   *curname = strVal(lfirst(outer));
+		ListCell   *inner;
+
+		/* Compare only against the entries that precede this one. */
+		foreach(inner, publist)
+		{
+			char	   *prevname;
+
+			if (inner == outer)
+				break;
+
+			prevname = strVal(lfirst(inner));
+			if (strcmp(curname, prevname) != 0)
+				continue;
+
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("publication name \"%s\" used more than once",
+							prevname)));
+		}
+
+		if (datums != NULL)
+			datums[nstored++] = CStringGetTextDatum(curname);
+	}
+}
+
+/*
+ * Merge current subscription's publications and user-specified publications
+ * from ADD/DROP PUBLICATIONS.
+ *
+ * If addpub is true, we will add the list of publications into oldpublist.
+ * Otherwise, we will delete the list of publications from oldpublist.  The
+ * returned list is a copy, oldpublist itself is not changed.
+ *
+ * Errors are raised when newpublist names a publication more than once, when
+ * adding a publication that is already present, when dropping one that is
+ * not present, and when the resulting list would be empty.
+ *
+ * subname is the subscription name, for error messages.
+ */
+static List *
+merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
+{
+	ListCell   *lc;
+
+	/* Work on a copy so that the caller's list is left untouched. */
+	oldpublist = list_copy(oldpublist);
+
+	check_duplicates_in_publist(newpublist, NULL);
+
+	foreach(lc, newpublist)
+	{
+		char	   *name = strVal(lfirst(lc));
+		ListCell   *lc2;
+		bool		found = false;
+
+		foreach(lc2, oldpublist)
+		{
+			char	   *pubname = strVal(lfirst(lc2));
+
+			if (strcmp(name, pubname) == 0)
+			{
+				found = true;
+				if (addpub)
+					ereport(ERROR,
+							(errcode(ERRCODE_DUPLICATE_OBJECT),
+							 errmsg("publication \"%s\" is already in subscription \"%s\"",
+									name, subname)));
+				else
+					oldpublist = foreach_delete_current(oldpublist, lc2);
+
+				/* Names in oldpublist are matched at most once per entry. */
+				break;
+			}
+		}
+
+		if (addpub && !found)
+			oldpublist = lappend(oldpublist, makeString(name));
+		else if (!addpub && !found)
+		{
+			/*
+			 * The command is syntactically fine; the named publication is
+			 * simply missing, so report it as an undefined object (the
+			 * counterpart of ERRCODE_DUPLICATE_OBJECT used for ADD above).
+			 */
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("publication \"%s\" is not in subscription \"%s\"",
+							name, subname)));
+		}
+	}
+
+	/*
+	 * XXX Probably no strong reason for this, but for now it's to make ALTER
+	 * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
+	 */
+	if (!oldpublist)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+				 errmsg("subscription must contain at least one publication")));
+
+	return oldpublist;
+}
n->options = $6;
$$ = (Node *)n;
}
+ | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+ n->subname = $3;
+ n->publication = $6;
+ n->options = $7;
+ $$ = (Node *)n;
+ }
+ | ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition
+ {
+ AlterSubscriptionStmt *n =
+ makeNode(AlterSubscriptionStmt);
+ n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION;
+ n->subname = $3;
+ n->publication = $6;
+ n->options = $7;
+ $$ = (Node *)n;
+ }
| ALTER SUBSCRIPTION name SET PUBLICATION name_list opt_definition
{
AlterSubscriptionStmt *n =
makeNode(AlterSubscriptionStmt);
- n->kind = ALTER_SUBSCRIPTION_PUBLICATION;
+ n->kind = ALTER_SUBSCRIPTION_SET_PUBLICATION;
n->subname = $3;
n->publication = $6;
n->options = $7;
/* ALTER SUBSCRIPTION <name> */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
- "RENAME TO", "REFRESH PUBLICATION", "SET");
+ "RENAME TO", "REFRESH PUBLICATION", "SET",
+ "ADD PUBLICATION", "DROP PUBLICATION");
/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
TailMatches("REFRESH", "PUBLICATION"))
{
/* complete with nothing here as this refers to remote publications */
}
- /* ALTER SUBSCRIPTION <name> SET PUBLICATION <name> */
+ /* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
- TailMatches("SET", "PUBLICATION", MatchAny))
+ TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny))
COMPLETE_WITH("WITH (");
- /* ALTER SUBSCRIPTION <name> SET PUBLICATION <name> WITH ( */
+ /* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> WITH ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
- TailMatches("SET", "PUBLICATION", MatchAny, "WITH", "("))
+ TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "("))
COMPLETE_WITH("copy_data", "refresh");
+
/* ALTER SCHEMA <name> */
else if (Matches("ALTER", "SCHEMA", MatchAny))
COMPLETE_WITH("OWNER TO", "RENAME TO");
{
ALTER_SUBSCRIPTION_OPTIONS,
ALTER_SUBSCRIPTION_CONNECTION,
- ALTER_SUBSCRIPTION_PUBLICATION,
+ ALTER_SUBSCRIPTION_SET_PUBLICATION,
+ ALTER_SUBSCRIPTION_ADD_PUBLICATION,
+ ALTER_SUBSCRIPTION_DROP_PUBLICATION,
ALTER_SUBSCRIPTION_REFRESH,
ALTER_SUBSCRIPTION_ENABLED
} AlterSubscriptionType;
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
+-- fail - publication already exists
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false);
+ERROR: publication "testpub" is already in subscription "regress_testsub"
+-- fail - publication used more than once
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false);
+ERROR: publication name "testpub1" used more than once
+-- ok - add two publications into subscription
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+-- fail - publications already exist
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+ERROR: publication "testpub1" is already in subscription "regress_testsub"
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | off | dbname=regress_doesnotexist
+(1 row)
+
+-- fail - publication used more then once
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false);
+ERROR: publication name "testpub1" used more than once
+-- fail - all publications are deleted
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false);
+ERROR: subscription must contain at least one publication
+-- fail - publication does not exist in subscription
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false);
+ERROR: publication "testpub3" is not in subscription "regress_testsub"
+-- fail - do not support copy_data option
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true);
+ERROR: unrecognized subscription parameter: "copy_data"
+-- ok - delete publications
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
+(1 row)
+
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub
WITH (connect = false, create_slot = false, copy_data = false);
\dRs+
+-- fail - publication already exists
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false);
+
+-- fail - publication used more than once
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false);
+
+-- ok - add two publications into subscription
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+-- fail - publications already exist
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+\dRs+
+
+-- fail - publication used more then once
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false);
+
+-- fail - all publications are deleted
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false);
+
+-- fail - publication does not exist in subscription
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false);
+
+-- fail - do not support copy_data option
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true);
+
+-- ok - delete publications
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+\dRs+
+
DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub