Allow specifying row filters for logical replication of tables.
authorAmit Kapila <akapila@postgresql.org>
Tue, 22 Feb 2022 02:24:12 +0000 (07:54 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 22 Feb 2022 02:41:50 +0000 (08:11 +0530)
This feature adds row filtering for publication tables. When a publication
is defined or modified, an optional WHERE clause can be specified. Rows
that don't satisfy this WHERE clause will be filtered out. This allows a
set of tables to be partially replicated. The row filter is per table. A
new row filter can be added simply by specifying a WHERE clause after the
table name. The WHERE clause must be enclosed by parentheses.

The row filter WHERE clause for a table added to a publication that
publishes UPDATE and/or DELETE operations must contain only columns that
are covered by REPLICA IDENTITY. The row filter WHERE clause for a table
added to a publication that publishes INSERT can use any column. If the
row filter evaluates to NULL, it is regarded as "false". The WHERE clause
only allows simple expressions that don't have user-defined functions,
user-defined operators, user-defined types, user-defined collations,
non-immutable built-in functions, or references to system columns. These
restrictions could be addressed in the future.

If you choose to do the initial table synchronization, only data that
satisfies the row filters is copied to the subscriber. If the subscription
has several publications in which a table has been published with
different WHERE clauses, rows that satisfy ANY of the expressions will be
copied. If a subscriber is a pre-15 version, the initial table
synchronization won't use row filters even if they are defined in the
publisher.

The row filters are applied before publishing the changes. If the
subscription has several publications in which the same table has been
published with different filters (for the same publish operation), those
expressions get OR'ed together so that rows satisfying any of the
expressions will be replicated.

This means all the other filters become redundant if (a) one of the
publications have no filter at all, (b) one of the publications was
created using FOR ALL TABLES, (c) one of the publications was created
using FOR ALL TABLES IN SCHEMA and the table belongs to that same schema.

If your publication contains a partitioned table, the publication
parameter publish_via_partition_root determines if it uses the partition's
row filter (if the parameter is false, the default) or the root
partitioned table's row filter.

Psql commands \dRp+ and \d <table-name> will display any row filters.

Author: Hou Zhijie, Euler Taveira, Peter Smith, Ajin Cherian
Reviewed-by: Greg Nancarrow, Haiying Tang, Amit Kapila, Tomas Vondra, Dilip Kumar, Vignesh C, Alvaro Herrera, Andres Freund, Wei Wang
Discussion: https://www.postgresql.org/message-id/flat/CAHE3wggb715X%2BmK_DitLXF25B%3DjE6xyNCH4YOwM860JR7HarGQ%40mail.gmail.com

33 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/ref/alter_publication.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_publication.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/pg_publication.c
src/backend/commands/publicationcmds.c
src/backend/executor/execReplication.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/parser/gram.y
src/backend/replication/logical/proto.c
src/backend/replication/logical/tablesync.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/utils/cache/relcache.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/bin/psql/tab-complete.c
src/include/catalog/catversion.h
src/include/catalog/pg_publication.h
src/include/catalog/pg_publication_rel.h
src/include/commands/publicationcmds.h
src/include/nodes/parsenodes.h
src/include/replication/logicalproto.h
src/include/replication/pgoutput.h
src/include/replication/reorderbuffer.h
src/include/utils/rel.h
src/include/utils/relcache.h
src/test/regress/expected/publication.out
src/test/regress/sql/publication.sql
src/test/subscription/t/028_row_filter.pl [new file with mode: 0644]
src/tools/pgindent/typedefs.list

index 5a1627a3941d5ef85ae35dad64f329ede8fb7b1f..83987a9904578b47eca3ee7ae7d225c0aa999778 100644 (file)
@@ -6325,6 +6325,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        Reference to relation
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+      <structfield>prqual</structfield> <type>pg_node_tree</type>
+      </para>
+      <para>Expression tree (in <function>nodeToString()</function>
+      representation) for the relation's publication qualifying condition. Null
+      if there is no publication qualifying condition.</para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index 7c7c27bf7ce9f44795d1ac85e7c615418f3bdee2..32b75f6c78e59e4b838dcce8d853b0a52b4fc09d 100644 (file)
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -52,7 +52,9 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
    remove one or more tables/schemas from the publication.  Note that adding
    tables/schemas to a publication that is already subscribed to will require an
    <literal>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</literal> action on the
-   subscribing side in order to become effective.
+   subscribing side in order to become effective. Note also that the combination
+   of <literal>DROP</literal> with a <literal>WHERE</literal> clause is not
+   allowed.
   </para>
 
   <para>
@@ -110,6 +112,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
       name to explicitly indicate that descendant tables are included.
+      If the optional <literal>WHERE</literal> clause is specified, rows for
+      which the <replaceable class="parameter">expression</replaceable>
+      evaluates to false or null will not be published. Note that parentheses
+      are required around the expression. The
+      <replaceable class="parameter">expression</replaceable> is evaluated with
+      the role used for the replication connection.
      </para>
     </listitem>
    </varlistentry>
index 0b027cc34622f89619a5ea569672219a1988b9fa..0d6f064f58d74ef0ef31c36782b68ca3bca7bb55 100644 (file)
@@ -163,8 +163,11 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
          <para>
           Specifies whether to copy pre-existing data in the publications
           that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.  (Previously-subscribed
-          tables are not copied.)
+          The default is <literal>true</literal>.
+         </para>
+         <para>
+          Previously subscribed tables are not copied, even if a table's row
+          filter <literal>WHERE</literal> clause has since been modified.
          </para>
         </listitem>
        </varlistentry>
index 385975bfaddd0b1f5f7d51bda75eaaf9828a5c70..4979b9b646d0127fe89ac30725e19a48289c2193 100644 (file)
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -78,6 +78,14 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       publication, so they are never explicitly added to the publication.
      </para>
 
+     <para>
+      If the optional <literal>WHERE</literal> clause is specified, rows for
+      which the <replaceable class="parameter">expression</replaceable>
+      evaluates to false or null will not be published. Note that parentheses
+      are required around the expression. It has no effect on
+      <literal>TRUNCATE</literal> commands.
+     </para>
+
      <para>
       Only persistent base tables and partitioned tables can be part of a
       publication.  Temporary tables, unlogged tables, foreign tables,
@@ -225,6 +233,22 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
    disallowed on those tables.
   </para>
 
+  <para>
+   A <literal>WHERE</literal> (i.e. row filter) expression must contain only
+   columns that are covered by the <literal>REPLICA IDENTITY</literal>, in
+   order for <command>UPDATE</command> and <command>DELETE</command> operations
+   to be published. For publication of <command>INSERT</command> operations,
+   any column may be used in the <literal>WHERE</literal> expression. The
+   <literal>WHERE</literal> clause allows simple expressions that don't have
+   user-defined functions, user-defined operators, user-defined types,
+   user-defined collations, non-immutable built-in functions, or references to
+   system columns.
+   If your publication contains a partitioned table, the publication parameter
+   <literal>publish_via_partition_root</literal> determines if it uses the
+   partition's row filter (if the parameter is false, the default) or the root
+   partitioned table's row filter.
+  </para>
+
   <para>
    For an <command>INSERT ... ON CONFLICT</command> command, the publication will
    publish the operation that actually results from the command.  So depending
@@ -247,6 +271,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
   <para>
    <acronym>DDL</acronym> operations are not published.
   </para>
+
+  <para>
+   The <literal>WHERE</literal> clause expression is executed with the role used
+   for the replication connection.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -259,6 +288,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
 </programlisting>
   </para>
 
+  <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
   <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
index 990a41f1a1ba716564f4152ce3a609a333d329ba..e80a2617a34fb7d1216cabc3ebb7dedb156ee815 100644 (file)
@@ -208,6 +208,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           that are being subscribed to when the replication starts.
           The default is <literal>true</literal>.
          </para>
+         <para>
+          If the publications contain <literal>WHERE</literal> clauses, it
+          will affect what data is copied. Refer to the
+          <xref linkend="sql-createsubscription-notes" /> for details.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -293,7 +298,7 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
   </variablelist>
  </refsect1>
 
- <refsect1>
+ <refsect1 id="sql-createsubscription-notes" xreflabel="Notes">
   <title>Notes</title>
 
   <para>
@@ -319,6 +324,26 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    the parameter <literal>create_slot = false</literal>.  This is an
    implementation restriction that might be lifted in a future release.
   </para>
+
+  <para>
+   If any table in the publication has a <literal>WHERE</literal> clause, rows
+   for which the <replaceable class="parameter">expression</replaceable>
+   evaluates to false or null will not be published. If the subscription has
+   several publications in which the same table has been published with
+   different <literal>WHERE</literal> clauses, a row will be published if any
+   of the expressions (referring to that publish operation) are satisfied. In
+   the case of different <literal>WHERE</literal> clauses, if one of the
+   publications has no <literal>WHERE</literal> clause (referring to that
+   publish operation) or the publication is declared as
+   <literal>FOR ALL TABLES</literal> or
+   <literal>FOR ALL TABLES IN SCHEMA</literal>, rows are always published
+   regardless of the definition of the other expressions.
+   If the subscriber is a <productname>PostgreSQL</productname> version before
+   15 then any row filtering is ignored during the initial data synchronization
+   phase. For this case, the user might want to consider deleting any initially
+   copied data that would be incompatible with subsequent filtering.
+  </para>
+
  </refsect1>
 
  <refsect1>
index e14ca2f56305c699e0ec8f5d194ba17a9c57127a..25998fbb39b102a64f903434ded0d9caed4eb09b 100644 (file)
@@ -275,18 +275,57 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
        return result;
 }
 
+/*
+ * Returns the relid of the topmost ancestor that is published via this
+ * publication if any, otherwise returns InvalidOid.
+ *
+ * Note that the list of ancestors should be ordered such that the topmost
+ * ancestor is at the end of the list.
+ */
+Oid
+GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
+{
+       ListCell   *lc;
+       Oid                     topmost_relid = InvalidOid;
+
+       /*
+        * Find the "topmost" ancestor that is in this publication.
+        */
+       foreach(lc, ancestors)
+       {
+               Oid                     ancestor = lfirst_oid(lc);
+               List       *apubids = GetRelationPublications(ancestor);
+               List       *aschemaPubids = NIL;
+
+               if (list_member_oid(apubids, puboid))
+                       topmost_relid = ancestor;
+               else
+               {
+                       aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
+                       if (list_member_oid(aschemaPubids, puboid))
+                               topmost_relid = ancestor;
+               }
+
+               list_free(apubids);
+               list_free(aschemaPubids);
+       }
+
+       return topmost_relid;
+}
+
 /*
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
+publication_add_relation(Oid pubid, PublicationRelInfo *pri,
                                                 bool if_not_exists)
 {
        Relation        rel;
        HeapTuple       tup;
        Datum           values[Natts_pg_publication_rel];
        bool            nulls[Natts_pg_publication_rel];
-       Oid                     relid = RelationGetRelid(targetrel->relation);
+       Relation        targetrel = pri->relation;
+       Oid                     relid = RelationGetRelid(targetrel);
        Oid                     pubreloid;
        Publication *pub = GetPublication(pubid);
        ObjectAddress myself,
@@ -311,10 +350,10 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
                ereport(ERROR,
                                (errcode(ERRCODE_DUPLICATE_OBJECT),
                                 errmsg("relation \"%s\" is already member of publication \"%s\"",
-                                               RelationGetRelationName(targetrel->relation), pub->name)));
+                                               RelationGetRelationName(targetrel), pub->name)));
        }
 
-       check_publication_add_relation(targetrel->relation);
+       check_publication_add_relation(targetrel);
 
        /* Form a tuple. */
        memset(values, 0, sizeof(values));
@@ -328,6 +367,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
        values[Anum_pg_publication_rel_prrelid - 1] =
                ObjectIdGetDatum(relid);
 
+       /* Add qualifications, if available */
+       if (pri->whereClause != NULL)
+               values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
+       else
+               nulls[Anum_pg_publication_rel_prqual - 1] = true;
+
        tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
        /* Insert tuple into catalog. */
@@ -345,6 +390,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
        ObjectAddressSet(referenced, RelationRelationId, relid);
        recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+       /* Add dependency on the objects mentioned in the qualifications */
+       if (pri->whereClause)
+               recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
+                                                                               DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
+                                                                               false);
+
        /* Close the table. */
        table_close(rel, RowExclusiveLock);
 
index 0e4bb97fb73e0af653fd3a29f6293e9f6fcab46a..16b8661a1b7f183abb87b672768548830218206c 100644 (file)
@@ -26,6 +26,7 @@
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_namespace.h"
 #include "catalog/pg_publication_rel.h"
 #include "commands/publicationcmds.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
+/*
+ * Information used to validate the columns in the row filter expression. See
+ * contain_invalid_rfcolumn_walker for details.
+ */
+typedef struct rf_context
+{
+       Bitmapset  *bms_replident;      /* bitset of replica identity columns */
+       bool            pubviaroot;             /* true if we are validating the parent
+                                                                * relation's row filter */
+       Oid                     relid;                  /* relid of the relation */
+       Oid                     parentid;               /* relid of the parent relation */
+} rf_context;
+
 static List *OpenRelIdList(List *relids);
 static List *OpenTableList(List *tables);
 static void CloseTableList(List *rels);
@@ -234,6 +252,362 @@ CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist,
        }
 }
 
+/*
+ * Returns true if any of the columns used in the row filter WHERE expression is
+ * not part of REPLICA IDENTITY, false otherwise.
+ */
+static bool
+contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
+{
+       if (node == NULL)
+               return false;
+
+       if (IsA(node, Var))
+       {
+               Var                *var = (Var *) node;
+               AttrNumber      attnum = var->varattno;
+
+               /*
+                * If pubviaroot is true, we are validating the row filter of the
+                * parent table, but the bitmap contains the replica identity
+                * information of the child table. So, get the column number of the
+                * child table as parent and child column order could be different.
+                */
+               if (context->pubviaroot)
+               {
+                       char       *colname = get_attname(context->parentid, attnum, false);
+
+                       attnum = get_attnum(context->relid, colname);
+               }
+
+               if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
+                                                  context->bms_replident))
+                       return true;
+       }
+
+       return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
+                                                                 (void *) context);
+}
+
+/*
+ * Check if all columns referenced in the filter expression are part of the
+ * REPLICA IDENTITY index or not.
+ *
+ * Returns true if any invalid column is found.
+ */
+bool
+contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
+                                                bool pubviaroot)
+{
+       HeapTuple       rftuple;
+       Oid                     relid = RelationGetRelid(relation);
+       Oid                     publish_as_relid = RelationGetRelid(relation);
+       bool            result = false;
+       Datum           rfdatum;
+       bool            rfisnull;
+
+       /*
+        * FULL means all columns are in the REPLICA IDENTITY, so all columns are
+        * allowed in the row filter and we can skip the validation.
+        */
+       if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+               return false;
+
+       /*
+        * For a partition, if pubviaroot is true, find the topmost ancestor that
+        * is published via this publication as we need to use its row filter
+        * expression to filter the partition's changes.
+        *
+        * Note that even though the row filter used is for an ancestor, the
+        * REPLICA IDENTITY used will be for the actual child table.
+        */
+       if (pubviaroot && relation->rd_rel->relispartition)
+       {
+               publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors);
+
+               if (!OidIsValid(publish_as_relid))
+                       publish_as_relid = relid;
+       }
+
+       rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+                                                         ObjectIdGetDatum(publish_as_relid),
+                                                         ObjectIdGetDatum(pubid));
+
+       if (!HeapTupleIsValid(rftuple))
+               return false;
+
+       rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+                                                         Anum_pg_publication_rel_prqual,
+                                                         &rfisnull);
+
+       if (!rfisnull)
+       {
+               rf_context      context = {0};
+               Node       *rfnode;
+               Bitmapset  *bms = NULL;
+
+               context.pubviaroot = pubviaroot;
+               context.parentid = publish_as_relid;
+               context.relid = relid;
+
+               /* Remember columns that are part of the REPLICA IDENTITY */
+               bms = RelationGetIndexAttrBitmap(relation,
+                                                                                INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+               context.bms_replident = bms;
+               rfnode = stringToNode(TextDatumGetCString(rfdatum));
+               result = contain_invalid_rfcolumn_walker(rfnode, &context);
+
+               bms_free(bms);
+               pfree(rfnode);
+       }
+
+       ReleaseSysCache(rftuple);
+
+       return result;
+}
+
+/* check_functions_in_node callback */
+static bool
+contain_mutable_or_user_functions_checker(Oid func_id, void *context)
+{
+       return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
+                       func_id >= FirstNormalObjectId);
+}
+
+/*
+ * Check if the node contains any unallowed object. See
+ * check_simple_rowfilter_expr_walker.
+ *
+ * Returns the error detail message in errdetail_msg for unallowed expressions.
+ */
+static void
+expr_allowed_in_node(Node *node, ParseState *pstate, char **errdetail_msg)
+{
+       if (IsA(node, List))
+       {
+               /*
+                * OK, we don't need to perform other expr checks for List nodes
+                * because those are undefined for List.
+                */
+               return;
+       }
+
+       if (exprType(node) >= FirstNormalObjectId)
+               *errdetail_msg = _("User-defined types are not allowed.");
+       else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
+                                                                        (void *) pstate))
+               *errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
+       else if (exprCollation(node) >= FirstNormalObjectId ||
+                        exprInputCollation(node) >= FirstNormalObjectId)
+               *errdetail_msg = _("User-defined collations are not allowed.");
+}
+
+/*
+ * The row filter walker checks if the row filter expression is a "simple
+ * expression".
+ *
+ * It allows only simple or compound expressions such as:
+ * - (Var Op Const)
+ * - (Var Op Var)
+ * - (Var Op Const) AND/OR (Var Op Const)
+ * - etc
+ * (where Var is a column of the table this filter belongs to)
+ *
+ * The simple expression has the following restrictions:
+ * - User-defined operators are not allowed;
+ * - User-defined functions are not allowed;
+ * - User-defined types are not allowed;
+ * - User-defined collations are not allowed;
+ * - Non-immutable built-in functions are not allowed;
+ * - System columns are not allowed.
+ *
+ * NOTES
+ *
+ * We don't allow user-defined functions/operators/types/collations because
+ * (a) if a user drops a user-defined object used in a row filter expression or
+ * if there is any other error while using it, the logical decoding
+ * infrastructure won't be able to recover from such an error even if the
+ * object is recreated again because a historic snapshot is used to evaluate
+ * the row filter;
+ * (b) a user-defined function can be used to access tables that could have
+ * unpleasant results because a historic snapshot is used. That's why only
+ * immutable built-in functions are allowed in row filter expressions.
+ *
+ * We don't allow system columns because currently, we don't have that
+ * information in the tuple passed to downstream. Also, as we don't replicate
+ * those to subscribers, there doesn't seem to be a need for a filter on those
+ * columns.
+ *
+ * We can allow other node types after more analysis and testing.
+ */
+static bool
+check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
+{
+       char       *errdetail_msg = NULL;
+
+       if (node == NULL)
+               return false;
+
+       switch (nodeTag(node))
+       {
+               case T_Var:
+                       /* System columns are not allowed. */
+                       if (((Var *) node)->varattno < InvalidAttrNumber)
+                               errdetail_msg = _("System columns are not allowed.");
+                       break;
+               case T_OpExpr:
+               case T_DistinctExpr:
+               case T_NullIfExpr:
+                       /* OK, except user-defined operators are not allowed. */
+                       if (((OpExpr *) node)->opno >= FirstNormalObjectId)
+                               errdetail_msg = _("User-defined operators are not allowed.");
+                       break;
+               case T_ScalarArrayOpExpr:
+                       /* OK, except user-defined operators are not allowed. */
+                       if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
+                               errdetail_msg = _("User-defined operators are not allowed.");
+
+                       /*
+                        * We don't need to check the hashfuncid and negfuncid of
+                        * ScalarArrayOpExpr as those functions are only built for a
+                        * subquery.
+                        */
+                       break;
+               case T_RowCompareExpr:
+                       {
+                               ListCell   *opid;
+
+                               /* OK, except user-defined operators are not allowed. */
+                               foreach(opid, ((RowCompareExpr *) node)->opnos)
+                               {
+                                       if (lfirst_oid(opid) >= FirstNormalObjectId)
+                                       {
+                                               errdetail_msg = _("User-defined operators are not allowed.");
+                                               break;
+                                       }
+                               }
+                       }
+                       break;
+               case T_Const:
+               case T_FuncExpr:
+               case T_BoolExpr:
+               case T_RelabelType:
+               case T_CollateExpr:
+               case T_CaseExpr:
+               case T_CaseTestExpr:
+               case T_ArrayExpr:
+               case T_RowExpr:
+               case T_CoalesceExpr:
+               case T_MinMaxExpr:
+               case T_XmlExpr:
+               case T_NullTest:
+               case T_BooleanTest:
+               case T_List:
+                       /* OK, supported */
+                       break;
+               default:
+                       errdetail_msg = _("Expressions only allow columns, constants, built-in operators, built-in data types, built-in collations and immutable built-in functions.");
+                       break;
+       }
+
+       /*
+        * For all the supported nodes, check the types, functions, and collations
+        * used in the nodes.
+        */
+       if (!errdetail_msg)
+               expr_allowed_in_node(node, pstate, &errdetail_msg);
+
+       if (errdetail_msg)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("invalid publication WHERE expression"),
+                                errdetail("%s", errdetail_msg),
+                                parser_errposition(pstate, exprLocation(node))));
+
+       return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
+                                                                 (void *) pstate);
+}
+
+/*
+ * Check if the row filter expression is a "simple expression".
+ *
+ * See check_simple_rowfilter_expr_walker for details.
+ */
+static bool
+check_simple_rowfilter_expr(Node *node, ParseState *pstate)
+{
+       return check_simple_rowfilter_expr_walker(node, pstate);
+}
+
+/*
+ * Transform the publication WHERE expression for all the relations in the list,
+ * ensuring it is coerced to boolean and necessary collation information is
+ * added if required, and add a new nsitem/RTE for the associated relation to
+ * the ParseState's namespace list.
+ *
+ * Also check the publication row filter expression and throw an error if
+ * anything not permitted or unexpected is encountered.
+ */
+static void
+TransformPubWhereClauses(List *tables, const char *queryString,
+                                                bool pubviaroot)
+{
+       ListCell   *lc;
+
+       foreach(lc, tables)
+       {
+               ParseNamespaceItem *nsitem;
+               Node       *whereclause = NULL;
+               ParseState *pstate;
+               PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
+
+               if (pri->whereClause == NULL)
+                       continue;
+
+               /*
+                * If the publication doesn't publish changes via the root partitioned
+                * table, the partition's row filter will be used. So disallow using
+                * WHERE clause on partitioned table in this case.
+                */
+               if (!pubviaroot &&
+                       pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("cannot use publication WHERE clause for relation \"%s\"",
+                                                       RelationGetRelationName(pri->relation)),
+                                        errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
+                                                          "publish_via_partition_root")));
+
+               pstate = make_parsestate(NULL);
+               pstate->p_sourcetext = queryString;
+
+               nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
+                                                                                          AccessShareLock, NULL,
+                                                                                          false, false);
+
+               addNSItemToQuery(pstate, nsitem, false, true, true);
+
+               whereclause = transformWhereClause(pstate,
+                                                                                  copyObject(pri->whereClause),
+                                                                                  EXPR_KIND_WHERE,
+                                                                                  "PUBLICATION WHERE");
+
+               /* Fix up collation information */
+               assign_expr_collations(pstate, whereclause);
+
+               /*
+                * We allow only simple expressions in row filters. See
+                * check_simple_rowfilter_expr_walker.
+                */
+               check_simple_rowfilter_expr(whereclause, pstate);
+
+               free_parsestate(pstate);
+
+               pri->whereClause = whereclause;
+       }
+}
+
 /*
  * Create new publication.
  */
@@ -346,6 +720,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
                        rels = OpenTableList(relations);
                        CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
                                                                                                  PUBLICATIONOBJ_TABLE);
+
+                       TransformPubWhereClauses(rels, pstate->p_sourcetext,
+                                                                        publish_via_partition_root);
+
                        PublicationAddTables(puboid, rels, true, NULL);
                        CloseTableList(rels);
                }
@@ -392,6 +770,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
        bool            publish_via_partition_root;
        ObjectAddress obj;
        Form_pg_publication pubform;
+       List       *root_relids = NIL;
+       ListCell   *lc;
 
        parse_publication_options(pstate,
                                                          stmt->options,
@@ -399,6 +779,65 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
                                                          &publish_via_partition_root_given,
                                                          &publish_via_partition_root);
 
+       pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+       /*
+        * If the publication doesn't publish changes via the root partitioned
+        * table, the partition's row filter will be used. So disallow using WHERE
+        * clause on partitioned table in this case.
+        */
+       if (!pubform->puballtables && publish_via_partition_root_given &&
+               !publish_via_partition_root)
+       {
+               /*
+                * Lock the publication so nobody else can do anything with it. This
+                * prevents concurrent alter to add partitioned table(s) with WHERE
+                * clause(s) which we don't allow when not publishing via root.
+                */
+               LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
+                                                  AccessShareLock);
+
+               root_relids = GetPublicationRelations(pubform->oid,
+                                                                                         PUBLICATION_PART_ROOT);
+
+               foreach(lc, root_relids)
+               {
+                       HeapTuple       rftuple;
+                       Oid                     relid = lfirst_oid(lc);
+
+                       rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+                                                                         ObjectIdGetDatum(relid),
+                                                                         ObjectIdGetDatum(pubform->oid));
+
+                       if (HeapTupleIsValid(rftuple) &&
+                               !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL))
+                       {
+                               HeapTuple       tuple;
+
+                               tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+                               if (HeapTupleIsValid(tuple))
+                               {
+                                       Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple);
+
+                                       if (relform->relkind == RELKIND_PARTITIONED_TABLE)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                                errmsg("cannot set %s for publication \"%s\"",
+                                                                               "publish_via_partition_root = false",
+                                                                               stmt->pubname),
+                                                                errdetail("The publication contains a WHERE clause for a partitioned table \"%s\" "
+                                                                                  "which is not allowed when %s is false.",
+                                                                                  NameStr(relform->relname),
+                                                                                  "publish_via_partition_root")));
+
+                                       ReleaseSysCache(tuple);
+                               }
+
+                               ReleaseSysCache(rftuple);
+                       }
+               }
+       }
+
        /* Everything ok, form a new tuple. */
        memset(values, 0, sizeof(values));
        memset(nulls, false, sizeof(nulls));
@@ -450,8 +889,21 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
                 * invalidate all partitions contained in the respective partition
                 * trees, not just those explicitly mentioned in the publication.
                 */
-               relids = GetPublicationRelations(pubform->oid,
-                                                                                PUBLICATION_PART_ALL);
+               if (root_relids == NIL)
+                       relids = GetPublicationRelations(pubform->oid,
+                                                                                        PUBLICATION_PART_ALL);
+               else
+               {
+                       /*
+                        * We already got tables explicitly mentioned in the publication.
+                        * Now get all partitions for the partitioned table in the list.
+                        */
+                       foreach(lc, root_relids)
+                               relids = GetPubPartitionOptionRelations(relids,
+                                                                                                               PUBLICATION_PART_ALL,
+                                                                                                               lfirst_oid(lc));
+               }
+
                schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
                                                                                                                PUBLICATION_PART_ALL);
                relids = list_concat_unique_oid(relids, schemarelids);
@@ -492,7 +944,8 @@ InvalidatePublicationRels(List *relids)
  */
 static void
 AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
-                                          List *tables, List *schemaidlist)
+                                          List *tables, List *schemaidlist,
+                                          const char *queryString)
 {
        List       *rels = NIL;
        Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
@@ -519,6 +972,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
                schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
                CheckObjSchemaNotAlreadyInPublication(rels, schemas,
                                                                                          PUBLICATIONOBJ_TABLE);
+
+               TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
+
                PublicationAddTables(pubid, rels, false, stmt);
        }
        else if (stmt->action == AP_DropObjects)
@@ -533,37 +989,76 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
                CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
                                                                                          PUBLICATIONOBJ_TABLE);
 
-               /* Calculate which relations to drop. */
+               TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
+
+               /*
+                * To recreate the relation list for the publication, look for
+                * existing relations that do not need to be dropped.
+                */
                foreach(oldlc, oldrelids)
                {
                        Oid                     oldrelid = lfirst_oid(oldlc);
                        ListCell   *newlc;
+                       PublicationRelInfo *oldrel;
                        bool            found = false;
+                       HeapTuple       rftuple;
+                       bool            rfisnull = true;
+                       Node       *oldrelwhereclause = NULL;
+
+                       /* look up the cache for the old relmap */
+                       rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+                                                                         ObjectIdGetDatum(oldrelid),
+                                                                         ObjectIdGetDatum(pubid));
+
+                       if (HeapTupleIsValid(rftuple))
+                       {
+                               Datum           whereClauseDatum;
+
+                               whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+                                                                                                  Anum_pg_publication_rel_prqual,
+                                                                                                  &rfisnull);
+                               if (!rfisnull)
+                                       oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
+
+                               ReleaseSysCache(rftuple);
+                       }
 
                        foreach(newlc, rels)
                        {
                                PublicationRelInfo *newpubrel;
 
                                newpubrel = (PublicationRelInfo *) lfirst(newlc);
+
+                               /*
+                                * Check if any of the new set of relations matches with the
+                                * existing relations in the publication. Additionally, if the
+                                * relation has an associated WHERE clause, check the WHERE
+                                * expressions also match. Drop the rest.
+                                */
                                if (RelationGetRelid(newpubrel->relation) == oldrelid)
                                {
-                                       found = true;
-                                       break;
+                                       if (equal(oldrelwhereclause, newpubrel->whereClause))
+                                       {
+                                               found = true;
+                                               break;
+                                       }
                                }
                        }
-                       /* Not yet in the list, open it and add to the list */
-                       if (!found)
-                       {
-                               Relation        oldrel;
-                               PublicationRelInfo *pubrel;
 
-                               /* Wrap relation into PublicationRelInfo */
-                               oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
+                       if (oldrelwhereclause)
+                               pfree(oldrelwhereclause);
 
-                               pubrel = palloc(sizeof(PublicationRelInfo));
-                               pubrel->relation = oldrel;
-
-                               delrels = lappend(delrels, pubrel);
+                       /*
+                        * Add the non-matched relations to a list so that they can be
+                        * dropped.
+                        */
+                       if (!found)
+                       {
+                               oldrel = palloc(sizeof(PublicationRelInfo));
+                               oldrel->whereClause = NULL;
+                               oldrel->relation = table_open(oldrelid,
+                                                                                         ShareUpdateExclusiveLock);
+                               delrels = lappend(delrels, oldrel);
                        }
                }
 
@@ -720,12 +1215,15 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
        {
                List       *relations = NIL;
                List       *schemaidlist = NIL;
+               Oid                     pubid = pubform->oid;
 
                ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
                                                                   &schemaidlist);
 
                CheckAlterPublication(stmt, tup, relations, schemaidlist);
 
+               heap_freetuple(tup);
+
                /*
                 * Lock the publication so nobody else can do anything with it. This
                 * prevents concurrent alter to add table(s) that were already going
@@ -734,22 +1232,24 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
                 * addition of schema(s) for which there is any corresponding table
                 * being added by this command.
                 */
-               LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
+               LockDatabaseObject(PublicationRelationId, pubid, 0,
                                                   AccessExclusiveLock);
 
                /*
                 * It is possible that by the time we acquire the lock on publication,
                 * concurrent DDL has removed it. We can test this by checking the
-                * existence of publication.
+                * existence of publication. We get the tuple again to avoid the risk
+                * of any publication option getting changed.
                 */
-               if (!SearchSysCacheExists1(PUBLICATIONOID,
-                                                                  ObjectIdGetDatum(pubform->oid)))
+               tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
+               if (!HeapTupleIsValid(tup))
                        ereport(ERROR,
                                        errcode(ERRCODE_UNDEFINED_OBJECT),
                                        errmsg("publication \"%s\" does not exist",
                                                   stmt->pubname));
 
-               AlterPublicationTables(stmt, tup, relations, schemaidlist);
+               AlterPublicationTables(stmt, tup, relations, schemaidlist,
+                                                          pstate->p_sourcetext);
                AlterPublicationSchemas(stmt, tup, schemaidlist);
        }
 
@@ -901,6 +1401,7 @@ OpenTableList(List *tables)
        List       *relids = NIL;
        List       *rels = NIL;
        ListCell   *lc;
+       List       *relids_with_rf = NIL;
 
        /*
         * Open, share-lock, and check all the explicitly-specified relations
@@ -928,15 +1429,26 @@ OpenTableList(List *tables)
                 */
                if (list_member_oid(relids, myrelid))
                {
+                       /* Disallow duplicate tables if there are any with row filters. */
+                       if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_DUPLICATE_OBJECT),
+                                                errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
+                                                               RelationGetRelationName(rel))));
+
                        table_close(rel, ShareUpdateExclusiveLock);
                        continue;
                }
 
                pub_rel = palloc(sizeof(PublicationRelInfo));
                pub_rel->relation = rel;
+               pub_rel->whereClause = t->whereClause;
                rels = lappend(rels, pub_rel);
                relids = lappend_oid(relids, myrelid);
 
+               if (t->whereClause)
+                       relids_with_rf = lappend_oid(relids_with_rf, myrelid);
+
                /*
                 * Add children of this rel, if requested, so that they too are added
                 * to the publication.  A partitioned table can't have any inheritance
@@ -963,19 +1475,39 @@ OpenTableList(List *tables)
                                 * tables.
                                 */
                                if (list_member_oid(relids, childrelid))
+                               {
+                                       /*
+                                        * We don't allow to specify row filter for both parent
+                                        * and child table at the same time as it is not very
+                                        * clear which one should be given preference.
+                                        */
+                                       if (childrelid != myrelid &&
+                                               (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_DUPLICATE_OBJECT),
+                                                                errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
+                                                                               RelationGetRelationName(rel))));
+
                                        continue;
+                               }
 
                                /* find_all_inheritors already got lock */
                                rel = table_open(childrelid, NoLock);
                                pub_rel = palloc(sizeof(PublicationRelInfo));
                                pub_rel->relation = rel;
+                               /* child inherits WHERE clause from parent */
+                               pub_rel->whereClause = t->whereClause;
                                rels = lappend(rels, pub_rel);
                                relids = lappend_oid(relids, childrelid);
+
+                               if (t->whereClause)
+                                       relids_with_rf = lappend_oid(relids_with_rf, childrelid);
                        }
                }
        }
 
        list_free(relids);
+       list_free(relids_with_rf);
 
        return rels;
 }
@@ -995,6 +1527,8 @@ CloseTableList(List *rels)
                pub_rel = (PublicationRelInfo *) lfirst(lc);
                table_close(pub_rel->relation, NoLock);
        }
+
+       list_free_deep(rels);
 }
 
 /*
@@ -1090,6 +1624,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
                                                        RelationGetRelationName(rel))));
                }
 
+               if (pubrel->whereClause)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("cannot use a WHERE clause when removing a table from a publication")));
+
                ObjectAddressSet(obj, PublicationRelRelationId, prid);
                performDeletion(&obj, DROP_CASCADE, 0);
        }
index 313c87398b2397478438abc9ead8a768f901e658..de106d767d1ec570a0625bff12043fcd2ed4e88d 100644 (file)
@@ -567,15 +567,43 @@ ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
 void
 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
 {
-       PublicationActions *pubactions;
+       PublicationDesc pubdesc;
 
        /* We only need to do checks for UPDATE and DELETE. */
        if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
                return;
 
+       if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+               return;
+
+       /*
+        * It is only safe to execute UPDATE/DELETE when all columns, referenced
+        * in the row filters from publications which the relation is in, are
+        * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
+        * or the table does not publish UPDATEs or DELETEs.
+        *
+        * XXX We could optimize it by first checking whether any of the
+        * publications have a row filter for this relation. If not and relation
+        * has replica identity then we can avoid building the descriptor but as
+        * this happens only one time it doesn't seem worth the additional
+        * complexity.
+        */
+       RelationBuildPublicationDesc(rel, &pubdesc);
+       if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+                                errmsg("cannot update table \"%s\"",
+                                               RelationGetRelationName(rel)),
+                                errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
+       else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+                                errmsg("cannot delete from table \"%s\"",
+                                               RelationGetRelationName(rel)),
+                                errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
+
        /* If relation has replica identity we are always good. */
-       if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
-               OidIsValid(RelationGetReplicaIndex(rel)))
+       if (OidIsValid(RelationGetReplicaIndex(rel)))
                return;
 
        /*
@@ -583,14 +611,13 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
         *
         * Check if the table publishes UPDATES or DELETES.
         */
-       pubactions = GetRelationPublicationActions(rel);
-       if (cmd == CMD_UPDATE && pubactions->pubupdate)
+       if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
                                                RelationGetRelationName(rel)),
                                 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
-       else if (cmd == CMD_DELETE && pubactions->pubdelete)
+       else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
index bc0d90b4b1b8c9eeee40b5656b8bd1fec38afedd..d4f8455a2bdb048814557d363ac64c5bd7e25491 100644 (file)
@@ -4849,6 +4849,7 @@ _copyPublicationTable(const PublicationTable *from)
        PublicationTable *newnode = makeNode(PublicationTable);
 
        COPY_NODE_FIELD(relation);
+       COPY_NODE_FIELD(whereClause);
 
        return newnode;
 }
index 2e7122ad2f629197d3ce74001d77b389f70690cf..f1002afe7a0db90d653607ad812c82c384622003 100644 (file)
@@ -2321,6 +2321,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
        COMPARE_NODE_FIELD(relation);
+       COMPARE_NODE_FIELD(whereClause);
 
        return true;
 }
index 92f93cfc72d900701d58b3dfb0d1efbfe9317c98..a03b33b53bdcefe5bd7a2d5c92888e7b95450fd5 100644 (file)
@@ -9751,12 +9751,13 @@ CreatePublicationStmt:
  * relation_expr here.
  */
 PublicationObjSpec:
-                       TABLE relation_expr
+                       TABLE relation_expr OptWhereClause
                                {
                                        $$ = makeNode(PublicationObjSpec);
                                        $$->pubobjtype = PUBLICATIONOBJ_TABLE;
                                        $$->pubtable = makeNode(PublicationTable);
                                        $$->pubtable->relation = $2;
+                                       $$->pubtable->whereClause = $3;
                                }
                        | ALL TABLES IN_P SCHEMA ColId
                                {
@@ -9771,28 +9772,45 @@ PublicationObjSpec:
                                        $$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA;
                                        $$->location = @5;
                                }
-                       | ColId
+                       | ColId OptWhereClause
                                {
                                        $$ = makeNode(PublicationObjSpec);
                                        $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
-                                       $$->name = $1;
+                                       if ($2)
+                                       {
+                                               /*
+                                                * The OptWhereClause must be stored here but it is
+                                                * valid only for tables. For non-table objects, an
+                                                * error will be thrown later via
+                                                * preprocess_pubobj_list().
+                                                */
+                                               $$->pubtable = makeNode(PublicationTable);
+                                               $$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+                                               $$->pubtable->whereClause = $2;
+                                       }
+                                       else
+                                       {
+                                               $$->name = $1;
+                                       }
                                        $$->location = @1;
                                }
-                       | ColId indirection
+                       | ColId indirection OptWhereClause
                                {
                                        $$ = makeNode(PublicationObjSpec);
                                        $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
                                        $$->pubtable = makeNode(PublicationTable);
                                        $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+                                       $$->pubtable->whereClause = $3;
                                        $$->location = @1;
                                }
                        /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
-                       | extended_relation_expr
+                       | extended_relation_expr OptWhereClause
                                {
                                        $$ = makeNode(PublicationObjSpec);
                                        $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
                                        $$->pubtable = makeNode(PublicationTable);
                                        $$->pubtable->relation = $1;
+                                       $$->pubtable->whereClause = $2;
                                }
                        | CURRENT_SCHEMA
                                {
@@ -17448,7 +17466,8 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
                                                errcode(ERRCODE_SYNTAX_ERROR),
                                                errmsg("invalid table name at or near"),
                                                parser_errposition(pubobj->location));
-                       else if (pubobj->name)
+
+                       if (pubobj->name)
                        {
                                /* convert it to PublicationTable */
                                PublicationTable *pubtable = makeNode(PublicationTable);
@@ -17462,6 +17481,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
                else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA ||
                                 pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA)
                {
+                       /* WHERE clause is not allowed on a schema object */
+                       if (pubobj->pubtable && pubobj->pubtable->whereClause)
+                               ereport(ERROR,
+                                               errcode(ERRCODE_SYNTAX_ERROR),
+                                               errmsg("WHERE clause not allowed for schema"),
+                                               parser_errposition(pubobj->location));
+
                        /*
                         * We can distinguish between the different type of schema
                         * objects based on whether name and pubtable is set.
index 953942692cef871440d058a4f39c670b53c1cea4..c9b0eeefd7ee9c2b9ed9800a7b3226acfce57439 100644 (file)
@@ -31,8 +31,8 @@
 
 static void logicalrep_write_attrs(StringInfo out, Relation rel);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-                                                                  HeapTuple tuple, bool binary);
-
+                                                                  TupleTableSlot *slot,
+                                                                  bool binary);
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 
@@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-                                               HeapTuple newtuple, bool binary)
+                                               TupleTableSlot *newslot, bool binary)
 {
        pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
        pq_sendint32(out, RelationGetRelid(rel));
 
        pq_sendbyte(out, 'N');          /* new tuple follows */
-       logicalrep_write_tuple(out, rel, newtuple, binary);
+       logicalrep_write_tuple(out, rel, newslot, binary);
 }
 
 /*
@@ -442,7 +442,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-                                               HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+                                               TupleTableSlot *oldslot, TupleTableSlot *newslot,
+                                               bool binary)
 {
        pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -457,17 +458,17 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
        /* use Oid as relation identifier */
        pq_sendint32(out, RelationGetRelid(rel));
 
-       if (oldtuple != NULL)
+       if (oldslot != NULL)
        {
                if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
                        pq_sendbyte(out, 'O');  /* old tuple follows */
                else
                        pq_sendbyte(out, 'K');  /* old key follows */
-               logicalrep_write_tuple(out, rel, oldtuple, binary);
+               logicalrep_write_tuple(out, rel, oldslot, binary);
        }
 
        pq_sendbyte(out, 'N');          /* new tuple follows */
-       logicalrep_write_tuple(out, rel, newtuple, binary);
+       logicalrep_write_tuple(out, rel, newslot, binary);
 }
 
 /*
@@ -516,7 +517,7 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
  */
 void
 logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
-                                               HeapTuple oldtuple, bool binary)
+                                               TupleTableSlot *oldslot, bool binary)
 {
        Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
                   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -536,7 +537,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
        else
                pq_sendbyte(out, 'K');  /* old key follows */
 
-       logicalrep_write_tuple(out, rel, oldtuple, binary);
+       logicalrep_write_tuple(out, rel, oldslot, binary);
 }
 
 /*
@@ -749,11 +750,12 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
+                                          bool binary)
 {
        TupleDesc       desc;
-       Datum           values[MaxTupleAttributeNumber];
-       bool            isnull[MaxTupleAttributeNumber];
+       Datum      *values;
+       bool       *isnull;
        int                     i;
        uint16          nliveatts = 0;
 
@@ -767,11 +769,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
        }
        pq_sendint16(out, nliveatts);
 
-       /* try to allocate enough memory from the get-go */
-       enlargeStringInfo(out, tuple->t_len +
-                                         nliveatts * (1 + 4));
-
-       heap_deform_tuple(tuple, desc, values, isnull);
+       slot_getallattrs(slot);
+       values = slot->tts_values;
+       isnull = slot->tts_isnull;
 
        /* Write the values */
        for (i = 0; i < desc->natts; i++)
index e596b69d466546cedac0b7c481ef43ecb6026978..1659964571c0f0c141cf2dab61701ce4e99421a0 100644 (file)
@@ -690,19 +690,23 @@ copy_read_data(void *outbuf, int minread, int maxread)
 
 /*
  * Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * message provides during replication. This function also returns the relation
+ * qualifications to be used in the COPY command.
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-                                               LogicalRepRelation *lrel)
+                                               LogicalRepRelation *lrel, List **qual)
 {
        WalRcvExecResult *res;
        StringInfoData cmd;
        TupleTableSlot *slot;
        Oid                     tableRow[] = {OIDOID, CHAROID, CHAROID};
        Oid                     attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+       Oid                     qualRow[] = {TEXTOID};
        bool            isnull;
        int                     natt;
+       ListCell   *lc;
+       bool            first;
 
        lrel->nspname = nspname;
        lrel->relname = relname;
@@ -798,6 +802,98 @@ fetch_remote_table_info(char *nspname, char *relname,
        lrel->natts = natt;
 
        walrcv_clear_result(res);
+
+       /*
+        * Get relation's row filter expressions. DISTINCT avoids the same
+        * expression of a table in multiple publications from being included
+        * multiple times in the final expression.
+        *
+        * We need to copy the row even if it matches just one of the
+        * publications, so we later combine all the quals with OR.
+        *
+        * For initial synchronization, row filtering can be ignored in following
+        * cases:
+        *
+        * 1) one of the subscribed publications for the table hasn't specified
+        * any row filter
+        *
+        * 2) one of the subscribed publications has puballtables set to true
+        *
+        * 3) one of the subscribed publications is declared as ALL TABLES IN
+        * SCHEMA that includes this relation
+        */
+       if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+       {
+               StringInfoData pub_names;
+
+               /* Build the pubname list. */
+               initStringInfo(&pub_names);
+               first = true;
+               foreach(lc, MySubscription->publications)
+               {
+                       char       *pubname = strVal(lfirst(lc));
+
+                       if (first)
+                               first = false;
+                       else
+                               appendStringInfoString(&pub_names, ", ");
+
+                       appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
+               }
+
+               /* Check for row filters. */
+               resetStringInfo(&cmd);
+               appendStringInfo(&cmd,
+                                                "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
+                                                "  FROM pg_publication p"
+                                                "  LEFT OUTER JOIN pg_publication_rel pr"
+                                                "       ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
+                                                "  LATERAL pg_get_publication_tables(p.pubname) gpt"
+                                                " WHERE gpt.relid = %u"
+                                                "   AND p.pubname IN ( %s )",
+                                                lrel->remoteid,
+                                                lrel->remoteid,
+                                                pub_names.data);
+
+               res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+
+               if (res->status != WALRCV_OK_TUPLES)
+                       ereport(ERROR,
+                                       (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
+                                                       nspname, relname, res->err)));
+
+               /*
+                * Multiple row filter expressions for the same table will be combined
+                * by COPY using OR. If any of the filter expressions for this table
+                * are null, it means the whole table will be copied. In this case it
+                * is not necessary to construct a unified row filter expression at
+                * all.
+                */
+               slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+               while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+               {
+                       Datum           rf = slot_getattr(slot, 1, &isnull);
+
+                       if (!isnull)
+                               *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+                       else
+                       {
+                               /* Ignore filters and cleanup as necessary. */
+                               if (*qual)
+                               {
+                                       list_free_deep(*qual);
+                                       *qual = NIL;
+                               }
+                               break;
+                       }
+
+                       ExecClearTuple(slot);
+               }
+               ExecDropSingleTupleTableSlot(slot);
+
+               walrcv_clear_result(res);
+       }
+
        pfree(cmd.data);
 }
 
@@ -811,6 +907,7 @@ copy_table(Relation rel)
 {
        LogicalRepRelMapEntry *relmapentry;
        LogicalRepRelation lrel;
+       List       *qual = NIL;
        WalRcvExecResult *res;
        StringInfoData cmd;
        CopyFromState cstate;
@@ -819,7 +916,7 @@ copy_table(Relation rel)
 
        /* Get the publisher relation info. */
        fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-                                                       RelationGetRelationName(rel), &lrel);
+                                                       RelationGetRelationName(rel), &lrel, &qual);
 
        /* Put the relation into relmap. */
        logicalrep_relmap_update(&lrel);
@@ -830,14 +927,18 @@ copy_table(Relation rel)
 
        /* Start copy on the publisher. */
        initStringInfo(&cmd);
-       if (lrel.relkind == RELKIND_RELATION)
+
+       /* Regular table with no row filter */
+       if (lrel.relkind == RELKIND_RELATION && qual == NIL)
                appendStringInfo(&cmd, "COPY %s TO STDOUT",
                                                 quote_qualified_identifier(lrel.nspname, lrel.relname));
        else
        {
                /*
-                * For non-tables, we need to do COPY (SELECT ...), but we can't just
-                * do SELECT * because we need to not copy generated columns.
+                * For non-tables and tables with row filters, we need to do COPY
+                * (SELECT ...), but we can't just do SELECT * because we need to not
+                * copy generated columns. For tables with any row filters, build a
+                * SELECT query with OR'ed row filters for COPY.
                 */
                appendStringInfoString(&cmd, "COPY (SELECT ");
                for (int i = 0; i < lrel.natts; i++)
@@ -846,8 +947,33 @@ copy_table(Relation rel)
                        if (i < lrel.natts - 1)
                                appendStringInfoString(&cmd, ", ");
                }
-               appendStringInfo(&cmd, " FROM %s) TO STDOUT",
-                                                quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+               appendStringInfoString(&cmd, " FROM ");
+
+               /*
+                * For regular tables, make sure we don't copy data from a child that
+                * inherits the named table as those will be copied separately.
+                */
+               if (lrel.relkind == RELKIND_RELATION)
+                       appendStringInfoString(&cmd, "ONLY ");
+
+               appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
+               /* list of OR'ed filters */
+               if (qual != NIL)
+               {
+                       ListCell   *lc;
+                       char       *q = strVal(linitial(qual));
+
+                       appendStringInfo(&cmd, " WHERE %s", q);
+                       for_each_from(lc, qual, 1)
+                       {
+                               q = strVal(lfirst(lc));
+                               appendStringInfo(&cmd, " OR %s", q);
+                       }
+                       list_free_deep(qual);
+               }
+
+               appendStringInfoString(&cmd, ") TO STDOUT");
        }
        res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
        pfree(cmd.data);
index 4162bb8de7b6aa045fcee7ccd0723e64628b5bb6..ea57a0477f057c694834e0de79cef984926539b5 100644 (file)
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
 #include "commands/defrem.h"
+#include "executor/executor.h"
 #include "fmgr.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -85,6 +90,19 @@ static void send_repl_origin(LogicalDecodingContext *ctx,
                                                         RepOriginId origin_id, XLogRecPtr origin_lsn,
                                                         bool send_origin);
 
+/*
+ * Only 3 publication actions are used for row filtering ("insert", "update",
+ * "delete"). See RelationSyncEntry.exprstate[].
+ */
+enum RowFilterPubAction
+{
+       PUBACTION_INSERT,
+       PUBACTION_UPDATE,
+       PUBACTION_DELETE
+};
+
+#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
+
 /*
  * Entry in the map used to remember which relation schemas we sent.
  *
@@ -116,6 +134,21 @@ typedef struct RelationSyncEntry
        /* are we publishing this rel? */
        PublicationActions pubactions;
 
+       /*
+        * ExprState array for row filter. Different publication actions don't
+        * allow multiple expressions to always be combined into one, because
+        * updates or deletes restrict the column in expression to be part of the
+        * replica identity index whereas inserts do not have this restriction, so
+        * there is one ExprState per publication action.
+        */
+       ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
+       EState     *estate;                     /* executor state used for row filter */
+       MemoryContext cache_expr_cxt;   /* private context for exprstate and
+                                                                        * estate, if any */
+
+       TupleTableSlot *new_slot;       /* slot for storing new tuple */
+       TupleTableSlot *old_slot;       /* slot for storing old tuple */
+
        /*
         * OID of the relation to publish changes as.  For a partition, this may
         * be set to one of its ancestors whose schema will be used when
@@ -130,7 +163,7 @@ typedef struct RelationSyncEntry
         * same as 'relid' or if unnecessary due to partition and the ancestor
         * having identical TupleDesc.
         */
-       TupleConversionMap *map;
+       AttrMap    *attrmap;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -138,7 +171,8 @@ static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
+                                                                                        Relation relation);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
                                                                                  uint32 hashvalue);
@@ -146,6 +180,20 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
                                                                                        TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
                                                                                        TransactionId xid);
+static void init_tuple_slot(PGOutputData *data, Relation relation,
+                                                       RelationSyncEntry *entry);
+
+/* row filter routines */
+static EState *create_estate_for_relation(Relation rel);
+static void pgoutput_row_filter_init(PGOutputData *data,
+                                                                        List *publications,
+                                                                        RelationSyncEntry *entry);
+static bool pgoutput_row_filter_exec_expr(ExprState *state,
+                                                                                 ExprContext *econtext);
+static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+                                                               TupleTableSlot **new_slot_ptr,
+                                                               RelationSyncEntry *entry,
+                                                               ReorderBufferChangeType *action);
 
 /*
  * Specify output plugin callbacks
@@ -303,6 +351,10 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                                                                  "logical replication output context",
                                                                                  ALLOCSET_DEFAULT_SIZES);
 
+       data->cachectx = AllocSetContextCreate(ctx->context,
+                                                                                  "logical replication cache context",
+                                                                                  ALLOCSET_DEFAULT_SIZES);
+
        ctx->output_plugin_private = data;
 
        /* This plugin uses binary protocol. */
@@ -543,37 +595,14 @@ maybe_send_schema(LogicalDecodingContext *ctx,
                return;
 
        /*
-        * Nope, so send the schema.  If the changes will be published using an
-        * ancestor's schema, not the relation's own, send that ancestor's schema
-        * before sending relation's own (XXX - maybe sending only the former
-        * suffices?).  This is also a good place to set the map that will be used
-        * to convert the relation's tuples into the ancestor's format, if needed.
+        * Send the schema.  If the changes will be published using an ancestor's
+        * schema, not the relation's own, send that ancestor's schema before
+        * sending relation's own (XXX - maybe sending only the former suffices?).
         */
        if (relentry->publish_as_relid != RelationGetRelid(relation))
        {
                Relation        ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-               TupleDesc       indesc = RelationGetDescr(relation);
-               TupleDesc       outdesc = RelationGetDescr(ancestor);
-               MemoryContext oldctx;
-
-               /* Map must live as long as the session does. */
-               oldctx = MemoryContextSwitchTo(CacheMemoryContext);
 
-               /*
-                * Make copies of the TupleDescs that will live as long as the map
-                * does before putting into the map.
-                */
-               indesc = CreateTupleDescCopy(indesc);
-               outdesc = CreateTupleDescCopy(outdesc);
-               relentry->map = convert_tuples_by_name(indesc, outdesc);
-               if (relentry->map == NULL)
-               {
-                       /* Map not necessary, so free the TupleDescs too. */
-                       FreeTupleDesc(indesc);
-                       FreeTupleDesc(outdesc);
-               }
-
-               MemoryContextSwitchTo(oldctx);
                send_relation_and_attrs(ancestor, xid, ctx);
                RelationClose(ancestor);
        }
@@ -624,6 +653,484 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
        OutputPluginWrite(ctx, false);
 }
 
+/*
+ * Executor state preparation for evaluation of row filter expressions for the
+ * specified relation.
+ */
+static EState *
+create_estate_for_relation(Relation rel)
+{
+       EState     *estate;
+       RangeTblEntry *rte;
+
+       estate = CreateExecutorState();
+
+       rte = makeNode(RangeTblEntry);
+       rte->rtekind = RTE_RELATION;
+       rte->relid = RelationGetRelid(rel);
+       rte->relkind = rel->rd_rel->relkind;
+       rte->rellockmode = AccessShareLock;
+       ExecInitRangeTable(estate, list_make1(rte));
+
+       estate->es_output_cid = GetCurrentCommandId(false);
+
+       return estate;
+}
+
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ */
+static bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+       Datum           ret;
+       bool            isnull;
+
+       Assert(state != NULL);
+
+       ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+       elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
+                isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
+                isnull ? "true" : "false");
+
+       if (isnull)
+               return false;
+
+       return DatumGetBool(ret);
+}
+
+/*
+ * Initialize the row filter.
+ */
+static void
+pgoutput_row_filter_init(PGOutputData *data, List *publications,
+                                                RelationSyncEntry *entry)
+{
+       ListCell   *lc;
+       List       *rfnodes[] = {NIL, NIL, NIL};        /* One per pubaction */
+       bool            no_filter[] = {false, false, false};    /* One per pubaction */
+       MemoryContext oldctx;
+       int                     idx;
+       bool            has_filter = true;
+
+       /*
+        * Find if there are any row filters for this relation. If there are, then
+        * prepare the necessary ExprState and cache it in entry->exprstate. To
+        * build an expression state, we need to ensure the following:
+        *
+        * All the given publication-table mappings must be checked.
+        *
+        * Multiple publications might have multiple row filters for this
+        * relation. Since row filter usage depends on the DML operation, there
+        * are multiple lists (one for each operation) to which row filters will
+        * be appended.
+        *
+        * FOR ALL TABLES implies "don't use row filter expression" so it takes
+        * precedence.
+        */
+       foreach(lc, publications)
+       {
+               Publication *pub = lfirst(lc);
+               HeapTuple       rftuple = NULL;
+               Datum           rfdatum = 0;
+               bool            pub_no_filter = false;
+
+               if (pub->alltables)
+               {
+                       /*
+                        * If the publication is FOR ALL TABLES then it is treated the
+                        * same as if this table has no row filters (even if for other
+                        * publications it does).
+                        */
+                       pub_no_filter = true;
+               }
+               else
+               {
+                       /*
+                        * Check for the presence of a row filter in this publication.
+                        */
+                       rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+                                                                         ObjectIdGetDatum(entry->publish_as_relid),
+                                                                         ObjectIdGetDatum(pub->oid));
+
+                       if (HeapTupleIsValid(rftuple))
+                       {
+                               /* Null indicates no filter. */
+                               rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+                                                                                 Anum_pg_publication_rel_prqual,
+                                                                                 &pub_no_filter);
+                       }
+                       else
+                       {
+                               pub_no_filter = true;
+                       }
+               }
+
+               if (pub_no_filter)
+               {
+                       if (rftuple)
+                               ReleaseSysCache(rftuple);
+
+                       no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
+                       no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
+                       no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
+
+                       /*
+                        * Quick exit if all the DML actions are publicized via this
+                        * publication.
+                        */
+                       if (no_filter[PUBACTION_INSERT] &&
+                               no_filter[PUBACTION_UPDATE] &&
+                               no_filter[PUBACTION_DELETE])
+                       {
+                               has_filter = false;
+                               break;
+                       }
+
+                       /* No additional work for this publication. Next one. */
+                       continue;
+               }
+
+               /* Form the per pubaction row filter lists. */
+               if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
+                       rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
+                                                                                               TextDatumGetCString(rfdatum));
+               if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
+                       rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
+                                                                                               TextDatumGetCString(rfdatum));
+               if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
+                       rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
+                                                                                               TextDatumGetCString(rfdatum));
+
+               ReleaseSysCache(rftuple);
+       }                                                       /* loop all subscribed publications */
+
+       /* Clean the row filter */
+       for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+       {
+               if (no_filter[idx])
+               {
+                       list_free_deep(rfnodes[idx]);
+                       rfnodes[idx] = NIL;
+               }
+       }
+
+       if (has_filter)
+       {
+               Relation        relation = RelationIdGetRelation(entry->publish_as_relid);
+
+               Assert(entry->cache_expr_cxt == NULL);
+
+               /* Create the memory context for row filters */
+               entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx,
+                                                                                                         "Row filter expressions",
+                                                                                                         ALLOCSET_DEFAULT_SIZES);
+
+               MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt,
+                                                                                 RelationGetRelationName(relation));
+
+               /*
+                * Now all the filters for all pubactions are known. Combine them when
+                * their pubactions are the same.
+                */
+               oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt);
+               entry->estate = create_estate_for_relation(relation);
+               for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+               {
+                       List       *filters = NIL;
+                       Expr       *rfnode;
+
+                       if (rfnodes[idx] == NIL)
+                               continue;
+
+                       foreach(lc, rfnodes[idx])
+                               filters = lappend(filters, stringToNode((char *) lfirst(lc)));
+
+                       /* combine the row filter and cache the ExprState */
+                       rfnode = make_orclause(filters);
+                       entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
+               }                                               /* for each pubaction */
+               MemoryContextSwitchTo(oldctx);
+
+               RelationClose(relation);
+       }
+}
+
+/*
+ * Initialize the slot for storing new and old tuples, and build the map that
+ * will be used to convert the relation's tuples into the ancestor's format.
+ */
+static void
+init_tuple_slot(PGOutputData *data, Relation relation,
+                               RelationSyncEntry *entry)
+{
+       MemoryContext oldctx;
+       TupleDesc       oldtupdesc;
+       TupleDesc       newtupdesc;
+
+       oldctx = MemoryContextSwitchTo(data->cachectx);
+
+       /*
+        * Create tuple table slots. Create a copy of the TupleDesc as it needs to
+        * live as long as the cache remains.
+        */
+       oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+       newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+
+       entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
+       entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
+
+       MemoryContextSwitchTo(oldctx);
+
+       /*
+        * Cache the map that will be used to convert the relation's tuples into
+        * the ancestor's format, if needed.
+        */
+       if (entry->publish_as_relid != RelationGetRelid(relation))
+       {
+               Relation        ancestor = RelationIdGetRelation(entry->publish_as_relid);
+               TupleDesc       indesc = RelationGetDescr(relation);
+               TupleDesc       outdesc = RelationGetDescr(ancestor);
+
+               /* Map must live as long as the session does. */
+               oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+               entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc);
+
+               MemoryContextSwitchTo(oldctx);
+               RelationClose(ancestor);
+       }
+}
+
+/*
+ * Change is checked against the row filter if any.
+ *
+ * Returns true if the change is to be replicated, else false.
+ *
+ * For inserts, evaluate the row filter for new tuple.
+ * For deletes, evaluate the row filter for old tuple.
+ * For updates, evaluate the row filter for old and new tuple.
+ *
+ * For updates, if both evaluations are true, we allow sending the UPDATE and
+ * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
+ * only one of the tuples matches the row filter expression, we transform
+ * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
+ * following rules:
+ *
+ * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
+ * Case 2: old-row (no match)    new row (match)     -> INSERT
+ * Case 3: old-row (match)       new-row (no match)  -> DELETE
+ * Case 4: old-row (match)       new row (match)     -> UPDATE
+ *
+ * The new action is updated in the action parameter.
+ *
+ * The new slot could be updated when transforming the UPDATE into INSERT,
+ * because the original new tuple might not have column values from the replica
+ * identity.
+ *
+ * Examples:
+ * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
+ * Since the old tuple satisfies, the initial table synchronization copied this
+ * row (or another method was used to guarantee that there is data
+ * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
+ * row filter, so from a data consistency perspective, that row should be
+ * removed on the subscriber. The UPDATE should be transformed into a DELETE
+ * statement and be sent to the subscriber. Keeping this row on the subscriber
+ * is undesirable because it doesn't reflect what was defined in the row filter
+ * expression on the publisher. This row on the subscriber would likely not be
+ * modified by replication again. If someone inserted a new row with the same
+ * old identifier, replication could stop due to a constraint violation.
+ *
+ * Let's say the old tuple doesn't match the row filter but the new tuple does.
+ * Since the old tuple doesn't satisfy, the initial table synchronization
+ * probably didn't copy this row. However, after the UPDATE the new tuple does
+ * satisfy the row filter, so from a data consistency perspective, that row
+ * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
+ * statements have no effect (it matches no row -- see
+ * apply_handle_update_internal()). So, the UPDATE should be transformed into a
+ * INSERT statement and be sent to the subscriber. However, this might surprise
+ * someone who expects the data set to satisfy the row filter expression on the
+ * provider.
+ */
+static bool
+pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+                                       TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
+                                       ReorderBufferChangeType *action)
+{
+       TupleDesc       desc;
+       int                     i;
+       bool            old_matched,
+                               new_matched,
+                               result;
+       TupleTableSlot *tmp_new_slot;
+       TupleTableSlot *new_slot = *new_slot_ptr;
+       ExprContext *ecxt;
+       ExprState  *filter_exprstate;
+
+       /*
+        * We need this map to avoid relying on ReorderBufferChangeType enums
+        * having specific values.
+        */
+       static const int map_changetype_pubaction[] = {
+               [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
+               [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
+               [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
+       };
+
+       Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
+                  *action == REORDER_BUFFER_CHANGE_UPDATE ||
+                  *action == REORDER_BUFFER_CHANGE_DELETE);
+
+       Assert(new_slot || old_slot);
+
+       /* Get the corresponding row filter */
+       filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
+
+       /* Bail out if there is no row filter */
+       if (!filter_exprstate)
+               return true;
+
+       elog(DEBUG3, "table \"%s.%s\" has row filter",
+                get_namespace_name(RelationGetNamespace(relation)),
+                RelationGetRelationName(relation));
+
+       ResetPerTupleExprContext(entry->estate);
+
+       ecxt = GetPerTupleExprContext(entry->estate);
+
+       /*
+        * For the following occasions where there is only one tuple, we can
+        * evaluate the row filter for that tuple and return.
+        *
+        * For inserts, we only have the new tuple.
+        *
+        * For updates, we can have only a new tuple when none of the replica
+        * identity columns changed but we still need to evaluate the row filter
+        * for new tuple as the existing values of those columns might not match
+        * the filter. Also, users can use constant expressions in the row filter,
+        * so we anyway need to evaluate it for the new tuple.
+        *
+        * For deletes, we only have the old tuple.
+        */
+       if (!new_slot || !old_slot)
+       {
+               ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
+               result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+               return result;
+       }
+
+       /*
+        * Both the old and new tuples must be valid only for updates and need to
+        * be checked against the row filter.
+        */
+       Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+
+       slot_getallattrs(new_slot);
+       slot_getallattrs(old_slot);
+
+       tmp_new_slot = NULL;
+       desc = RelationGetDescr(relation);
+
+       /*
+        * The new tuple might not have all the replica identity columns, in which
+        * case it needs to be copied over from the old tuple.
+        */
+       for (i = 0; i < desc->natts; i++)
+       {
+               Form_pg_attribute att = TupleDescAttr(desc, i);
+
+               /*
+                * if the column in the new tuple or old tuple is null, nothing to do
+                */
+               if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
+                       continue;
+
+               /*
+                * Unchanged toasted replica identity columns are only logged in the
+                * old tuple. Copy this over to the new tuple. The changed (or WAL
+                * Logged) toast values are always assembled in memory and set as
+                * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+                */
+               if (att->attlen == -1 &&
+                       VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
+                       !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
+               {
+                       if (!tmp_new_slot)
+                       {
+                               tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
+                               ExecClearTuple(tmp_new_slot);
+
+                               memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
+                                          desc->natts * sizeof(Datum));
+                               memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
+                                          desc->natts * sizeof(bool));
+                       }
+
+                       tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
+                       tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+               }
+       }
+
+       ecxt->ecxt_scantuple = old_slot;
+       old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+       if (tmp_new_slot)
+       {
+               ExecStoreVirtualTuple(tmp_new_slot);
+               ecxt->ecxt_scantuple = tmp_new_slot;
+       }
+       else
+               ecxt->ecxt_scantuple = new_slot;
+
+       new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+       /*
+        * Case 1: if both tuples don't match the row filter, bailout. Send
+        * nothing.
+        */
+       if (!old_matched && !new_matched)
+               return false;
+
+       /*
+        * Case 2: if the old tuple doesn't satisfy the row filter but the new
+        * tuple does, transform the UPDATE into INSERT.
+        *
+        * Use the newly transformed tuple that must contain the column values for
+        * all the replica identity columns. This is required to ensure that the
+        * while inserting the tuple in the downstream node, we have all the
+        * required column values.
+        */
+       if (!old_matched && new_matched)
+       {
+               *action = REORDER_BUFFER_CHANGE_INSERT;
+
+               if (tmp_new_slot)
+                       *new_slot_ptr = tmp_new_slot;
+       }
+
+       /*
+        * Case 3: if the old tuple satisfies the row filter but the new tuple
+        * doesn't, transform the UPDATE into DELETE.
+        *
+        * This transformation does not require another tuple. The Old tuple will
+        * be used for DELETE.
+        */
+       else if (old_matched && !new_matched)
+               *action = REORDER_BUFFER_CHANGE_DELETE;
+
+       /*
+        * Case 4: if both tuples match the row filter, transformation isn't
+        * required. (*action is default UPDATE).
+        */
+
+       return true;
+}
+
 /*
  * Sends the decoded DML over wire.
  *
@@ -638,6 +1145,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        RelationSyncEntry *relentry;
        TransactionId xid = InvalidTransactionId;
        Relation        ancestor = NULL;
+       Relation        targetrel = relation;
+       ReorderBufferChangeType action = change->action;
+       TupleTableSlot *old_slot = NULL;
+       TupleTableSlot *new_slot = NULL;
 
        if (!is_publishable_relation(relation))
                return;
@@ -651,10 +1162,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        if (in_streaming)
                xid = change->txn->xid;
 
-       relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
+       relentry = get_rel_sync_entry(data, relation);
 
        /* First check the table filter */
-       switch (change->action)
+       switch (action)
        {
                case REORDER_BUFFER_CHANGE_INSERT:
                        if (!relentry->pubactions.pubinsert)
@@ -675,80 +1186,149 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        /* Avoid leaking memory by using and resetting our own context */
        old = MemoryContextSwitchTo(data->context);
 
-       maybe_send_schema(ctx, change, relation, relentry);
-
        /* Send the data */
-       switch (change->action)
+       switch (action)
        {
                case REORDER_BUFFER_CHANGE_INSERT:
-                       {
-                               HeapTuple       tuple = &change->data.tp.newtuple->tuple;
+                       new_slot = relentry->new_slot;
+                       ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
+                                                          new_slot, false);
 
-                               /* Switch relation if publishing via root. */
-                               if (relentry->publish_as_relid != RelationGetRelid(relation))
+                       /* Switch relation if publishing via root. */
+                       if (relentry->publish_as_relid != RelationGetRelid(relation))
+                       {
+                               Assert(relation->rd_rel->relispartition);
+                               ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+                               targetrel = ancestor;
+                               /* Convert tuple if needed. */
+                               if (relentry->attrmap)
                                {
-                                       Assert(relation->rd_rel->relispartition);
-                                       ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-                                       relation = ancestor;
-                                       /* Convert tuple if needed. */
-                                       if (relentry->map)
-                                               tuple = execute_attr_map_tuple(tuple, relentry->map);
+                                       TupleDesc       tupdesc = RelationGetDescr(targetrel);
+
+                                       new_slot = execute_attr_map_slot(relentry->attrmap,
+                                                                                                        new_slot,
+                                                                                                        MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
                                }
+                       }
 
-                               OutputPluginPrepareWrite(ctx, true);
-                               logicalrep_write_insert(ctx->out, xid, relation, tuple,
-                                                                               data->binary);
-                               OutputPluginWrite(ctx, true);
+                       /* Check row filter */
+                       if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
+                                                                        &action))
                                break;
-                       }
+
+                       /*
+                        * Schema should be sent using the original relation because it
+                        * also sends the ancestor's relation.
+                        */
+                       maybe_send_schema(ctx, change, relation, relentry);
+
+                       OutputPluginPrepareWrite(ctx, true);
+                       logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+                                                                       data->binary);
+                       OutputPluginWrite(ctx, true);
+                       break;
                case REORDER_BUFFER_CHANGE_UPDATE:
+                       if (change->data.tp.oldtuple)
                        {
-                               HeapTuple       oldtuple = change->data.tp.oldtuple ?
-                               &change->data.tp.oldtuple->tuple : NULL;
-                               HeapTuple       newtuple = &change->data.tp.newtuple->tuple;
+                               old_slot = relentry->old_slot;
+                               ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
+                                                                  old_slot, false);
+                       }
 
-                               /* Switch relation if publishing via root. */
-                               if (relentry->publish_as_relid != RelationGetRelid(relation))
+                       new_slot = relentry->new_slot;
+                       ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
+                                                          new_slot, false);
+
+                       /* Switch relation if publishing via root. */
+                       if (relentry->publish_as_relid != RelationGetRelid(relation))
+                       {
+                               Assert(relation->rd_rel->relispartition);
+                               ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+                               targetrel = ancestor;
+                               /* Convert tuples if needed. */
+                               if (relentry->attrmap)
                                {
-                                       Assert(relation->rd_rel->relispartition);
-                                       ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-                                       relation = ancestor;
-                                       /* Convert tuples if needed. */
-                                       if (relentry->map)
-                                       {
-                                               if (oldtuple)
-                                                       oldtuple = execute_attr_map_tuple(oldtuple,
-                                                                                                                         relentry->map);
-                                               newtuple = execute_attr_map_tuple(newtuple,
-                                                                                                                 relentry->map);
-                                       }
+                                       TupleDesc       tupdesc = RelationGetDescr(targetrel);
+
+                                       if (old_slot)
+                                               old_slot = execute_attr_map_slot(relentry->attrmap,
+                                                                                                                old_slot,
+                                                                                                                MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+
+                                       new_slot = execute_attr_map_slot(relentry->attrmap,
+                                                                                                        new_slot,
+                                                                                                        MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
                                }
+                       }
 
-                               OutputPluginPrepareWrite(ctx, true);
-                               logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-                                                                               newtuple, data->binary);
-                               OutputPluginWrite(ctx, true);
+                       /* Check row filter */
+                       if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
+                                                                        relentry, &action))
                                break;
+
+                       maybe_send_schema(ctx, change, relation, relentry);
+
+                       OutputPluginPrepareWrite(ctx, true);
+
+                       /*
+                        * Updates could be transformed to inserts or deletes based on the
+                        * results of the row filter for old and new tuple.
+                        */
+                       switch (action)
+                       {
+                               case REORDER_BUFFER_CHANGE_INSERT:
+                                       logicalrep_write_insert(ctx->out, xid, targetrel,
+                                                                                       new_slot, data->binary);
+                                       break;
+                               case REORDER_BUFFER_CHANGE_UPDATE:
+                                       logicalrep_write_update(ctx->out, xid, targetrel,
+                                                                                       old_slot, new_slot, data->binary);
+                                       break;
+                               case REORDER_BUFFER_CHANGE_DELETE:
+                                       logicalrep_write_delete(ctx->out, xid, targetrel,
+                                                                                       old_slot, data->binary);
+                                       break;
+                               default:
+                                       Assert(false);
                        }
+
+                       OutputPluginWrite(ctx, true);
+                       break;
                case REORDER_BUFFER_CHANGE_DELETE:
                        if (change->data.tp.oldtuple)
                        {
-                               HeapTuple       oldtuple = &change->data.tp.oldtuple->tuple;
+                               old_slot = relentry->old_slot;
+
+                               ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
+                                                                  old_slot, false);
 
                                /* Switch relation if publishing via root. */
                                if (relentry->publish_as_relid != RelationGetRelid(relation))
                                {
                                        Assert(relation->rd_rel->relispartition);