Raise a WARNING for missing publications.
authorAmit Kapila <akapila@postgresql.org>
Thu, 31 Mar 2022 02:54:19 +0000 (08:24 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 31 Mar 2022 02:55:50 +0000 (08:25 +0530)
When we create or alter a subscription to add publications raise a warning
for non-existent publications. We don't want to give an error here because
it is possible that users can later create the missing publications.

Author: Vignesh C
Reviewed-by: Bharath Rupireddy, Japin Li, Dilip Kumar, Euler Taveira, Ashutosh Sharma, Amit Kapila
Discussion: https://postgr.es/m/CALDaNm0f4YujGW+q-Di0CbZpnQKFFrXntikaQQKuEmGG0=Zw=Q@mail.gmail.com

doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/commands/subscriptioncmds.c
src/test/subscription/t/007_ddl.pl

index 3e46bbdb0460466607924373f9922cfc9d99def5..fe13ab9a2defbcdec59c283a73242973d450a304 100644 (file)
@@ -114,7 +114,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       replaces the entire list of publications with a new list,
       <literal>ADD</literal> adds additional publications to the list of
       publications, and <literal>DROP</literal> removes the publications from
-      the list of publications.  See <xref linkend="sql-createsubscription"/>
+      the list of publications.  We allow non-existent publications to be
+      specified in <literal>ADD</literal> and <literal>SET</literal> variants
+      so that users can add those later.  See <xref linkend="sql-createsubscription"/>
       for more information.  By default, this command will also act like
       <literal>REFRESH PUBLICATION</literal>.
      </para>
index b701752fc9b2131293546ad7c8f6ff30aaecba52..ebf7db57c58d309806a8c110e7c1f52a9995ea7b 100644 (file)
@@ -356,6 +356,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    copied data that would be incompatible with subsequent filtering.
   </para>
 
+  <para>
+   We allow non-existent publications to be specified so that users can add
+   those later. This means
+   <link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
+   can have non-existent publications.
+  </para>
+
  </refsect1>
 
  <refsect1>
index abebffdf3bb582c5d0ede44d9889f7db36016a07..85dacbe93d69ae1e94bc61118f6d08436e6d138f 100644 (file)
@@ -375,6 +375,103 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
        }
 }
 
+/*
+ * Add publication names from the list to a string.
+ */
+static void
+get_publications_str(List *publications, StringInfo dest, bool quote_literal)
+{
+       ListCell   *lc;
+       bool            first = true;
+
+       Assert(list_length(publications) > 0);
+
+       foreach(lc, publications)
+       {
+               char       *pubname = strVal(lfirst(lc));
+
+               if (first)
+                       first = false;
+               else
+                       appendStringInfoString(dest, ", ");
+
+               if (quote_literal)
+                       appendStringInfoString(dest, quote_literal_cstr(pubname));
+               else
+               {
+                       appendStringInfoChar(dest, '"');
+                       appendStringInfoString(dest, pubname);
+                       appendStringInfoChar(dest, '"');
+               }
+       }
+}
+
+/*
+ * Check the specified publication(s) is(are) present in the publisher.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+       WalRcvExecResult *res;
+       StringInfo      cmd;
+       TupleTableSlot *slot;
+       List       *publicationsCopy = NIL;
+       Oid                     tableRow[1] = {TEXTOID};
+
+       cmd = makeStringInfo();
+       appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
+                                                  " pg_catalog.pg_publication t WHERE\n"
+                                                  " t.pubname IN (");
+       get_publications_str(publications, cmd, true);
+       appendStringInfoChar(cmd, ')');
+
+       res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+       pfree(cmd->data);
+       pfree(cmd);
+
+       if (res->status != WALRCV_OK_TUPLES)
+               ereport(ERROR,
+                               errmsg_plural("could not receive publication from the publisher: %s",
+                                                         "could not receive list of publications from the publisher: %s",
+                                                         list_length(publications),
+                                                         res->err));
+
+       publicationsCopy = list_copy(publications);
+
+       /* Process publication(s). */
+       slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+       while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+       {
+               char       *pubname;
+               bool            isnull;
+
+               pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+               Assert(!isnull);
+
+               /* Delete the publication present in publisher from the list. */
+               publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+               ExecClearTuple(slot);
+       }
+
+       ExecDropSingleTupleTableSlot(slot);
+
+       walrcv_clear_result(res);
+
+       if (list_length(publicationsCopy))
+       {
+               /* Prepare the list of non-existent publication(s) for error message. */
+               StringInfo      pubnames = makeStringInfo();
+
+               get_publications_str(publicationsCopy, pubnames, false);
+               ereport(WARNING,
+                               errcode(ERRCODE_UNDEFINED_OBJECT),
+                               errmsg_plural("publication %s does not exist in the publisher",
+                                                         "publications %s do not exist in the publisher",
+                                                         list_length(publicationsCopy),
+                                                         pubnames->data));
+       }
+}
+
 /*
  * Auxiliary function to build a text array out of a list of String nodes.
  */
@@ -555,6 +652,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
                PG_TRY();
                {
+                       check_publications(wrconn, publications);
+
                        /*
                         * Set sync state based on if we were asked to do data copy or
                         * not.
@@ -650,7 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data)
+AlterSubscription_refresh(Subscription *sub, bool copy_data,
+                                                 List *validate_publications)
 {
        char       *err;
        List       *pubrel_names;
@@ -681,6 +781,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
        PG_TRY();
        {
+               if (validate_publications)
+                       check_publications(wrconn, validate_publications);
+
                /* Get the list of relations from publisher. */
                pubrel_names = fetch_table_list(wrconn, sub->publications);
                pubrel_names = list_concat(pubrel_names,
@@ -1048,7 +1151,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                        /* Make sure refresh sees the new list of publications. */
                                        sub->publications = stmt->publication;
 
-                                       AlterSubscription_refresh(sub, opts.copy_data);
+                                       AlterSubscription_refresh(sub, opts.copy_data,
+                                                                                         stmt->publication);
                                }
 
                                break;
@@ -1074,6 +1178,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                /* Refresh if user asked us to. */
                                if (opts.refresh)
                                {
+                                       /* We only need to validate user specified publications. */
+                                       List       *validate_publications = (isadd) ? stmt->publication : NULL;
+
                                        if (!sub->enabled)
                                                ereport(ERROR,
                                                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1096,7 +1203,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                        /* Refresh the new list of publications. */
                                        sub->publications = publist;
 
-                                       AlterSubscription_refresh(sub, opts.copy_data);
+                                       AlterSubscription_refresh(sub, opts.copy_data,
+                                                                                         validate_publications);
                                }
 
                                break;
@@ -1138,7 +1246,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
                                PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-                               AlterSubscription_refresh(sub, opts.copy_data);
+                               AlterSubscription_refresh(sub, opts.copy_data, NULL);
 
                                break;
                        }
@@ -1659,28 +1767,13 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
        StringInfoData cmd;
        TupleTableSlot *slot;
        Oid                     tableRow[2] = {TEXTOID, TEXTOID};
-       ListCell   *lc;
-       bool            first;
        List       *tablelist = NIL;
 
-       Assert(list_length(publications) > 0);
-
        initStringInfo(&cmd);
        appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
                                                   "  FROM pg_catalog.pg_publication_tables t\n"
                                                   " WHERE t.pubname IN (");
-       first = true;
-       foreach(lc, publications)
-       {
-               char       *pubname = strVal(lfirst(lc));
-
-               if (first)
-                       first = false;
-               else
-                       appendStringInfoString(&cmd, ", ");
-
-               appendStringInfoString(&cmd, quote_literal_cstr(pubname));
-       }
+       get_publications_str(publications, &cmd, true);
        appendStringInfoChar(&cmd, ')');
 
        res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
index 1144b005f6df75d17468e260a37a4e3d8528f4a4..39c32eda44d82bdf879d0c2353f207286374423b 100644 (file)
@@ -41,6 +41,43 @@ COMMIT;
 
 pass "subscription disable and drop in same transaction did not hang";
 
+# One of the specified publications exists.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub"
+);
+ok( $stderr =~
+         m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+       "Create subscription throws warning for non-existent publication");
+
+$node_publisher->wait_for_catchup('mysub1');
+
+# Also wait for initial table sync to finish.
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for initial table sync to finish.
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Specifying non-existent publication along with add publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql(
+       'postgres',
+       "ALTER SUBSCRIPTION mysub1 ADD PUBLICATION non_existent_pub1, non_existent_pub2"
+);
+ok( $stderr =~
+         m/WARNING:  publications "non_existent_pub1", "non_existent_pub2" do not exist in the publisher/,
+       "Alter subscription add publication throws warning for non-existent publications");
+
+# Specifying non-existent publication along with set publication.
+($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+       "ALTER SUBSCRIPTION mysub1 SET PUBLICATION non_existent_pub"
+);
+ok( $stderr =~
+         m/WARNING:  publication "non_existent_pub" does not exist in the publisher/,
+       "Alter subscription set publication throws warning for non-existent publication");
+
 $node_subscriber->stop;
 $node_publisher->stop;