Avoid invalidating all RelationSyncCache entries on publication rename.
authorAmit Kapila <akapila@postgresql.org>
Thu, 13 Mar 2025 03:33:45 +0000 (09:03 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 13 Mar 2025 03:46:33 +0000 (09:16 +0530)
On Publication rename, we need to only invalidate the RelationSyncCache
entries corresponding to relations that are part of the publication being
renamed.

As part of this patch, we introduce a new invalidation message to
invalidate the cache maintained by the logical decoding output plugin. We
can't use existing relcache invalidation for this purpose, as that would
unnecessarily cause relcache invalidations in other backends.

This will improve performance by building fewer relation cache entries
during logical replication.

Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Author: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/OSCPR01MB14966C09AA201EFFA706576A7F5C92@OSCPR01MB14966.jpnprd01.prod.outlook.com

src/backend/access/rmgrdesc/standbydesc.c
src/backend/commands/alter.c
src/backend/commands/publicationcmds.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/utils/cache/inval.c
src/include/commands/publicationcmds.h
src/include/pg_config_manual.h
src/include/storage/sinval.h
src/include/utils/inval.h
src/test/subscription/t/007_ddl.pl

index d849f8e54ba851dd813e14d2288e643fe278f61b..81eff5f31c4f6a209c92a8d34ee0d093b324c95b 100644 (file)
@@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf,
            appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
        else if (msg->id == SHAREDINVALSNAPSHOT_ID)
            appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+       else if (msg->id == SHAREDINVALRELSYNC_ID)
+           appendStringInfo(buf, " relsync %u", msg->rs.relid);
        else
            appendStringInfo(buf, " unrecognized id %d", msg->id);
    }
index 78c1d4e1b848778ca70e0761c80df5a66762a43d..c801c869c1cfc5f5edad4244b12c8060b06ea8d7 100644 (file)
@@ -338,6 +338,22 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
 
    InvokeObjectPostAlterHook(classId, objectId, 0);
 
+   /* Do post catalog-update tasks */
+   if (classId == PublicationRelationId)
+   {
+       Form_pg_publication pub = (Form_pg_publication) GETSTRUCT(oldtup);
+
+       /*
+        * Invalidate relsynccache entries.
+        *
+        * Unlike ALTER PUBLICATION ADD/SET/DROP commands, renaming a
+        * publication does not impact the publication status of tables. So,
+        * we don't need to invalidate relcache to rebuild the rd_pubdesc.
+        * Instead, we invalidate only the relsyncache.
+        */
+       InvalidatePubRelSyncCache(pub->oid, pub->puballtables);
+   }
+
    /* Release memory */
    pfree(values);
    pfree(nulls);
index 150a768d16f4e13ff9c9553c3f27789a3aead1b6..3091d36ce98e535b9b01722307cf5d729c5147fb 100644 (file)
@@ -491,6 +491,45 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
    return *invalid_column_list || *invalid_gen_col;
 }
 
+/*
+ * Invalidate entries in the RelationSyncCache for relations included in the
+ * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
+ *
+ * If 'puballtables' is true, invalidate all cache entries.
+ */
+void
+InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
+{
+   if (puballtables)
+   {
+       CacheInvalidateRelSyncAll();
+   }
+   else
+   {
+       List       *relids = NIL;
+       List       *schemarelids = NIL;
+
+       /*
+        * For partitioned tables, we must invalidate all partitions and
+        * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
+        * a target. However, WAL records for TRUNCATE specify both a root and
+        * its leaves.
+        */
+       relids = GetPublicationRelations(pubid,
+                                        PUBLICATION_PART_ALL);
+       schemarelids = GetAllSchemaPublicationRelations(pubid,
+                                                       PUBLICATION_PART_ALL);
+
+       relids = list_concat_unique_oid(relids, schemarelids);
+
+       /* Invalidate the relsyncache */
+       foreach_oid(relid, relids)
+           CacheInvalidateRelSync(relid);
+   }
+
+   return;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
index 9063af6e1df1adb46b47b817e35fa4fd43a5c7fd..ed806c543004f9c0e1bb3b629dfcb42b60a78ada 100644 (file)
@@ -531,6 +531,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
            CacheRegisterSyscacheCallback(PUBLICATIONOID,
                                          publication_invalidation_cb,
                                          (Datum) 0);
+           CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
+                                        (Datum) 0);
            publication_callback_registered = true;
        }
 
@@ -1789,12 +1791,6 @@ static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
    publications_valid = false;
-
-   /*
-    * Also invalidate per-relation cache so that next time the filtering info
-    * is checked it will be updated with the new publication settings.
-    */
-   rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
 /*
index 700ccb6df9b8726e299eaedcca320867899939d1..4eb67720737e8d2496bfc0fa9e6399a62e1c6b69 100644 (file)
@@ -271,6 +271,7 @@ int         debug_discard_caches = 0;
 
 #define MAX_SYSCACHE_CALLBACKS 64
 #define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
 
 static struct SYSCACHECALLBACK
 {
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
 
 static int relcache_callback_count = 0;
 
+static struct RELSYNCCALLBACK
+{
+   RelSyncCallbackFunction function;
+   Datum       arg;
+}          relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int relsync_callback_count = 0;
+
+
 /* ----------------------------------------------------------------
  *             Invalidation subgroup support functions
  * ----------------------------------------------------------------
@@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
    AddInvalidationMessage(group, RelCacheMsgs, &msg);
 }
 
+/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity. This message is the
+ * same as AddRelcacheInvalidationMessage() except that it is for
+ * RelationSyncCache maintained by decoding plugin pgoutput.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+                             Oid dbId, Oid relId)
+{
+   SharedInvalidationMessage msg;
+
+   /* Don't add a duplicate item. */
+   ProcessMessageSubGroup(group, RelCacheMsgs,
+                          if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+                              (msg->rc.relId == relId ||
+                               msg->rc.relId == InvalidOid))
+                          return);
+
+   /* OK, add the item */
+   msg.rc.id = SHAREDINVALRELSYNC_ID;
+   msg.rc.dbId = dbId;
+   msg.rc.relId = relId;
+   /* check AddCatcacheInvalidationMessage() for an explanation */
+   VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+   AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
 /*
  * Add a snapshot inval entry
  *
@@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
        info->RelcacheInitFileInval = true;
 }
 
+/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsynccache invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+   AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
 /*
  * RegisterSnapshotInvalidation
  *
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
 
        ccitem->function(ccitem->arg, InvalidOid);
    }
+
+   for (i = 0; i < relsync_callback_count; i++)
+   {
+       struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+       ccitem->function(ccitem->arg, InvalidOid);
+   }
 }
 
 /*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
        else if (msg->sn.dbId == MyDatabaseId)
            InvalidateCatalogSnapshot();
    }
+   else if (msg->id == SHAREDINVALRELSYNC_ID)
+   {
+       /* We only care about our own database */
+       if (msg->rs.dbId == MyDatabaseId)
+           CallRelSyncCallbacks(msg->rs.relid);
+   }
    else
        elog(FATAL, "unrecognized SI message ID: %d", msg->id);
 }
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
    ReleaseSysCache(tup);
 }
 
+/*
+ * CacheInvalidateRelSync
+ *     Register invalidation of the cache in logical decoding output plugin
+ *     for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WALs would do nothing even when it
+ * receives the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+   RegisterRelsyncInvalidation(PrepareInvalidationState(),
+                               MyDatabaseId, relid);
+}
+
+/*
+ * CacheInvalidateRelSyncAll
+ *     Register invalidation of the whole cache in logical decoding output
+ *     plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+   CacheInvalidateRelSync(InvalidOid);
+}
 
 /*
  * CacheInvalidateSmgr
@@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
    ++relcache_callback_count;
 }
 
+/*
+ * CacheRegisterRelSyncCallback
+ *     Register the specified function to be called for all future
+ *     relsynccache invalidation events.
+ *
+ * This function is intended to be call from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+                            Datum arg)
+{
+   if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+       elog(FATAL, "out of relsync_callback_list slots");
+
+   relsync_callback_list[relsync_callback_count].function = func;
+   relsync_callback_list[relsync_callback_count].arg = arg;
+
+   ++relsync_callback_count;
+}
+
 /*
  * CallSyscacheCallbacks
  *
@@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
    }
 }
 
+/*
+ * CallSyscacheCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+   for (int i = 0; i < relsync_callback_count; i++)
+   {
+       struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+       ccitem->function(ccitem->arg, relid);
+   }
+}
+
 /*
  * LogLogicalInvalidations
  *
index e11a942ea0fd4d53b26e765574ebca69854de5e3..e41df6db03896a8779d4bc36bbb12fd3a1f4d6b1 100644 (file)
@@ -38,5 +38,6 @@ extern bool pub_contains_invalid_column(Oid pubid, Relation relation,
                                        char pubgencols_type,
                                        bool *invalid_column_list,
                                        bool *invalid_gen_col);
+extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables);
 
 #endif                         /* PUBLICATIONCMDS_H */
index 449e50bd78c2c46ed48ad75e090a58dcf095b5df..125d3eb5fff5ed9d40a5de06a3651dcf9be2d73d 100644 (file)
 
 /*
  * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable
- * use of the debug_discard_caches GUC to aggressively flush syscache/relcache
- * entries whenever it's possible to deliver invalidations.  See
- * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for
- * details.
+ * use of the debug_discard_caches GUC to aggressively flush
+ * syscache/relcache/relsynccache entries whenever it's possible to deliver
+ * invalidations.  See AcceptInvalidationMessages() in
+ * src/backend/utils/cache/inval.c for details.
  *
  * USE_ASSERT_CHECKING builds default to enabling this.  It's possible to use
  * DISCARD_CACHES_ENABLED without a cassert build and the implied
index 2463c0f9fac095c8da9f919b5cb4842d544c4be9..5dc5aafe5c9ff0a51dab5ecad760080112300d20 100644 (file)
@@ -27,6 +27,7 @@
  * * invalidate an smgr cache entry for a specific physical relation
  * * invalidate the mapped-relation mapping for a given database
  * * invalidate any saved snapshot that might be used to scan a given relation
+ * * invalidate a RelationSyncCache entry for a specific relation
  * More types could be added if needed.  The message type is identified by
  * the first "int8" field of the message struct.  Zero or positive means a
  * specific-catcache inval message (and also serves as the catcache ID field).
  * catcache inval messages must be generated for each of its caches, since
  * the hash keys will generally be different.
  *
- * Catcache, relcache, and snapshot invalidations are transactional, and so
- * are sent to other backends upon commit.  Internally to the generating
- * backend, they are also processed at CommandCounterIncrement so that later
- * commands in the same transaction see the new state.  The generating backend
- * also has to process them at abort, to flush out any cache state it's loaded
- * from no-longer-valid entries.
+ * Catcache, relcache, relsynccache, and snapshot invalidations are
+ * transactional, and so are sent to other backends upon commit.  Internally
+ * to the generating backend, they are also processed at
+ * CommandCounterIncrement so that later commands in the same transaction see
+ * the new state.  The generating backend also has to process them at abort,
+ * to flush out any cache state it's loaded from no-longer-valid entries.
  *
  * smgr and relation mapping invalidations are non-transactional: they are
  * sent immediately when the underlying file change is made.
@@ -110,6 +111,16 @@ typedef struct
    Oid         relId;          /* relation ID */
 } SharedInvalSnapshotMsg;
 
+#define SHAREDINVALRELSYNC_ID  (-6)
+
+typedef struct
+{
+   int8        id;             /* type field --- must be first */
+   Oid         dbId;           /* database ID */
+   Oid         relid;          /* relation ID, or 0 if whole
+                                * RelationSyncCache */
+}          SharedInvalRelSyncMsg;
+
 typedef union
 {
    int8        id;             /* type field --- must be first */
@@ -119,6 +130,7 @@ typedef union
    SharedInvalSmgrMsg sm;
    SharedInvalRelmapMsg rm;
    SharedInvalSnapshotMsg sn;
+   SharedInvalRelSyncMsg rs;
 } SharedInvalidationMessage;
 
 
index 40658ba2ffcbacfba6c7f0490b8e9839d54b0d6c..9b871caef622fbfa903df59a30fee202da26d468 100644 (file)
@@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches;
 
 typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue);
 typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid);
+typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid);
 
 
 extern void AcceptInvalidationMessages(void);
@@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
 
 extern void CacheInvalidateRelcacheByRelid(Oid relid);
 
+extern void CacheInvalidateRelSync(Oid relid);
+
+extern void CacheInvalidateRelSyncAll(void);
+
 extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator);
 
 extern void CacheInvalidateRelmap(Oid databaseId);
@@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid,
 extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
                                          Datum arg);
 
+extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+                                        Datum arg);
+
 extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
 
+extern void CallRelSyncCallbacks(Oid relid);
+
 extern void InvalidateSystemCaches(void);
 extern void InvalidateSystemCachesExtended(bool debug_discard);
 
index 4d3b917ac04ac8c98051b15b402a9d272ee6c614..7d4c2d51c3a587baba31d1b1418e1cfa7598aea4 100644 (file)
@@ -69,6 +69,91 @@ ok( $stderr =~
    "Alter subscription set publication throws warning for non-existent publication"
 );
 
+# Cleanup
+$node_publisher->safe_psql('postgres', qq[
+   DROP PUBLICATION mypub;
+   SELECT pg_drop_replication_slot('mysub');
+]);
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION mysub1");
+
+#
+# Test ALTER PUBLICATION RENAME command during the replication
+#
+
+# Test function for swaping name of publications
+sub test_swap
+{
+   my ($table_name, $pubname, $appname) = @_;
+
+   # Confirms tuples can be replicated
+   $node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (1);");
+   $node_publisher->wait_for_catchup($appname);
+   my $result =
+       $node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name");
+   is($result, qq(1), 'check replication worked well before renaming a publication');
+
+   # Swap the name of publications; $pubname <-> pub_empty
+   $node_publisher->safe_psql('postgres', qq[
+       ALTER PUBLICATION $pubname RENAME TO tap_pub_tmp;
+       ALTER PUBLICATION pub_empty RENAME TO $pubname;
+       ALTER PUBLICATION tap_pub_tmp RENAME TO pub_empty;
+   ]);
+
+   # Insert the data again
+   $node_publisher->safe_psql('postgres', "INSERT INTO $table_name VALUES (2);");
+   $node_publisher->wait_for_catchup($appname);
+
+   # Confirms the second tuple won't be replicated because $pubname does not
+   # contains relations anymore.
+   $result =
+       $node_subscriber->safe_psql('postgres', "SELECT a FROM $table_name ORDER BY a");
+   is($result, qq(1),
+       'check the tuple inserted after the RENAME was not replicated');
+
+   # Restore the name of publications because it can be called several times
+   $node_publisher->safe_psql('postgres', qq[
+       ALTER PUBLICATION $pubname RENAME TO tap_pub_tmp;
+       ALTER PUBLICATION pub_empty RENAME TO $pubname;
+       ALTER PUBLICATION tap_pub_tmp RENAME TO pub_empty;
+   ]);
+}
+
+# Create another table
+$ddl = "CREATE TABLE test2 (a int, b text);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Create publications and a subscription
+$node_publisher->safe_psql('postgres', qq[
+   CREATE PUBLICATION pub_empty;
+   CREATE PUBLICATION pub_for_tab FOR TABLE test1;
+   CREATE PUBLICATION pub_for_all_tables FOR ALL TABLES;
+]);
+$node_subscriber->safe_psql('postgres',
+   "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION pub_for_tab WITH (copy_data = off)"
+);
+
+# Confirms RENAME command works well for a publication
+test_swap('test1', 'pub_for_tab', 'tap_sub');
+
+# Switches a publication which includes all tables
+$node_subscriber->safe_psql('postgres',
+   "ALTER SUBSCRIPTION tap_sub SET PUBLICATION pub_for_all_tables WITH (refresh = true, copy_data = false);"
+);
+
+# Confirms RENAME command works well for ALL TABLES publication
+test_swap('test2', 'pub_for_all_tables', 'tap_sub');
+
+# Cleanup
+$node_publisher->safe_psql('postgres', qq[
+   DROP PUBLICATION pub_empty, pub_for_tab, pub_for_all_tables;
+   DROP TABLE test1, test2;
+]);
+$node_subscriber->safe_psql('postgres', qq[
+   DROP SUBSCRIPTION tap_sub;
+   DROP TABLE test1, test2;
+]);
+
 $node_subscriber->stop;
 $node_publisher->stop;