{
Oid relid; /* relation oid */
+ bool replicate_valid; /* overall validity flag for entry */
+
bool schema_sent;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
- bool replicate_valid;
+ /* are we publishing this rel? */
PublicationActions pubactions;
/*
}
/*
- * Publication cache invalidation callback.
+ * Publication syscache invalidation callback.
+ *
+ * Called for invalidations on pg_publication.
*/
static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
HASH_ENTER, &found);
Assert(entry != NULL);
- /* Not found means schema wasn't sent */
+ /* initialize entry, if it's new */
if (!found)
{
- /* immediately make a new entry valid enough to satisfy callbacks */
+ entry->replicate_valid = false;
entry->schema_sent = false;
entry->streamed_txns = NIL;
- entry->replicate_valid = false;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
entry->publish_as_relid = InvalidOid;
{
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
if (data->publications)
+ {
list_free_deep(data->publications);
-
+ data->publications = NIL;
+ }
data->publications = LoadPublications(data->publication_names);
MemoryContextSwitchTo(oldctx);
publications_valid = true;
}
+ /*
+ * Reset schema_sent status as the relation definition may have
+ * changed. Also reset pubactions to empty in case rel was dropped
+ * from a publication. Also free any objects that depended on the
+ * earlier definition.
+ */
+ entry->schema_sent = false;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NIL;
+ entry->pubactions.pubinsert = false;
+ entry->pubactions.pubupdate = false;
+ entry->pubactions.pubdelete = false;
+ entry->pubactions.pubtruncate = false;
+ if (entry->map)
+ {
+ /*
+ * Must free the TupleDescs contained in the map explicitly,
+ * because free_conversion_map() doesn't.
+ */
+ FreeTupleDesc(entry->map->indesc);
+ FreeTupleDesc(entry->map->outdesc);
+ free_conversion_map(entry->map);
+ }
+ entry->map = NULL;
+
/*
* Build publication cache. We can't use one provided by relcache as
* relcache considers all publications given relation is in, but here
foreach(lc2, ancestors)
{
Oid ancestor = lfirst_oid(lc2);
+ List *apubids = GetRelationPublications(ancestor);
+ List *aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
- if (list_member_oid(GetRelationPublications(ancestor),
- pub->oid) ||
- list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)),
- pub->oid))
+ if (list_member_oid(apubids, pub->oid) ||
+ list_member_oid(aschemaPubids, pub->oid))
{
ancestor_published = true;
if (pub->pubviaroot)
publish_as_relid = ancestor;
}
+ list_free(apubids);
+ list_free(aschemaPubids);
}
}
}
list_free(pubids);
+ list_free(schemaPubids);
entry->publish_as_relid = publish_as_relid;
entry->replicate_valid = true;
/*
* Nobody keeps pointers to entries in this hash table around outside
* logical decoding callback calls - but invalidation events can come in
- * *during* a callback if we access the relcache in the callback. Because
- * of that we must mark the cache entry as invalid but not remove it from
- * the hash while it could still be referenced, then prune it at a later
- * safe point.
- *
- * Getting invalidations for relations that aren't in the table is
- * entirely normal, since there's no way to unregister for an invalidation
- * event. So we don't care if it's found or not.
+ * *during* a callback if we do any syscache access in the callback.
+ * Because of that we must mark the cache entry as invalid but not damage
+ * any of its substructure here. The next get_rel_sync_entry() call will
+ * rebuild it all.
*/
- entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
- HASH_FIND, NULL);
-
- /*
- * Reset schema sent status as the relation definition may have changed.
- * Also free any objects that depended on the earlier definition.
- */
- if (entry != NULL)
+ if (OidIsValid(relid))
{
- entry->schema_sent = false;
- list_free(entry->streamed_txns);
- entry->streamed_txns = NIL;
- if (entry->map)
+ /*
+ * Getting invalidations for relations that aren't in the table is
+ * entirely normal. So we don't care if it's found or not.
+ */
+ entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
+ HASH_FIND, NULL);
+ if (entry != NULL)
+ entry->replicate_valid = false;
+ }
+ else
+ {
+ /* Whole cache must be flushed. */
+ HASH_SEQ_STATUS status;
+
+ hash_seq_init(&status, RelationSyncCache);
+ while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
- /*
- * Must free the TupleDescs contained in the map explicitly,
- * because free_conversion_map() doesn't.
- */
- FreeTupleDesc(entry->map->indesc);
- FreeTupleDesc(entry->map->outdesc);
- free_conversion_map(entry->map);
+ entry->replicate_valid = false;
}
- entry->map = NULL;
}
}
/*
* Publication relation/schema map syscache invalidation callback
+ *
+ * Called for invalidations on pg_publication, pg_publication_rel, and
+ * pg_publication_namespace.
*/
static void
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
{
entry->replicate_valid = false;
-
- /*
- * There might be some relations dropped from the publication so we
- * don't need to publish the changes for them.
- */
- entry->pubactions.pubinsert = false;
- entry->pubactions.pubupdate = false;
- entry->pubactions.pubdelete = false;
- entry->pubactions.pubtruncate = false;
}
}