Improve invalidation handling in pgoutput.c.
authorAmit Kapila <akapila@postgresql.org>
Fri, 4 Feb 2022 02:00:40 +0000 (07:30 +0530)
committerAmit Kapila <akapila@postgresql.org>
Fri, 4 Feb 2022 02:00:40 +0000 (07:30 +0530)
Fix the following issues in pgoutput.c:

* rel_sync_cache_relation_cb does the wrong thing when called for a cache
flush (i.e., relid == 0). Instead of invalidating all RelationSyncCache
entries as it should, it does nothing.

* When rel_sync_cache_relation_cb does invalidate an entry, it immediately
zaps the entry->map structure, even though that might still be in use. We
instead just mark the entry as invalid and rebuild it at a later safe
point.

* Similarly, rel_sync_cache_publication_cb is way too eager to reset the
pubactions flags, which would likely lead to failing to transmit changes
that we should transmit. In this case also, we just mark the entry as
invalid and rebuild it at a later safe point.

Author: Tom Lane
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/885288.1641420714@sss.pgh.pa.us

src/backend/replication/pgoutput/pgoutput.c

index af8d51aee990dbbfdce859278b41bc5f919fed59..6df705f90ff0c2a2e81f2ad7b0a4d66bdf5465c2 100644 (file)
@@ -108,11 +108,13 @@ typedef struct RelationSyncEntry
 {
        Oid                     relid;                  /* relation oid */
 
+       bool            replicate_valid;        /* overall validity flag for entry */
+
        bool            schema_sent;
        List       *streamed_txns;      /* streamed toplevel transactions with this
                                                                 * schema */
 
-       bool            replicate_valid;
+       /* are we publishing this rel? */
        PublicationActions pubactions;
 
        /*
@@ -903,7 +905,9 @@ LoadPublications(List *pubnames)
 }
 
 /*
- * Publication cache invalidation callback.
+ * Publication syscache invalidation callback.
+ *
+ * Called for invalidations on pg_publication.
  */
 static void
 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                                                                                          HASH_ENTER, &found);
        Assert(entry != NULL);
 
-       /* Not found means schema wasn't sent */
+       /* initialize entry, if it's new */
        if (!found)
        {
-               /* immediately make a new entry valid enough to satisfy callbacks */
+               entry->replicate_valid = false;
                entry->schema_sent = false;
                entry->streamed_txns = NIL;
-               entry->replicate_valid = false;
                entry->pubactions.pubinsert = entry->pubactions.pubupdate =
                        entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
                entry->publish_as_relid = InvalidOid;
@@ -1166,13 +1169,40 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                {
                        oldctx = MemoryContextSwitchTo(CacheMemoryContext);
                        if (data->publications)
+                       {
                                list_free_deep(data->publications);
-
+                               data->publications = NIL;
+                       }
                        data->publications = LoadPublications(data->publication_names);
                        MemoryContextSwitchTo(oldctx);
                        publications_valid = true;
                }
 
+               /*
+                * Reset schema_sent status as the relation definition may have
+                * changed.  Also reset pubactions to empty in case rel was dropped
+                * from a publication.  Also free any objects that depended on the
+                * earlier definition.
+                */
+               entry->schema_sent = false;
+               list_free(entry->streamed_txns);
+               entry->streamed_txns = NIL;
+               entry->pubactions.pubinsert = false;
+               entry->pubactions.pubupdate = false;
+               entry->pubactions.pubdelete = false;
+               entry->pubactions.pubtruncate = false;
+               if (entry->map)
+               {
+                       /*
+                        * Must free the TupleDescs contained in the map explicitly,
+                        * because free_conversion_map() doesn't.
+                        */
+                       FreeTupleDesc(entry->map->indesc);
+                       FreeTupleDesc(entry->map->outdesc);
+                       free_conversion_map(entry->map);
+               }
+               entry->map = NULL;
+
                /*
                 * Build publication cache. We can't use one provided by relcache as
                 * relcache considers all publications given relation is in, but here
@@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                                        foreach(lc2, ancestors)
                                        {
                                                Oid                     ancestor = lfirst_oid(lc2);
+                                               List       *apubids = GetRelationPublications(ancestor);
+                                               List       *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
 
-                                               if (list_member_oid(GetRelationPublications(ancestor),
-                                                                                       pub->oid) ||
-                                                       list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)),
-                                                                                       pub->oid))
+                                               if (list_member_oid(apubids, pub->oid) ||
+                                                       list_member_oid(aschemaPubids, pub->oid))
                                                {
                                                        ancestor_published = true;
                                                        if (pub->pubviaroot)
                                                                publish_as_relid = ancestor;
                                                }
+                                               list_free(apubids);
+                                               list_free(aschemaPubids);
                                        }
                                }
 
@@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                }
 
                list_free(pubids);
+               list_free(schemaPubids);
 
                entry->publish_as_relid = publish_as_relid;
                entry->replicate_valid = true;
@@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
        /*
         * Nobody keeps pointers to entries in this hash table around outside
         * logical decoding callback calls - but invalidation events can come in
-        * *during* a callback if we access the relcache in the callback. Because
-        * of that we must mark the cache entry as invalid but not remove it from
-        * the hash while it could still be referenced, then prune it at a later
-        * safe point.
-        *
-        * Getting invalidations for relations that aren't in the table is
-        * entirely normal, since there's no way to unregister for an invalidation
-        * event. So we don't care if it's found or not.
+        * *during* a callback if we do any syscache access in the callback.
+        * Because of that we must mark the cache entry as invalid but not damage
+        * any of its substructure here.  The next get_rel_sync_entry() call will
+        * rebuild it all.
         */
-       entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
-                                                                                         HASH_FIND, NULL);
-
-       /*
-        * Reset schema sent status as the relation definition may have changed.
-        * Also free any objects that depended on the earlier definition.
-        */
-       if (entry != NULL)
+       if (OidIsValid(relid))
        {
-               entry->schema_sent = false;
-               list_free(entry->streamed_txns);
-               entry->streamed_txns = NIL;
-               if (entry->map)
+               /*
+                * Getting invalidations for relations that aren't in the table is
+                * entirely normal.  So we don't care if it's found or not.
+                */
+               entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
+                                                                                                 HASH_FIND, NULL);
+               if (entry != NULL)
+                       entry->replicate_valid = false;
+       }
+       else
+       {
+               /* Whole cache must be flushed. */
+               HASH_SEQ_STATUS status;
+
+               hash_seq_init(&status, RelationSyncCache);
+               while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
                {
-                       /*
-                        * Must free the TupleDescs contained in the map explicitly,
-                        * because free_conversion_map() doesn't.
-                        */
-                       FreeTupleDesc(entry->map->indesc);
-                       FreeTupleDesc(entry->map->outdesc);
-                       free_conversion_map(entry->map);
+                       entry->replicate_valid = false;
                }
-               entry->map = NULL;
        }
 }
 
 /*
  * Publication relation/schema map syscache invalidation callback
+ *
+ * Called for invalidations on pg_publication, pg_publication_rel, and
+ * pg_publication_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
        while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
        {
                entry->replicate_valid = false;
-
-               /*
-                * There might be some relations dropped from the publication so we
-                * don't need to publish the changes for them.
-                */
-               entry->pubactions.pubinsert = false;
-               entry->pubactions.pubupdate = false;
-               entry->pubactions.pubdelete = false;
-               entry->pubactions.pubtruncate = false;
        }
 }