ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
authorPeter Eisentraut <peter@eisentraut.org>
Tue, 6 Apr 2021 08:44:26 +0000 (10:44 +0200)
committerPeter Eisentraut <peter@eisentraut.org>
Tue, 6 Apr 2021 09:49:51 +0000 (11:49 +0200)
At present, if we want to update publications in a subscription, we
can use SET PUBLICATION.  However, it requires supplying all
publications that exists and the new publications.  If we want to add
new publications, it's inconvenient.  The new syntax only supplies the
new publications.  When the refresh is true, it only refreshes the new
publications.

Author: Japin Li <japinli@hotmail.com>
Author: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/MEYP282MB166939D0D6C480B7FBE7EFFBB6BC0@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM

doc/src/sgml/ref/alter_subscription.sgml
src/backend/commands/subscriptioncmds.c
src/backend/parser/gram.y
src/bin/psql/tab-complete.c
src/include/nodes/parsenodes.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql

index 5aed2694350bb7095fa68fa725b83544ccc41f6f..367ac814f4b678bdaf368917652a7d303ccbe015 100644 (file)
@@ -23,6 +23,8 @@ PostgreSQL documentation
 <synopsis>
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DROP PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">set_publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> REFRESH PUBLICATION [ WITH ( <replaceable class="parameter">refresh_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ENABLE
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> DISABLE
@@ -63,7 +65,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
 
   <para>
    Commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> and
-   <command>ALTER SUBSCRIPTION ... SET PUBLICATION ...</command> with refresh
+   <command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command> with refresh
    option as true cannot be executed inside a transaction block.
   </para>
  </refsect1>
@@ -94,12 +96,19 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
 
    <varlistentry>
     <term><literal>SET PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
+    <term><literal>ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
+    <term><literal>DROP PUBLICATION <replaceable class="parameter">publication_name</replaceable></literal></term>
     <listitem>
      <para>
-      Changes list of subscribed publications. See
-      <xref linkend="sql-createsubscription"/> for more information.
-      By default this command will also act like <literal>REFRESH
-      PUBLICATION</literal>.
+      Changes the list of subscribed publications.  <literal>SET</literal>
+      replaces the entire list of publications with a new list,
+      <literal>ADD</literal> adds additional publications,
+      <literal>DROP</literal> removes publications from the list of
+      publications.  See <xref linkend="sql-createsubscription"/> for more
+      information.  By default, this command will also act like
+      <literal>REFRESH PUBLICATION</literal>, except that in case of
+      <literal>ADD</literal> or <literal>DROP</literal>, only the added or
+      dropped publications are refreshed.
      </para>
 
      <para>
index 5282b797359eb7a7d53579027b5a97d97671492c..517c8edd3b2602a9bee0f684fddd891f2b7c550c 100644 (file)
@@ -47,6 +47,8 @@
 #include "utils/syscache.h"
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_duplicates_in_publist(List *publist, Datum *datums);
+static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
 
@@ -293,8 +295,6 @@ publicationListToArray(List *publist)
 {
    ArrayType  *arr;
    Datum      *datums;
-   int         j = 0;
-   ListCell   *cell;
    MemoryContext memcxt;
    MemoryContext oldcxt;
 
@@ -306,28 +306,7 @@ publicationListToArray(List *publist)
 
    datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
 
-   foreach(cell, publist)
-   {
-       char       *name = strVal(lfirst(cell));
-       ListCell   *pcell;
-
-       /* Check for duplicates. */
-       foreach(pcell, publist)
-       {
-           char       *pname = strVal(lfirst(pcell));
-
-           if (pcell == cell)
-               break;
-
-           if (strcmp(name, pname) == 0)
-               ereport(ERROR,
-                       (errcode(ERRCODE_SYNTAX_ERROR),
-                        errmsg("publication name \"%s\" used more than once",
-                               pname)));
-       }
-
-       datums[j++] = CStringGetTextDatum(name);
-   }
+   check_duplicates_in_publist(publist, datums);
 
    MemoryContextSwitchTo(oldcxt);
 
@@ -923,7 +902,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
            update_tuple = true;
            break;
 
-       case ALTER_SUBSCRIPTION_PUBLICATION:
+       case ALTER_SUBSCRIPTION_SET_PUBLICATION:
            {
                bool        copy_data;
                bool        refresh;
@@ -964,6 +943,54 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
                break;
            }
 
+       case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+       case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
+           {
+               bool        isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+               bool        copy_data;
+               bool        refresh;
+               List       *publist;
+
+               publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
+
+               parse_subscription_options(stmt->options,
+                                          NULL,    /* no "connect" */
+                                          NULL, NULL,  /* no "enabled" */
+                                          NULL,    /* no "create_slot" */
+                                          NULL, NULL,  /* no "slot_name" */
+                                          isadd ? &copy_data : NULL,   /* for drop, no
+                                                                        * "copy_data" */
+                                          NULL,    /* no "synchronous_commit" */
+                                          &refresh,
+                                          NULL, NULL,  /* no "binary" */
+                                          NULL, NULL); /* no "streaming" */
+
+               values[Anum_pg_subscription_subpublications - 1] =
+                   publicationListToArray(publist);
+               replaces[Anum_pg_subscription_subpublications - 1] = true;
+
+               update_tuple = true;
+
+               /* Refresh if user asked us to. */
+               if (refresh)
+               {
+                   if (!sub->enabled)
+                       ereport(ERROR,
+                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
+                                errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+
+                   PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
+                   /* Only refresh the added/dropped list of publications. */
+                   sub->publications = stmt->publication;
+
+                   AlterSubscription_refresh(sub, copy_data);
+               }
+
+               break;
+           }
+
        case ALTER_SUBSCRIPTION_REFRESH:
            {
                bool        copy_data;
@@ -1548,3 +1575,103 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
             errhint("Use %s to disassociate the subscription from the slot.",
                     "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
 }
+
+/*
+ * Check for duplicates in the given list of publications and error out if
+ * found one.  Add publications to datums as text datums, if datums is not
+ * NULL.
+ */
+static void
+check_duplicates_in_publist(List *publist, Datum *datums)
+{
+   ListCell   *cell;
+   int         j = 0;
+
+   foreach(cell, publist)
+   {
+       char       *name = strVal(lfirst(cell));
+       ListCell   *pcell;
+
+       foreach(pcell, publist)
+       {
+           char       *pname = strVal(lfirst(pcell));
+
+           if (pcell == cell)
+               break;
+
+           if (strcmp(name, pname) == 0)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("publication name \"%s\" used more than once",
+                               pname)));
+       }
+
+       if (datums)
+           datums[j++] = CStringGetTextDatum(name);
+   }
+}
+
+/*
+ * Merge current subscription's publications and user-specified publications
+ * from ADD/DROP PUBLICATIONS.
+ *
+ * If addpub is true, we will add the list of publications into oldpublist.
+ * Otherwise, we will delete the list of publications from oldpublist.  The
+ * returned list is a copy, oldpublist itself is not changed.
+ *
+ * subname is the subscription name, for error messages.
+ */
+static List *
+merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
+{
+   ListCell   *lc;
+
+   oldpublist = list_copy(oldpublist);
+
+   check_duplicates_in_publist(newpublist, NULL);
+
+   foreach(lc, newpublist)
+   {
+       char       *name = strVal(lfirst(lc));
+       ListCell   *lc2;
+       bool        found = false;
+
+       foreach(lc2, oldpublist)
+       {
+           char       *pubname = strVal(lfirst(lc2));
+
+           if (strcmp(name, pubname) == 0)
+           {
+               found = true;
+               if (addpub)
+                   ereport(ERROR,
+                           (errcode(ERRCODE_DUPLICATE_OBJECT),
+                            errmsg("publication \"%s\" is already in subscription \"%s\"",
+                                   name, subname)));
+               else
+                   oldpublist = foreach_delete_current(oldpublist, lc2);
+
+               break;
+           }
+       }
+
+       if (addpub && !found)
+           oldpublist = lappend(oldpublist, makeString(name));
+       else if (!addpub && !found)
+           ereport(ERROR,
+                   (errcode(ERRCODE_SYNTAX_ERROR),
+                    errmsg("publication \"%s\" is not in subscription \"%s\"",
+                           name, subname)));
+   }
+
+   /*
+    * XXX Probably no strong reason for this, but for now it's to make ALTER
+    * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
+    */
+   if (!oldpublist)
+       ereport(ERROR,
+               (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                errmsg("subscription must contain at least one publication")));
+
+   return oldpublist;
+}
index 05cc2c9ae0ddbaedc6cc66d523460ec2f33ea2c2..38c36a49360ef52ecfeb23fc9aec25361c3421d8 100644 (file)
@@ -9687,11 +9687,31 @@ AlterSubscriptionStmt:
                    n->options = $6;
                    $$ = (Node *)n;
                }
+           | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
+               {
+                   AlterSubscriptionStmt *n =
+                       makeNode(AlterSubscriptionStmt);
+                   n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+                   n->subname = $3;
+                   n->publication = $6;
+                   n->options = $7;
+                   $$ = (Node *)n;
+               }
+           | ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition
+               {
+                   AlterSubscriptionStmt *n =
+                       makeNode(AlterSubscriptionStmt);
+                   n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION;
+                   n->subname = $3;
+                   n->publication = $6;
+                   n->options = $7;
+                   $$ = (Node *)n;
+               }
            | ALTER SUBSCRIPTION name SET PUBLICATION name_list opt_definition
                {
                    AlterSubscriptionStmt *n =
                        makeNode(AlterSubscriptionStmt);
-                   n->kind = ALTER_SUBSCRIPTION_PUBLICATION;
+                   n->kind = ALTER_SUBSCRIPTION_SET_PUBLICATION;
                    n->subname = $3;
                    n->publication = $6;
                    n->options = $7;
index a053bc1e45d02e46ae22e56491388e65fd60b31c..832bcdfc3bf9f4f0f9a0aafa6b5a923fdfdcc252 100644 (file)
@@ -1652,7 +1652,8 @@ psql_completion(const char *text, int start, int end)
    /* ALTER SUBSCRIPTION <name> */
    else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
        COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
-                     "RENAME TO", "REFRESH PUBLICATION", "SET");
+                     "RENAME TO", "REFRESH PUBLICATION", "SET",
+                     "ADD PUBLICATION", "DROP PUBLICATION");
    /* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION */
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
             TailMatches("REFRESH", "PUBLICATION"))
@@ -1672,14 +1673,15 @@ psql_completion(const char *text, int start, int end)
    {
        /* complete with nothing here as this refers to remote publications */
    }
-   /* ALTER SUBSCRIPTION <name> SET PUBLICATION <name> */
+   /* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> */
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
-            TailMatches("SET", "PUBLICATION", MatchAny))
+            TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny))
        COMPLETE_WITH("WITH (");
-   /* ALTER SUBSCRIPTION <name> SET PUBLICATION <name> WITH ( */
+   /* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> WITH ( */
    else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
-            TailMatches("SET", "PUBLICATION", MatchAny, "WITH", "("))
+            TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "("))
        COMPLETE_WITH("copy_data", "refresh");
+
    /* ALTER SCHEMA <name> */
    else if (Matches("ALTER", "SCHEMA", MatchAny))
        COMPLETE_WITH("OWNER TO", "RENAME TO");
index 97b80dfd21027fd96e0cdfd8986c7f5acdd08d9d..807fbaceaac557fcb46ef4de40234db7a5ac2b29 100644 (file)
@@ -3618,7 +3618,9 @@ typedef enum AlterSubscriptionType
 {
    ALTER_SUBSCRIPTION_OPTIONS,
    ALTER_SUBSCRIPTION_CONNECTION,
-   ALTER_SUBSCRIPTION_PUBLICATION,
+   ALTER_SUBSCRIPTION_SET_PUBLICATION,
+   ALTER_SUBSCRIPTION_ADD_PUBLICATION,
+   ALTER_SUBSCRIPTION_DROP_PUBLICATION,
    ALTER_SUBSCRIPTION_REFRESH,
    ALTER_SUBSCRIPTION_ENABLED
 } AlterSubscriptionType;
index 14a430221d63cc0836392d8f40db2c328f761f72..09576c176b6fda4f3ec686191764f8453b9bcc79 100644 (file)
@@ -200,6 +200,45 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
  regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
 (1 row)
 
+-- fail - publication already exists
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false);
+ERROR:  publication "testpub" is already in subscription "regress_testsub"
+-- fail - publication used more than once
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false);
+ERROR:  publication name "testpub1" used more than once
+-- ok - add two publications into subscription
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+-- fail - publications already exist
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+ERROR:  publication "testpub1" is already in subscription "regress_testsub"
+\dRs+
+                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | off                | dbname=regress_doesnotexist
+(1 row)
+
+-- fail - publication used more then once
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false);
+ERROR:  publication name "testpub1" used more than once
+-- fail - all publications are deleted
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false);
+ERROR:  subscription must contain at least one publication
+-- fail - publication does not exist in subscription
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false);
+ERROR:  publication "testpub3" is not in subscription "regress_testsub"
+-- fail - do not support copy_data option
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true);
+ERROR:  unrecognized subscription parameter: "copy_data"
+-- ok - delete publications
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
+\dRs+
+                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+(1 row)
+
 DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub
        WITH (connect = false, create_slot = false, copy_data = false);
index 81e65e5e642a7483fe8035bd9bfa8f354757c985..308c098c144ac9d2a59d14a5757d5732d221280d 100644 (file)
@@ -145,6 +145,37 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 
 \dRs+
 
+-- fail - publication already exists
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub WITH (refresh = false);
+
+-- fail - publication used more than once
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub1 WITH (refresh = false);
+
+-- ok - add two publications into subscription
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+-- fail - publications already exist
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+\dRs+
+
+-- fail - publication used more then once
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub1 WITH (refresh = false);
+
+-- fail - all publications are deleted
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub, testpub1, testpub2 WITH (refresh = false);
+
+-- fail - publication does not exist in subscription
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub3 WITH (refresh = false);
+
+-- fail - do not support copy_data option
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1 WITH (refresh = false, copy_data = true);
+
+-- ok - delete publications
+ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
+
+\dRs+
+
 DROP SUBSCRIPTION regress_testsub;
 
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION mypub