diff options
| author | Amit Kapila | 2022-02-22 02:24:12 +0000 |
|---|---|---|
| committer | Amit Kapila | 2022-02-22 02:41:50 +0000 |
| commit | 52e4f0cd472d39d07732b99559989ea3b615be78 (patch) | |
| tree | e40cc7b7690f82c7cfb945fd55afdf55e9bc944f /src/include | |
| parent | ebf6c5249b7db525e59563fb149642665c88f747 (diff) | |
Allow specifying row filters for logical replication of tables.
This feature adds row filtering for publication tables. When a publication
is defined or modified, an optional WHERE clause can be specified. Rows
that don't satisfy this WHERE clause will be filtered out. This allows a
set of tables to be partially replicated. The row filter is per table. A
new row filter can be added simply by specifying a WHERE clause after the
table name. The WHERE clause must be enclosed by parentheses.
The row filter WHERE clause for a table added to a publication that
publishes UPDATE and/or DELETE operations must contain only columns that
are covered by REPLICA IDENTITY. The row filter WHERE clause for a table
added to a publication that publishes INSERT can use any column. If the
row filter evaluates to NULL, it is regarded as "false". The WHERE clause
only allows simple expressions that don't have user-defined functions,
user-defined operators, user-defined types, user-defined collations,
non-immutable built-in functions, or references to system columns. These
restrictions could be addressed in the future.
If you choose to do the initial table synchronization, only data that
satisfies the row filters is copied to the subscriber. If the subscription
has several publications in which a table has been published with
different WHERE clauses, rows that satisfy ANY of the expressions will be
copied. If a subscriber is a pre-15 version, the initial table
synchronization won't use row filters even if they are defined in the
publisher.
The row filters are applied before publishing the changes. If the
subscription has several publications in which the same table has been
published with different filters (for the same publish operation), those
expressions get OR'ed together so that rows satisfying any of the
expressions will be replicated.
This means all the other filters become redundant if (a) one of the
publications have no filter at all, (b) one of the publications was
created using FOR ALL TABLES, (c) one of the publications was created
using FOR ALL TABLES IN SCHEMA and the table belongs to that same schema.
If your publication contains a partitioned table, the publication
parameter publish_via_partition_root determines if it uses the partition's
row filter (if the parameter is false, the default) or the root
partitioned table's row filter.
Psql commands \dRp+ and \d <table-name> will display any row filters.
Author: Hou Zhijie, Euler Taveira, Peter Smith, Ajin Cherian
Reviewed-by: Greg Nancarrow, Haiying Tang, Amit Kapila, Tomas Vondra, Dilip Kumar, Vignesh C, Alvaro Herrera, Andres Freund, Wei Wang
Discussion: https://www.postgresql.org/message-id/flat/CAHE3wggb715X%2BmK_DitLXF25B%3DjE6xyNCH4YOwM860JR7HarGQ%40mail.gmail.com
Diffstat (limited to 'src/include')
| -rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
| -rw-r--r-- | src/include/catalog/pg_publication.h | 18 | ||||
| -rw-r--r-- | src/include/catalog/pg_publication_rel.h | 6 | ||||
| -rw-r--r-- | src/include/commands/publicationcmds.h | 2 | ||||
| -rw-r--r-- | src/include/nodes/parsenodes.h | 1 | ||||
| -rw-r--r-- | src/include/replication/logicalproto.h | 11 | ||||
| -rw-r--r-- | src/include/replication/pgoutput.h | 1 | ||||
| -rw-r--r-- | src/include/replication/reorderbuffer.h | 6 | ||||
| -rw-r--r-- | src/include/utils/rel.h | 2 | ||||
| -rw-r--r-- | src/include/utils/relcache.h | 5 |
10 files changed, 41 insertions, 13 deletions
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index b940a0cf0c..1addb568ef 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202202141 +#define CATALOG_VERSION_NO 202202221 #endif diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 841b9b6c25..ba72e62e61 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -74,6 +74,19 @@ typedef struct PublicationActions bool pubtruncate; } PublicationActions; +typedef struct PublicationDesc +{ + PublicationActions pubactions; + + /* + * true if the columns referenced in row filters which are used for UPDATE + * or DELETE are part of the replica identity or the publication actions + * do not include UPDATE or DELETE. + */ + bool rf_valid_for_update; + bool rf_valid_for_delete; +} PublicationDesc; + typedef struct Publication { Oid oid; @@ -86,6 +99,7 @@ typedef struct Publication typedef struct PublicationRelInfo { Relation relation; + Node *whereClause; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); @@ -120,10 +134,11 @@ extern List *GetAllSchemaPublicationRelations(Oid puboid, extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid); +extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors); extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); -extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, +extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists); @@ -131,5 +146,4 @@ extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); - #endif /* PG_PUBLICATION_H */ diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 117a1d67e5..0dd0f425db 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,10 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ + +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + pg_node_tree prqual; /* qualifications */ +#endif } FormData_pg_publication_rel; /* ---------------- @@ -40,6 +44,8 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) */ typedef FormData_pg_publication_rel *Form_pg_publication_rel; +DECLARE_TOAST(pg_publication_rel, 8287, 8288); + DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops)); DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops)); DECLARE_INDEX(pg_publication_rel_prpubid_index, 6116, PublicationRelPrpubidIndexId, on pg_publication_rel using btree(prpubid oid_ops)); diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index cec7525826..7813cbcb6b 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -31,5 +31,7 @@ extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); extern void InvalidatePublicationRels(List *relids); +extern bool contain_invalid_rfcolumn(Oid pubid, Relation relation, + List *ancestors, bool pubviaroot); #endif /* PUBLICATIONCMDS_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 34218b718c..1617702d9d 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3651,6 +3651,7 @@ typedef struct PublicationTable { NodeTag type; RangeVar *relation; /* relation to be published */ + Node *whereClause; /* qualifications */ } PublicationTable; /* diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 22fffaca62..4d2c881644 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -14,6 +14,7 @@ #define LOGICAL_PROTO_H #include "access/xact.h" +#include "executor/tuptable.h" #include "replication/reorderbuffer.h" #include "utils/rel.h" @@ -206,17 +207,19 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, - Relation rel, HeapTuple newtuple, + Relation rel, + TupleTableSlot *newslot, bool binary); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, - Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + Relation rel, + TupleTableSlot *oldslot, + TupleTableSlot *newslot, bool binary); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, TransactionId xid, - Relation rel, HeapTuple oldtuple, + Relation rel, TupleTableSlot *oldtuple, bool binary); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 78aa9151ef..eafedd610a 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -19,6 +19,7 @@ typedef struct PGOutputData { MemoryContext context; /* private memory context for transient * allocations */ + MemoryContext cachectx; /* private memory context for cache data */ /* client-supplied info: */ uint32 protocol_version; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 859424bbd9..0bcc150b33 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -51,7 +51,7 @@ typedef struct ReorderBufferTupleBuf * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of * logical decoding don't have to care about these. */ -enum ReorderBufferChangeType +typedef enum ReorderBufferChangeType { REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, @@ -66,7 +66,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_SEQUENCE -}; +} ReorderBufferChangeType; /* forward declaration */ struct ReorderBufferTXN; @@ -83,7 +83,7 @@ typedef struct ReorderBufferChange XLogRecPtr lsn; /* The type of change. */ - enum ReorderBufferChangeType action; + ReorderBufferChangeType action; /* Transaction this change belongs to. */ struct ReorderBufferTXN *txn; diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 6da1b220cd..3b4ab65ae2 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -161,7 +161,7 @@ typedef struct RelationData Bitmapset *rd_idattr; /* included in replica identity index */ Bitmapset *rd_hotblockingattr; /* cols blocking HOT update */ - PublicationActions *rd_pubactions; /* publication actions */ + PublicationDesc *rd_pubdesc; /* publication descriptor, or NULL */ /* * rd_options is set whenever rd_rel is loaded into the relcache entry. diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h index 84d6afef19..2281a7dc53 100644 --- a/src/include/utils/relcache.h +++ b/src/include/utils/relcache.h @@ -74,8 +74,9 @@ extern void RelationGetExclusionInfo(Relation indexRelation, extern void RelationInitIndexAccessInfo(Relation relation); /* caller must include pg_publication.h */ -struct PublicationActions; -extern struct PublicationActions *GetRelationPublicationActions(Relation relation); +struct PublicationDesc; +extern void RelationBuildPublicationDesc(Relation relation, + struct PublicationDesc *pubdesc); extern void RelationInitTableAccessMethod(Relation relation); |
