Reference to relation
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>prqual</structfield> <type>pg_node_tree</type>
+ </para>
+ <para>Expression tree (in <function>nodeToString()</function>
+ representation) for the relation's publication qualifying condition. Null
+ if there is no publication qualifying condition.</para></entry>
+ </row>
</tbody>
</tgroup>
</table>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
- TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
remove one or more tables/schemas from the publication. Note that adding
tables/schemas to a publication that is already subscribed to will require an
<literal>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</literal> action on the
- subscribing side in order to become effective.
+ subscribing side in order to become effective. Note also that the combination
+ of <literal>DROP</literal> with a <literal>WHERE</literal> clause is not
+ allowed.
</para>
<para>
specified, the table and all its descendant tables (if any) are
affected. Optionally, <literal>*</literal> can be specified after the table
name to explicitly indicate that descendant tables are included.
+ If the optional <literal>WHERE</literal> clause is specified, rows for
+ which the <replaceable class="parameter">expression</replaceable>
+ evaluates to false or null will not be published. Note that parentheses
+ are required around the expression. The
+ <replaceable class="parameter">expression</replaceable> is evaluated with
+ the role used for the replication connection.
</para>
</listitem>
</varlistentry>
<para>
Specifies whether to copy pre-existing data in the publications
that are being subscribed to when the replication starts.
- The default is <literal>true</literal>. (Previously-subscribed
- tables are not copied.)
+ The default is <literal>true</literal>.
+ </para>
+ <para>
+ Previously subscribed tables are not copied, even if a table's row
+ filter <literal>WHERE</literal> clause has since been modified.
</para>
</listitem>
</varlistentry>
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
- TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
publication, so they are never explicitly added to the publication.
</para>
+ <para>
+ If the optional <literal>WHERE</literal> clause is specified, rows for
+ which the <replaceable class="parameter">expression</replaceable>
+ evaluates to false or null will not be published. Note that parentheses
+ are required around the expression. The <literal>WHERE</literal> clause
+ has no effect on <literal>TRUNCATE</literal> commands.
+ </para>
+
<para>
Only persistent base tables and partitioned tables can be part of a
publication. Temporary tables, unlogged tables, foreign tables,
disallowed on those tables.
</para>
+ <para>
+ A <literal>WHERE</literal> (i.e. row filter) expression must contain only
+ columns that are covered by the <literal>REPLICA IDENTITY</literal>, in
+ order for <command>UPDATE</command> and <command>DELETE</command> operations
+ to be published. For publication of <command>INSERT</command> operations,
+ any column may be used in the <literal>WHERE</literal> expression. The
+ <literal>WHERE</literal> clause allows only simple expressions that do not
+ use user-defined functions, user-defined operators, user-defined types,
+ user-defined collations, non-immutable built-in functions, or references to
+ system columns.
+ If your publication contains a partitioned table, the publication parameter
+ <literal>publish_via_partition_root</literal> determines if it uses the
+ partition's row filter (if the parameter is false, the default) or the root
+ partitioned table's row filter.
+ </para>
+
<para>
For an <command>INSERT ... ON CONFLICT</command> command, the publication will
publish the operation that actually results from the command. So depending
<para>
<acronym>DDL</acronym> operations are not published.
</para>
+
+ <para>
+ The <literal>WHERE</literal> clause expression is executed with the role used
+ for the replication connection.
+ </para>
</refsect1>
<refsect1>
</programlisting>
</para>
+ <para>
+ Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+ </para>
+
<para>
Create a publication that publishes all changes in all tables:
<programlisting>
that are being subscribed to when the replication starts.
The default is <literal>true</literal>.
</para>
+ <para>
+ If the publications contain <literal>WHERE</literal> clauses, those
+ clauses affect which data is copied. Refer to the
+ <xref linkend="sql-createsubscription-notes" /> for details.
+ </para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
- <refsect1>
+ <refsect1 id="sql-createsubscription-notes" xreflabel="Notes">
<title>Notes</title>
<para>
the parameter <literal>create_slot = false</literal>. This is an
implementation restriction that might be lifted in a future release.
</para>
+
+ <para>
+ If any table in the publication has a <literal>WHERE</literal> clause, rows
+ for which the <replaceable class="parameter">expression</replaceable>
+ evaluates to false or null will not be published. If the subscription has
+ several publications in which the same table has been published with
+ different <literal>WHERE</literal> clauses, a row will be published if any
+ of the expressions (referring to that publish operation) is satisfied. In
+ the case of different <literal>WHERE</literal> clauses, if one of the
+ publications has no <literal>WHERE</literal> clause (referring to that
+ publish operation) or the publication is declared as
+ <literal>FOR ALL TABLES</literal> or
+ <literal>FOR ALL TABLES IN SCHEMA</literal>, rows are always published
+ regardless of the definition of the other expressions.
+ If the subscriber runs a <productname>PostgreSQL</productname> version before
+ 15, any row filtering is ignored during the initial data synchronization
+ phase. In this case, the user might want to consider deleting any initially
+ copied data that would be incompatible with subsequent filtering.
+ </para>
+
</refsect1>
<refsect1>
return result;
}
+/*
+ * Returns the relid of the topmost ancestor that is published via this
+ * publication if any, otherwise returns InvalidOid.
+ *
+ * Note that the list of ancestors should be ordered such that the topmost
+ * ancestor is at the end of the list.
+ */
+Oid
+GetTopMostAncestorInPublication(Oid puboid, List *ancestors)
+{
+ ListCell *lc;
+ Oid topmost_relid = InvalidOid;
+
+ /*
+ * Find the "topmost" ancestor that is in this publication.
+ */
+ foreach(lc, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc);
+ List *apubids = GetRelationPublications(ancestor);
+ List *aschemaPubids = NIL;
+
+ if (list_member_oid(apubids, puboid))
+ topmost_relid = ancestor;
+ else
+ {
+ aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
+ if (list_member_oid(aschemaPubids, puboid))
+ topmost_relid = ancestor;
+ }
+
+ list_free(apubids);
+ list_free(aschemaPubids);
+ }
+
+ return topmost_relid;
+}
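
The scan in GetTopMostAncestorInPublication relies on the ancestor list being ordered with the topmost ancestor last, so the last match seen in a forward pass wins. The standalone sketch below illustrates only that "last match wins" idea; the `Oid` typedef, `INVALID_OID`, `is_published`, and `topmost_published_ancestor` are hypothetical stand-ins, and the flat arrays replace the catalog lookups the patch performs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;		/* hypothetical stand-in for PostgreSQL's Oid */
#define INVALID_OID ((Oid) 0)	/* hypothetical stand-in for InvalidOid */

/* Hypothetical membership test: is relid listed in published[]? */
static bool
is_published(const Oid *published, size_t npublished, Oid relid)
{
	for (size_t i = 0; i < npublished; i++)
		if (published[i] == relid)
			return true;
	return false;
}

/*
 * ancestors[] is ordered nearest parent first and topmost ancestor last, so
 * the last match found during a forward scan is the topmost published one.
 */
static Oid
topmost_published_ancestor(const Oid *ancestors, size_t nancestors,
						   const Oid *published, size_t npublished)
{
	Oid			topmost = INVALID_OID;

	assert(ancestors != NULL || nancestors == 0);

	for (size_t i = 0; i < nancestors; i++)
		if (is_published(published, npublished, ancestors[i]))
			topmost = ancestors[i];

	return topmost;
}
```

If the list were ordered the other way round, the loop would have to stop at the first match instead of continuing to the end.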
+
/*
* Insert new publication / relation mapping.
*/
ObjectAddress
-publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
+publication_add_relation(Oid pubid, PublicationRelInfo *pri,
bool if_not_exists)
{
Relation rel;
HeapTuple tup;
Datum values[Natts_pg_publication_rel];
bool nulls[Natts_pg_publication_rel];
- Oid relid = RelationGetRelid(targetrel->relation);
+ Relation targetrel = pri->relation;
+ Oid relid = RelationGetRelid(targetrel);
Oid pubreloid;
Publication *pub = GetPublication(pubid);
ObjectAddress myself,
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("relation \"%s\" is already member of publication \"%s\"",
- RelationGetRelationName(targetrel->relation), pub->name)));
+ RelationGetRelationName(targetrel), pub->name)));
}
- check_publication_add_relation(targetrel->relation);
+ check_publication_add_relation(targetrel);
/* Form a tuple. */
memset(values, 0, sizeof(values));
values[Anum_pg_publication_rel_prrelid - 1] =
ObjectIdGetDatum(relid);
+ /* Add qualifications, if available */
+ if (pri->whereClause != NULL)
+ values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
+ else
+ nulls[Anum_pg_publication_rel_prqual - 1] = true;
+
tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
/* Insert tuple into catalog. */
ObjectAddressSet(referenced, RelationRelationId, relid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+ /* Add dependency on the objects mentioned in the qualifications */
+ if (pri->whereClause)
+ recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
+ DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
+ false);
+
/* Close the table. */
table_close(rel, RowExclusiveLock);
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
+#include "catalog/pg_proc.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
+/*
+ * Information used to validate the columns in the row filter expression. See
+ * contain_invalid_rfcolumn_walker for details.
+ */
+typedef struct rf_context
+{
+ Bitmapset *bms_replident; /* bitset of replica identity columns */
+ bool pubviaroot; /* true if we are validating the parent
+ * relation's row filter */
+ Oid relid; /* relid of the relation */
+ Oid parentid; /* relid of the parent relation */
+} rf_context;
+
static List *OpenRelIdList(List *relids);
static List *OpenTableList(List *tables);
static void CloseTableList(List *rels);
}
}
+/*
+ * Returns true if any of the columns used in the row filter WHERE expression is
+ * not part of REPLICA IDENTITY, false otherwise.
+ */
+static bool
+contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
+{
+ if (node == NULL)
+ return false;
+
+ if (IsA(node, Var))
+ {
+ Var *var = (Var *) node;
+ AttrNumber attnum = var->varattno;
+
+ /*
+ * If pubviaroot is true, we are validating the row filter of the
+ * parent table, but the bitmap contains the replica identity
+ * information of the child table. So, get the column number of the
+ * child table as parent and child column order could be different.
+ */
+ if (context->pubviaroot)
+ {
+ char *colname = get_attname(context->parentid, attnum, false);
+
+ attnum = get_attnum(context->relid, colname);
+ }
+
+ if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
+ context->bms_replident))
+ return true;
+ }
+
+ return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
+ (void *) context);
+}
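
As a rough illustration of the walker above, the following standalone sketch walks a tiny expression tree and reports whether any column falls outside a replica identity bitmap. Everything here is a simplification and an assumption for illustration: `ExprNode`, `ATTR_OFFSET`, `bit_for_attnum`, and `has_column_outside_identity` are hypothetical names, a 64-bit mask replaces `Bitmapset`, and the offset value merely plays the role that FirstLowInvalidHeapAttributeNumber plays in the patch (shifting attribute numbers so that they are non-negative bit positions).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical offset; stands in for FirstLowInvalidHeapAttributeNumber. */
#define ATTR_OFFSET (-7)

typedef enum
{
	NODE_COLUMN,
	NODE_CONST,
	NODE_OP
} NodeKind;

typedef struct ExprNode
{
	NodeKind	kind;
	int			attnum;			/* meaningful for NODE_COLUMN only */
	struct ExprNode *left;		/* children, used by NODE_OP only */
	struct ExprNode *right;
} ExprNode;

/* Map an attribute number to its bit in the identity mask. */
static uint64_t
bit_for_attnum(int attnum)
{
	int			bit = attnum - ATTR_OFFSET;

	assert(bit >= 0 && bit < 64);
	return UINT64_C(1) << bit;
}

/* Returns true if any column in the tree is missing from identity_cols. */
static bool
has_column_outside_identity(const ExprNode *node, uint64_t identity_cols)
{
	if (node == NULL)
		return false;

	if (node->kind == NODE_COLUMN &&
		(identity_cols & bit_for_attnum(node->attnum)) == 0)
		return true;

	return has_column_outside_identity(node->left, identity_cols) ||
		has_column_outside_identity(node->right, identity_cols);
}
```

The sketch omits the pubviaroot case, where the patch first translates the parent's column number to the child's by name before testing the bitmap.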
+
+/*
+ * Check if all columns referenced in the filter expression are part of the
+ * REPLICA IDENTITY index or not.
+ *
+ * Returns true if any invalid column is found.
+ */
+bool
+contain_invalid_rfcolumn(Oid pubid, Relation relation, List *ancestors,
+ bool pubviaroot)
+{
+ HeapTuple rftuple;
+ Oid relid = RelationGetRelid(relation);
+ Oid publish_as_relid = RelationGetRelid(relation);
+ bool result = false;
+ Datum rfdatum;
+ bool rfisnull;
+
+ /*
+ * FULL means all columns are in the REPLICA IDENTITY, so all columns are
+ * allowed in the row filter and we can skip the validation.
+ */
+ if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ return false;
+
+ /*
+ * For a partition, if pubviaroot is true, find the topmost ancestor that
+ * is published via this publication as we need to use its row filter
+ * expression to filter the partition's changes.
+ *
+ * Note that even though the row filter used is for an ancestor, the
+ * REPLICA IDENTITY used will be for the actual child table.
+ */
+ if (pubviaroot && relation->rd_rel->relispartition)
+ {
+ publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors);
+
+ if (!OidIsValid(publish_as_relid))
+ publish_as_relid = relid;
+ }
+
+ rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(publish_as_relid),
+ ObjectIdGetDatum(pubid));
+
+ if (!HeapTupleIsValid(rftuple))
+ return false;
+
+ rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+ Anum_pg_publication_rel_prqual,
+ &rfisnull);
+
+ if (!rfisnull)
+ {
+ rf_context context = {0};
+ Node *rfnode;
+ Bitmapset *bms = NULL;
+
+ context.pubviaroot = pubviaroot;
+ context.parentid = publish_as_relid;
+ context.relid = relid;
+
+ /* Remember columns that are part of the REPLICA IDENTITY */
+ bms = RelationGetIndexAttrBitmap(relation,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+ context.bms_replident = bms;
+ rfnode = stringToNode(TextDatumGetCString(rfdatum));
+ result = contain_invalid_rfcolumn_walker(rfnode, &context);
+
+ bms_free(bms);
+ pfree(rfnode);
+ }
+
+ ReleaseSysCache(rftuple);
+
+ return result;
+}
+
+/* check_functions_in_node callback */
+static bool
+contain_mutable_or_user_functions_checker(Oid func_id, void *context)
+{
+ return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
+ func_id >= FirstNormalObjectId);
+}
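
The checker above combines two independent tests: the function must be immutable, and its OID must be below the threshold that separates built-in objects from user-created ones. A minimal standalone sketch of that predicate follows. The constants mirror what PostgreSQL's headers are understood to define (`FirstNormalObjectId` as 16384, and the volatility codes `'i'`, `'s'`, `'v'`), but they are restated here as assumptions, and `function_rejected` is a hypothetical name that takes the volatility as an argument instead of looking it up.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;				/* hypothetical stand-in for Oid */

/* Assumed to match PostgreSQL's FirstNormalObjectId. */
#define FIRST_NORMAL_OBJECT_ID 16384u

/* Assumed to match PostgreSQL's PROVOLATILE_* codes. */
#define VOLATILITY_IMMUTABLE 'i'
#define VOLATILITY_STABLE    's'
#define VOLATILITY_VOLATILE  'v'

/*
 * Returns true if the function may not appear in a row filter: it is either
 * not immutable or not built in.
 */
static bool
function_rejected(Oid func_id, char volatility)
{
	assert(volatility == VOLATILITY_IMMUTABLE ||
		   volatility == VOLATILITY_STABLE ||
		   volatility == VOLATILITY_VOLATILE);

	return volatility != VOLATILITY_IMMUTABLE ||
		func_id >= FIRST_NORMAL_OBJECT_ID;
}
```

The same OID threshold is what the patch uses to reject user-defined types, operators, and collations.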
+
+/*
+ * Check if the node contains any unallowed object. See
+ * check_simple_rowfilter_expr_walker.
+ *
+ * Returns the error detail message in errdetail_msg for unallowed expressions.
+ */
+static void
+expr_allowed_in_node(Node *node, ParseState *pstate, char **errdetail_msg)
+{
+ if (IsA(node, List))
+ {
+ /*
+ * OK, we don't need to perform other expr checks for List nodes
+ * because those are undefined for List.
+ */
+ return;
+ }
+
+ if (exprType(node) >= FirstNormalObjectId)
+ *errdetail_msg = _("User-defined types are not allowed.");
+ else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
+ (void *) pstate))
+ *errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
+ else if (exprCollation(node) >= FirstNormalObjectId ||
+ exprInputCollation(node) >= FirstNormalObjectId)
+ *errdetail_msg = _("User-defined collations are not allowed.");
+}
+
+/*
+ * The row filter walker checks if the row filter expression is a "simple
+ * expression".
+ *
+ * It allows only simple or compound expressions such as:
+ * - (Var Op Const)
+ * - (Var Op Var)
+ * - (Var Op Const) AND/OR (Var Op Const)
+ * - etc
+ * (where Var is a column of the table this filter belongs to)
+ *
+ * The simple expression has the following restrictions:
+ * - User-defined operators are not allowed;
+ * - User-defined functions are not allowed;
+ * - User-defined types are not allowed;
+ * - User-defined collations are not allowed;
+ * - Non-immutable built-in functions are not allowed;
+ * - System columns are not allowed.
+ *
+ * NOTES
+ *
+ * We don't allow user-defined functions/operators/types/collations because
+ * (a) if a user drops a user-defined object used in a row filter expression or
+ * if there is any other error while using it, the logical decoding
+ * infrastructure won't be able to recover from such an error even if the
+ * object is recreated again because a historic snapshot is used to evaluate
+ * the row filter;
+ * (b) a user-defined function can be used to access tables that could have
+ * unpleasant results because a historic snapshot is used. That's why only
+ * immutable built-in functions are allowed in row filter expressions.
+ *
+ * We don't allow system columns because currently, we don't have that
+ * information in the tuple passed to downstream. Also, as we don't replicate
+ * those to subscribers, there doesn't seem to be a need for a filter on those
+ * columns.
+ *
+ * We can allow other node types after more analysis and testing.
+ */
+static bool
+check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
+{
+ char *errdetail_msg = NULL;
+
+ if (node == NULL)
+ return false;
+
+ switch (nodeTag(node))
+ {
+ case T_Var:
+ /* System columns are not allowed. */
+ if (((Var *) node)->varattno < InvalidAttrNumber)
+ errdetail_msg = _("System columns are not allowed.");
+ break;
+ case T_OpExpr:
+ case T_DistinctExpr:
+ case T_NullIfExpr:
+ /* OK, except user-defined operators are not allowed. */
+ if (((OpExpr *) node)->opno >= FirstNormalObjectId)
+ errdetail_msg = _("User-defined operators are not allowed.");
+ break;
+ case T_ScalarArrayOpExpr:
+ /* OK, except user-defined operators are not allowed. */
+ if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
+ errdetail_msg = _("User-defined operators are not allowed.");
+
+ /*
+ * We don't need to check the hashfuncid and negfuncid of
+ * ScalarArrayOpExpr as those functions are only built for a
+ * subquery.
+ */
+ break;
+ case T_RowCompareExpr:
+ {
+ ListCell *opid;
+
+ /* OK, except user-defined operators are not allowed. */
+ foreach(opid, ((RowCompareExpr *) node)->opnos)
+ {
+ if (lfirst_oid(opid) >= FirstNormalObjectId)
+ {
+ errdetail_msg = _("User-defined operators are not allowed.");
+ break;
+ }
+ }
+ }
+ break;
+ case T_Const:
+ case T_FuncExpr:
+ case T_BoolExpr:
+ case T_RelabelType:
+ case T_CollateExpr:
+ case T_CaseExpr:
+ case T_CaseTestExpr:
+ case T_ArrayExpr:
+ case T_RowExpr:
+ case T_CoalesceExpr:
+ case T_MinMaxExpr:
+ case T_XmlExpr:
+ case T_NullTest:
+ case T_BooleanTest:
+ case T_List:
+ /* OK, supported */
+ break;
+ default:
+ errdetail_msg = _("Expressions only allow columns, constants, built-in operators, built-in data types, built-in collations and immutable built-in functions.");
+ break;
+ }
+
+ /*
+ * For all the supported nodes, check the types, functions, and collations
+ * used in the nodes.
+ */
+ if (!errdetail_msg)
+ expr_allowed_in_node(node, pstate, &errdetail_msg);
+
+ if (errdetail_msg)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("invalid publication WHERE expression"),
+ errdetail("%s", errdetail_msg),
+ parser_errposition(pstate, exprLocation(node))));
+
+ return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
+ (void *) pstate);
+}
+
+/*
+ * Check if the row filter expression is a "simple expression".
+ *
+ * See check_simple_rowfilter_expr_walker for details.
+ */
+static bool
+check_simple_rowfilter_expr(Node *node, ParseState *pstate)
+{
+ return check_simple_rowfilter_expr_walker(node, pstate);
+}
+
+/*
+ * Transform the publication WHERE expression for all the relations in the list,
+ * ensuring it is coerced to boolean and necessary collation information is
+ * added if required, and add a new nsitem/RTE for the associated relation to
+ * the ParseState's namespace list.
+ *
+ * Also check the publication row filter expression and throw an error if
+ * anything not permitted or unexpected is encountered.
+ */
+static void
+TransformPubWhereClauses(List *tables, const char *queryString,
+ bool pubviaroot)
+{
+ ListCell *lc;
+
+ foreach(lc, tables)
+ {
+ ParseNamespaceItem *nsitem;
+ Node *whereclause = NULL;
+ ParseState *pstate;
+ PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
+
+ if (pri->whereClause == NULL)
+ continue;
+
+ /*
+ * If the publication doesn't publish changes via the root partitioned
+ * table, the partition's row filter will be used. So disallow using
+ * WHERE clause on partitioned table in this case.
+ */
+ if (!pubviaroot &&
+ pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot use publication WHERE clause for relation \"%s\"",
+ RelationGetRelationName(pri->relation)),
+ errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
+ "publish_via_partition_root")));
+
+ pstate = make_parsestate(NULL);
+ pstate->p_sourcetext = queryString;
+
+ nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
+ AccessShareLock, NULL,
+ false, false);
+
+ addNSItemToQuery(pstate, nsitem, false, true, true);
+
+ whereclause = transformWhereClause(pstate,
+ copyObject(pri->whereClause),
+ EXPR_KIND_WHERE,
+ "PUBLICATION WHERE");
+
+ /* Fix up collation information */
+ assign_expr_collations(pstate, whereclause);
+
+ /*
+ * We allow only simple expressions in row filters. See
+ * check_simple_rowfilter_expr_walker.
+ */
+ check_simple_rowfilter_expr(whereclause, pstate);
+
+ free_parsestate(pstate);
+
+ pri->whereClause = whereclause;
+ }
+}
+
/*
* Create new publication.
*/
rels = OpenTableList(relations);
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
PUBLICATIONOBJ_TABLE);
+
+ TransformPubWhereClauses(rels, pstate->p_sourcetext,
+ publish_via_partition_root);
+
PublicationAddTables(puboid, rels, true, NULL);
CloseTableList(rels);
}
bool publish_via_partition_root;
ObjectAddress obj;
Form_pg_publication pubform;
+ List *root_relids = NIL;
+ ListCell *lc;
parse_publication_options(pstate,
stmt->options,
&publish_via_partition_root_given,
&publish_via_partition_root);
+ pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ /*
+ * If the publication doesn't publish changes via the root partitioned
+ * table, the partition's row filter will be used. So disallow using WHERE
+ * clause on partitioned table in this case.
+ */
+ if (!pubform->puballtables && publish_via_partition_root_given &&
+ !publish_via_partition_root)
+ {
+ /*
+ * Lock the publication so nobody else can do anything with it. This
+ * prevents concurrent alter to add partitioned table(s) with WHERE
+ * clause(s) which we don't allow when not publishing via root.
+ */
+ LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
+ AccessShareLock);
+
+ root_relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ROOT);
+
+ foreach(lc, root_relids)
+ {
+ HeapTuple rftuple;
+ Oid relid = lfirst_oid(lc);
+
+ rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pubform->oid));
+
+ if (HeapTupleIsValid(rftuple) &&
+ !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL))
+ {
+ HeapTuple tuple;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+ if (HeapTupleIsValid(tuple))
+ {
+ Form_pg_class relform = (Form_pg_class) GETSTRUCT(tuple);
+
+ if (relform->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot set %s for publication \"%s\"",
+ "publish_via_partition_root = false",
+ stmt->pubname),
+ errdetail("The publication contains a WHERE clause for a partitioned table \"%s\" "
+ "which is not allowed when %s is false.",
+ NameStr(relform->relname),
+ "publish_via_partition_root")));
+
+ ReleaseSysCache(tuple);
+ }
+
+ ReleaseSysCache(rftuple);
+ }
+ }
+ }
+
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
* invalidate all partitions contained in the respective partition
* trees, not just those explicitly mentioned in the publication.
*/
- relids = GetPublicationRelations(pubform->oid,
- PUBLICATION_PART_ALL);
+ if (root_relids == NIL)
+ relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
+ else
+ {
+ /*
+ * We already got tables explicitly mentioned in the publication.
+ * Now get all partitions for the partitioned table in the list.
+ */
+ foreach(lc, root_relids)
+ relids = GetPubPartitionOptionRelations(relids,
+ PUBLICATION_PART_ALL,
+ lfirst_oid(lc));
+ }
+
schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
PUBLICATION_PART_ALL);
relids = list_concat_unique_oid(relids, schemarelids);
*/
static void
AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
- List *tables, List *schemaidlist)
+ List *tables, List *schemaidlist,
+ const char *queryString)
{
List *rels = NIL;
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
CheckObjSchemaNotAlreadyInPublication(rels, schemas,
PUBLICATIONOBJ_TABLE);
+
+ TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
+
PublicationAddTables(pubid, rels, false, stmt);
}
else if (stmt->action == AP_DropObjects)
CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
PUBLICATIONOBJ_TABLE);
- /* Calculate which relations to drop. */
+ TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
+
+ /*
+ * To recreate the relation list for the publication, look for
+ * existing relations that do not need to be dropped.
+ */
foreach(oldlc, oldrelids)
{
Oid oldrelid = lfirst_oid(oldlc);
ListCell *newlc;
+ PublicationRelInfo *oldrel;
bool found = false;
+ HeapTuple rftuple;
+ bool rfisnull = true;
+ Node *oldrelwhereclause = NULL;
+
+ /* look up the cache for the old relmap */
+ rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(oldrelid),
+ ObjectIdGetDatum(pubid));
+
+ if (HeapTupleIsValid(rftuple))
+ {
+ Datum whereClauseDatum;
+
+ whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+ Anum_pg_publication_rel_prqual,
+ &rfisnull);
+ if (!rfisnull)
+ oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
+
+ ReleaseSysCache(rftuple);
+ }
foreach(newlc, rels)
{
PublicationRelInfo *newpubrel;
newpubrel = (PublicationRelInfo *) lfirst(newlc);
+
+ /*
+ * Check if any of the new set of relations matches with the
+ * existing relations in the publication. Additionally, if the
+ * relation has an associated WHERE clause, check the WHERE
+ * expressions also match. Drop the rest.
+ */
if (RelationGetRelid(newpubrel->relation) == oldrelid)
{
- found = true;
- break;
+ if (equal(oldrelwhereclause, newpubrel->whereClause))
+ {
+ found = true;
+ break;
+ }
}
}
- /* Not yet in the list, open it and add to the list */
- if (!found)
- {
- Relation oldrel;
- PublicationRelInfo *pubrel;
- /* Wrap relation into PublicationRelInfo */
- oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
+ if (oldrelwhereclause)
+ pfree(oldrelwhereclause);
- pubrel = palloc(sizeof(PublicationRelInfo));
- pubrel->relation = oldrel;
-
- delrels = lappend(delrels, pubrel);
+ /*
+ * Add the non-matched relations to a list so that they can be
+ * dropped.
+ */
+ if (!found)
+ {
+ oldrel = palloc(sizeof(PublicationRelInfo));
+ oldrel->whereClause = NULL;
+ oldrel->relation = table_open(oldrelid,
+ ShareUpdateExclusiveLock);
+ delrels = lappend(delrels, oldrel);
}
}
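
The loop above keeps an existing publication member only when the new list names the same relation with an equal WHERE clause; every other existing member goes onto the drop list, and a member whose filter changed is dropped and re-added. The standalone sketch below shows that matching rule under simplifying assumptions: filters are compared as plain strings rather than with `equal()` on node trees, and `PubEntry`, `same_filter`, and `entries_to_drop` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef struct PubEntry
{
	unsigned int relid;
	const char *filter;			/* NULL means "no row filter" */
} PubEntry;

/* Two filters match if both are absent or both have identical text. */
static bool
same_filter(const char *a, const char *b)
{
	if (a == NULL || b == NULL)
		return a == b;
	return strcmp(a, b) == 0;
}

/*
 * Writes the relids of existing entries with no exact match in wanted[]
 * into drop[] (which must have room for nold items) and returns the count.
 */
static size_t
entries_to_drop(const PubEntry *old, size_t nold,
				const PubEntry *wanted, size_t nwanted,
				unsigned int *drop)
{
	size_t		ndrop = 0;

	assert(drop != NULL || nold == 0);

	for (size_t i = 0; i < nold; i++)
	{
		bool		found = false;

		for (size_t j = 0; j < nwanted && !found; j++)
			found = old[i].relid == wanted[j].relid &&
				same_filter(old[i].filter, wanted[j].filter);

		if (!found)
			drop[ndrop++] = old[i].relid;
	}

	return ndrop;
}
```

With existing members `{1, no filter}`, `{2, "a > 1"}`, `{3, no filter}` and a requested set `{1, no filter}`, `{2, "a > 5"}`, relations 2 and 3 end up on the drop list: 3 because it is no longer wanted, 2 because its filter changed.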
{
List *relations = NIL;
List *schemaidlist = NIL;
+ Oid pubid = pubform->oid;
ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
&schemaidlist);
CheckAlterPublication(stmt, tup, relations, schemaidlist);
+ heap_freetuple(tup);
+
/*
* Lock the publication so nobody else can do anything with it. This
* prevents concurrent alter to add table(s) that were already going
* addition of schema(s) for which there is any corresponding table
* being added by this command.
*/
- LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
+ LockDatabaseObject(PublicationRelationId, pubid, 0,
AccessExclusiveLock);
/*
* It is possible that by the time we acquire the lock on publication,
* concurrent DDL has removed it. We can test this by checking the
- * existence of publication.
+ * existence of publication. We get the tuple again to avoid the risk
+ * of any publication option getting changed.
*/
- if (!SearchSysCacheExists1(PUBLICATIONOID,
- ObjectIdGetDatum(pubform->oid)))
+ tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
+ if (!HeapTupleIsValid(tup))
ereport(ERROR,
errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("publication \"%s\" does not exist",
stmt->pubname));
- AlterPublicationTables(stmt, tup, relations, schemaidlist);
+ AlterPublicationTables(stmt, tup, relations, schemaidlist,
+ pstate->p_sourcetext);
AlterPublicationSchemas(stmt, tup, schemaidlist);
}
List *relids = NIL;
List *rels = NIL;
ListCell *lc;
+ List *relids_with_rf = NIL;
/*
* Open, share-lock, and check all the explicitly-specified relations
*/
if (list_member_oid(relids, myrelid))
{
+ /* Disallow duplicate tables if there are any with row filters. */
+ if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
+ RelationGetRelationName(rel))));
+
table_close(rel, ShareUpdateExclusiveLock);
continue;
}
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
+ pub_rel->whereClause = t->whereClause;
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, myrelid);
+ if (t->whereClause)
+ relids_with_rf = lappend_oid(relids_with_rf, myrelid);
+
/*
* Add children of this rel, if requested, so that they too are added
* to the publication. A partitioned table can't have any inheritance
* tables.
*/
if (list_member_oid(relids, childrelid))
+ {
+ /*
+ * We don't allow specifying a row filter for both the parent
+ * and the child table at the same time, as it is not clear
+ * which one should be given preference.
+ */
+ if (childrelid != myrelid &&
+ (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
+ RelationGetRelationName(rel))));
+
continue;
+ }
/* find_all_inheritors already got lock */
rel = table_open(childrelid, NoLock);
pub_rel = palloc(sizeof(PublicationRelInfo));
pub_rel->relation = rel;
+ /* child inherits WHERE clause from parent */
+ pub_rel->whereClause = t->whereClause;
rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, childrelid);
+
+ if (t->whereClause)
+ relids_with_rf = lappend_oid(relids_with_rf, childrelid);
}
}
}
list_free(relids);
+ list_free(relids_with_rf);
return rels;
}
pub_rel = (PublicationRelInfo *) lfirst(lc);
table_close(pub_rel->relation, NoLock);
}
+
+ list_free_deep(rels);
}
/*
RelationGetRelationName(rel))));
}
+ if (pubrel->whereClause)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot use a WHERE clause when removing a table from a publication")));
+
ObjectAddressSet(obj, PublicationRelRelationId, prid);
performDeletion(&obj, DROP_CASCADE, 0);
}
void
CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
{
- PublicationActions *pubactions;
+ PublicationDesc pubdesc;
/* We only need to do checks for UPDATE and DELETE. */
if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
return;
+ if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ return;
+
+ /*
+ * It is only safe to execute UPDATE/DELETE when all columns, referenced
+ * in the row filters from publications which the relation is in, are
+ * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
+ * or the table does not publish UPDATEs or DELETEs.
+ *
+ * XXX We could optimize this by first checking whether any of the
+ * publications have a row filter for this relation. If none does and the
+ * relation has a replica identity, we could avoid building the descriptor,
+ * but as this happens only once it doesn't seem worth the additional
+ * complexity.
+ */
+ RelationBuildPublicationDesc(rel, &pubdesc);
+ if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot update table \"%s\"",
+ RelationGetRelationName(rel)),
+ errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
+ else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("cannot delete from table \"%s\"",
+ RelationGetRelationName(rel)),
+ errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
+
/* If relation has replica identity we are always good. */
- if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
- OidIsValid(RelationGetReplicaIndex(rel)))
+ if (OidIsValid(RelationGetReplicaIndex(rel)))
return;
/*
*
* Check if the table publishes UPDATES or DELETES.
*/
- pubactions = GetRelationPublicationActions(rel);
- if (cmd == CMD_UPDATE && pubactions->pubupdate)
+ if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
RelationGetRelationName(rel)),
errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
- else if (cmd == CMD_DELETE && pubactions->pubdelete)
+ else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
PublicationTable *newnode = makeNode(PublicationTable);
COPY_NODE_FIELD(relation);
+ COPY_NODE_FIELD(whereClause);
return newnode;
}
_equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
{
COMPARE_NODE_FIELD(relation);
+ COMPARE_NODE_FIELD(whereClause);
return true;
}
* relation_expr here.
*/
PublicationObjSpec:
- TABLE relation_expr
+ TABLE relation_expr OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_TABLE;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $2;
+ $$->pubtable->whereClause = $3;
}
| ALL TABLES IN_P SCHEMA ColId
{
$$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA;
$$->location = @5;
}
- | ColId
+ | ColId OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
- $$->name = $1;
+ if ($2)
+ {
+ /*
+ * The OptWhereClause must be stored here but it is
+ * valid only for tables. For non-table objects, an
+ * error will be thrown later via
+ * preprocess_pubobj_list().
+ */
+ $$->pubtable = makeNode(PublicationTable);
+ $$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+ $$->pubtable->whereClause = $2;
+ }
+ else
+ {
+ $$->name = $1;
+ }
$$->location = @1;
}
- | ColId indirection
+ | ColId indirection OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+ $$->pubtable->whereClause = $3;
$$->location = @1;
}
/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
- | extended_relation_expr
+ | extended_relation_expr OptWhereClause
{
$$ = makeNode(PublicationObjSpec);
$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
$$->pubtable = makeNode(PublicationTable);
$$->pubtable->relation = $1;
+ $$->pubtable->whereClause = $2;
}
| CURRENT_SCHEMA
{
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("invalid table name at or near"),
parser_errposition(pubobj->location));
- else if (pubobj->name)
+
+ if (pubobj->name)
{
/* convert it to PublicationTable */
PublicationTable *pubtable = makeNode(PublicationTable);
else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA ||
pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA)
{
+ /* WHERE clause is not allowed on a schema object */
+ if (pubobj->pubtable && pubobj->pubtable->whereClause)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("WHERE clause not allowed for schema"),
+ parser_errposition(pubobj->location));
+
/*
* We can distinguish between the different type of schema
* objects based on whether name and pubtable is set.
static void logicalrep_write_attrs(StringInfo out, Relation rel);
static void logicalrep_write_tuple(StringInfo out, Relation rel,
- HeapTuple tuple, bool binary);
-
+ TupleTableSlot *slot,
+ bool binary);
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
*/
void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple newtuple, bool binary)
+ TupleTableSlot *newslot, bool binary)
{
pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
pq_sendint32(out, RelationGetRelid(rel));
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary);
}
/*
*/
void
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+ TupleTableSlot *oldslot, TupleTableSlot *newslot,
+ bool binary)
{
pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
- if (oldtuple != NULL)
+ if (oldslot != NULL)
{
if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
pq_sendbyte(out, 'O'); /* old tuple follows */
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary);
}
pq_sendbyte(out, 'N'); /* new tuple follows */
- logicalrep_write_tuple(out, rel, newtuple, binary);
+ logicalrep_write_tuple(out, rel, newslot, binary);
}
/*
*/
void
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
- HeapTuple oldtuple, bool binary)
+ TupleTableSlot *oldslot, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
else
pq_sendbyte(out, 'K'); /* old key follows */
- logicalrep_write_tuple(out, rel, oldtuple, binary);
+ logicalrep_write_tuple(out, rel, oldslot, binary);
}
/*
* Write a tuple to the outputstream, in the most efficient format possible.
*/
static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
+ bool binary)
{
TupleDesc desc;
- Datum values[MaxTupleAttributeNumber];
- bool isnull[MaxTupleAttributeNumber];
+ Datum *values;
+ bool *isnull;
int i;
uint16 nliveatts = 0;
}
pq_sendint16(out, nliveatts);
- /* try to allocate enough memory from the get-go */
- enlargeStringInfo(out, tuple->t_len +
- nliveatts * (1 + 4));
-
- heap_deform_tuple(tuple, desc, values, isnull);
+ slot_getallattrs(slot);
+ values = slot->tts_values;
+ isnull = slot->tts_isnull;
/* Write the values */
for (i = 0; i < desc->natts; i++)
/*
* Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * message provides during replication. This function also returns the relation
+ * qualifications to be used in the COPY command.
*/
static void
fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel)
+ LogicalRepRelation *lrel, List **qual)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+ Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
+ ListCell *lc;
+ bool first;
lrel->nspname = nspname;
lrel->relname = relname;
lrel->natts = natt;
walrcv_clear_result(res);
+
+ /*
+ * Get relation's row filter expressions. DISTINCT prevents the same
+ * expression of a table in multiple publications from being included
+ * multiple times in the final expression.
+ *
+ * We need to copy the row even if it matches just one of the
+ * publications, so we later combine all the quals with OR.
+ *
+ * For initial synchronization, row filtering can be ignored in the
+ * following cases:
+ *
+ * 1) one of the subscribed publications for the table hasn't specified
+ * any row filter
+ *
+ * 2) one of the subscribed publications has puballtables set to true
+ *
+ * 3) one of the subscribed publications is declared as ALL TABLES IN
+ * SCHEMA that includes this relation
+ */
+ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ {
+ StringInfoData pub_names;
+
+ /* Build the pubname list. */
+ initStringInfo(&pub_names);
+ first = true;
+ foreach(lc, MySubscription->publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&pub_names, ", ");
+
+ appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
+ }
+
+ /* Check for row filters. */
+ resetStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
+ " FROM pg_publication p"
+ " LEFT OUTER JOIN pg_publication_rel pr"
+ " ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
+ " LATERAL pg_get_publication_tables(p.pubname) gpt"
+ " WHERE gpt.relid = %u"
+ " AND p.pubname IN ( %s )",
+ lrel->remoteid,
+ lrel->remoteid,
+ pub_names.data);
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
+ nspname, relname, res->err)));
+
+ /*
+ * Multiple row filter expressions for the same table will be combined
+ * by COPY using OR. If any of the filter expressions for this table
+ * are null, it means the whole table will be copied. In this case it
+ * is not necessary to construct a unified row filter expression at
+ * all.
+ */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ Datum rf = slot_getattr(slot, 1, &isnull);
+
+ if (!isnull)
+ *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+ else
+ {
+ /* Ignore filters and cleanup as necessary. */
+ if (*qual)
+ {
+ list_free_deep(*qual);
+ *qual = NIL;
+ }
+ break;
+ }
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+ }
+
pfree(cmd.data);
}
{
LogicalRepRelMapEntry *relmapentry;
LogicalRepRelation lrel;
+ List *qual = NIL;
WalRcvExecResult *res;
StringInfoData cmd;
CopyFromState cstate;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel);
+ RelationGetRelationName(rel), &lrel, &qual);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
/* Start copy on the publisher. */
initStringInfo(&cmd);
- if (lrel.relkind == RELKIND_RELATION)
+
+ /* Regular table with no row filter */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL)
appendStringInfo(&cmd, "COPY %s TO STDOUT",
quote_qualified_identifier(lrel.nspname, lrel.relname));
else
{
/*
- * For non-tables, we need to do COPY (SELECT ...), but we can't just
- * do SELECT * because we need to not copy generated columns.
+ * For non-tables and tables with row filters, we need to do COPY
+ * (SELECT ...), but we can't just do SELECT * because we need to not
+ * copy generated columns. For tables with any row filters, build a
+ * SELECT query with OR'ed row filters for COPY.
*/
appendStringInfoString(&cmd, "COPY (SELECT ");
for (int i = 0; i < lrel.natts; i++)
if (i < lrel.natts - 1)
appendStringInfoString(&cmd, ", ");
}
- appendStringInfo(&cmd, " FROM %s) TO STDOUT",
- quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+ appendStringInfoString(&cmd, " FROM ");
+
+ /*
+ * For regular tables, make sure we don't copy data from a child that
+ * inherits the named table as those will be copied separately.
+ */
+ if (lrel.relkind == RELKIND_RELATION)
+ appendStringInfoString(&cmd, "ONLY ");
+
+ appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
+ /* list of OR'ed filters */
+ if (qual != NIL)
+ {
+ ListCell *lc;
+ char *q = strVal(linitial(qual));
+
+ appendStringInfo(&cmd, " WHERE %s", q);
+ for_each_from(lc, qual, 1)
+ {
+ q = strVal(lfirst(lc));
+ appendStringInfo(&cmd, " OR %s", q);
+ }
+ list_free_deep(qual);
+ }
+
+ appendStringInfoString(&cmd, ") TO STDOUT");
}
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
pfree(cmd.data);
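The COPY command construction above can be sketched in isolation. This is a minimal illustration only, assuming a regular table that has at least one row filter (hence `FROM ONLY`); `sketch_build_copy_cmd` and its fixed-size buffer are hypothetical and stand in for the StringInfo calls used in the patch, and the filter strings are assumed to be already deparsed and parenthesized.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper (not part of the patch): writes the COPY command for a
 * regular table into buf, OR'ing all row filters together. Returns 0 on
 * success, -1 if buf is too small.
 */
static int
sketch_build_copy_cmd(char *buf, size_t buflen,
					  const char *columns, const char *table,
					  const char *const *quals, size_t nquals)
{
	size_t		used;
	int			n;

	assert(buf != NULL && buflen > 0);

	n = snprintf(buf, buflen, "COPY (SELECT %s FROM ONLY %s", columns, table);
	if (n < 0 || (size_t) n >= buflen)
		return -1;
	used = (size_t) n;

	/* The first filter is introduced by WHERE, every later one by OR. */
	for (size_t i = 0; i < nquals; i++)
	{
		n = snprintf(buf + used, buflen - used, "%s%s",
					 i == 0 ? " WHERE " : " OR ", quals[i]);
		if (n < 0 || (size_t) n >= buflen - used)
			return -1;
		used += (size_t) n;
	}

	if (strlen(") TO STDOUT") >= buflen - used)
		return -1;
	strcpy(buf + used, ") TO STDOUT");
	return 0;
}
```

A row is copied if it satisfies any one of the filters, which is why the filters are joined with OR rather than AND.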
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
#include "commands/defrem.h"
+#include "executor/executor.h"
#include "fmgr.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
+#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
+/*
+ * Only 3 publication actions are used for row filtering ("insert", "update",
+ * "delete"). See RelationSyncEntry.exprstate[].
+ */
+enum RowFilterPubAction
+{
+ PUBACTION_INSERT,
+ PUBACTION_UPDATE,
+ PUBACTION_DELETE
+};
+
+#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
+
/*
* Entry in the map used to remember which relation schemas we sent.
*
/* are we publishing this rel? */
PublicationActions pubactions;
+ /*
+ * ExprState array for row filter. Filters cannot always be combined into
+ * one expression shared by all publication actions: updates and deletes
+ * require every column in the expression to be part of the replica
+ * identity, whereas inserts have no such restriction. Hence there is one
+ * ExprState per publication action.
+ */
+ ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
+ EState *estate; /* executor state used for row filter */
+ MemoryContext cache_expr_cxt; /* private context for exprstate and
+ * estate, if any */
+
+ TupleTableSlot *new_slot; /* slot for storing new tuple */
+ TupleTableSlot *old_slot; /* slot for storing old tuple */
+
/*
* OID of the relation to publish changes as. For a partition, this may
* be set to one of its ancestors whose schema will be used when
* same as 'relid' or if unnecessary due to partition and the ancestor
* having identical TupleDesc.
*/
- TupleConversionMap *map;
+ AttrMap *attrmap;
} RelationSyncEntry;
/* Map used to remember which relation schemas we sent. */
static void init_rel_sync_cache(MemoryContext decoding_context);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
+ Relation relation);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
TransactionId xid);
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
+static void init_tuple_slot(PGOutputData *data, Relation relation,
+ RelationSyncEntry *entry);
+
+/* row filter routines */
+static EState *create_estate_for_relation(Relation rel);
+static void pgoutput_row_filter_init(PGOutputData *data,
+ List *publications,
+ RelationSyncEntry *entry);
+static bool pgoutput_row_filter_exec_expr(ExprState *state,
+ ExprContext *econtext);
+static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+ TupleTableSlot **new_slot_ptr,
+ RelationSyncEntry *entry,
+ ReorderBufferChangeType *action);
/*
* Specify output plugin callbacks
"logical replication output context",
ALLOCSET_DEFAULT_SIZES);
+ data->cachectx = AllocSetContextCreate(ctx->context,
+ "logical replication cache context",
+ ALLOCSET_DEFAULT_SIZES);
+
ctx->output_plugin_private = data;
/* This plugin uses binary protocol. */
return;
/*
- * Nope, so send the schema. If the changes will be published using an
- * ancestor's schema, not the relation's own, send that ancestor's schema
- * before sending relation's own (XXX - maybe sending only the former
- * suffices?). This is also a good place to set the map that will be used
- * to convert the relation's tuples into the ancestor's format, if needed.
+ * Send the schema. If the changes will be published using an ancestor's
+ * schema, not the relation's own, send that ancestor's schema before
+ * sending relation's own (XXX - maybe sending only the former suffices?).
*/
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- TupleDesc indesc = RelationGetDescr(relation);
- TupleDesc outdesc = RelationGetDescr(ancestor);
- MemoryContext oldctx;
-
- /* Map must live as long as the session does. */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- /*
- * Make copies of the TupleDescs that will live as long as the map
- * does before putting into the map.
- */
- indesc = CreateTupleDescCopy(indesc);
- outdesc = CreateTupleDescCopy(outdesc);
- relentry->map = convert_tuples_by_name(indesc, outdesc);
- if (relentry->map == NULL)
- {
- /* Map not necessary, so free the TupleDescs too. */
- FreeTupleDesc(indesc);
- FreeTupleDesc(outdesc);
- }
-
- MemoryContextSwitchTo(oldctx);
send_relation_and_attrs(ancestor, xid, ctx);
RelationClose(ancestor);
}
OutputPluginWrite(ctx, false);
}
+/*
+ * Executor state preparation for evaluation of row filter expressions for the
+ * specified relation.
+ */
+static EState *
+create_estate_for_relation(Relation rel)
+{
+ EState *estate;
+ RangeTblEntry *rte;
+
+ estate = CreateExecutorState();
+
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = RelationGetRelid(rel);
+ rte->relkind = rel->rd_rel->relkind;
+ rte->rellockmode = AccessShareLock;
+ ExecInitRangeTable(estate, list_make1(rte));
+
+ estate->es_output_cid = GetCurrentCommandId(false);
+
+ return estate;
+}
+
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ */
+static bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+ Datum ret;
+ bool isnull;
+
+ Assert(state != NULL);
+
+ ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+ elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
+ isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
+ isnull ? "true" : "false");
+
+ if (isnull)
+ return false;
+
+ return DatumGetBool(ret);
+}
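The NULL handling described above can be illustrated without the executor. This is a sketch only: `SketchSqlBool` and `sketch_row_filter_passes` are hypothetical names that model a SQL boolean result as a value plus a null flag, in place of the Datum and isnull pair returned by ExecEvalExprSwitchContext().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a SQL boolean: a value plus a null flag. */
typedef struct SketchSqlBool
{
	bool		value;
	bool		isnull;
} SketchSqlBool;

/*
 * Returns true only when the filter result is a non-NULL true. A NULL
 * result is treated as false, so the change is not replicated.
 */
static bool
sketch_row_filter_passes(const SketchSqlBool *result)
{
	assert(result != NULL);

	if (result->isnull)
		return false;

	return result->value;
}
```

This matches the usual WHERE semantics in SQL, where a row is kept only if the condition is true, not merely "not false".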
+
+/*
+ * Initialize the row filter.
+ */
+static void
+pgoutput_row_filter_init(PGOutputData *data, List *publications,
+ RelationSyncEntry *entry)
+{
+ ListCell *lc;
+ List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
+ bool no_filter[] = {false, false, false}; /* One per pubaction */
+ MemoryContext oldctx;
+ int idx;
+ bool has_filter = true;
+
+ /*
+ * Find if there are any row filters for this relation. If there are, then
+ * prepare the necessary ExprState and cache it in entry->exprstate. To
+ * build an expression state, we need to ensure the following:
+ *
+ * All the given publication-table mappings must be checked.
+ *
+ * Multiple publications might have multiple row filters for this
+ * relation. Since row filter usage depends on the DML operation, there
+ * are multiple lists (one for each operation) to which row filters will
+ * be appended.
+ *
+ * FOR ALL TABLES implies "don't use row filter expression" so it takes
+ * precedence.
+ */
+ foreach(lc, publications)
+ {
+ Publication *pub = lfirst(lc);
+ HeapTuple rftuple = NULL;
+ Datum rfdatum = 0;
+ bool pub_no_filter = false;
+
+ if (pub->alltables)
+ {
+ /*
+ * If the publication is FOR ALL TABLES then it is treated the
+ * same as if this table has no row filters (even if for other
+ * publications it does).
+ */
+ pub_no_filter = true;
+ }
+ else
+ {
+ /*
+ * Check for the presence of a row filter in this publication.
+ */
+ rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+ ObjectIdGetDatum(entry->publish_as_relid),
+ ObjectIdGetDatum(pub->oid));
+
+ if (HeapTupleIsValid(rftuple))
+ {
+ /* Null indicates no filter. */
+ rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+ Anum_pg_publication_rel_prqual,
+ &pub_no_filter);
+ }
+ else
+ {
+ pub_no_filter = true;
+ }
+ }
+
+ if (pub_no_filter)
+ {
+ if (rftuple)
+ ReleaseSysCache(rftuple);
+
+ no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
+ no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
+ no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
+
+ /*
+ * Quick exit if all the DML actions are already known to be
+ * published without a row filter.
+ */
+ if (no_filter[PUBACTION_INSERT] &&
+ no_filter[PUBACTION_UPDATE] &&
+ no_filter[PUBACTION_DELETE])
+ {
+ has_filter = false;
+ break;
+ }
+
+ /* No additional work for this publication. Next one. */
+ continue;
+ }
+
+ /* Form the per pubaction row filter lists. */
+ if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
+ rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
+ TextDatumGetCString(rfdatum));
+ if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
+ rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
+ TextDatumGetCString(rfdatum));
+ if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
+ rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
+ TextDatumGetCString(rfdatum));
+
+ ReleaseSysCache(rftuple);
+ } /* loop all subscribed publications */
+
+ /* Clean the row filter */
+ for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+ {
+ if (no_filter[idx])
+ {
+ list_free_deep(rfnodes[idx]);
+ rfnodes[idx] = NIL;
+ }
+ }
+
+ if (has_filter)
+ {
+ Relation relation = RelationIdGetRelation(entry->publish_as_relid);
+
+ Assert(entry->cache_expr_cxt == NULL);
+
+ /* Create the memory context for row filters */
+ entry->cache_expr_cxt = AllocSetContextCreate(data->cachectx,
+ "Row filter expressions",
+ ALLOCSET_DEFAULT_SIZES);
+
+ MemoryContextCopyAndSetIdentifier(entry->cache_expr_cxt,
+ RelationGetRelationName(relation));
+
+ /*
+ * Now all the filters for all pubactions are known. Combine them when
+ * their pubactions are the same.
+ */
+ oldctx = MemoryContextSwitchTo(entry->cache_expr_cxt);
+ entry->estate = create_estate_for_relation(relation);
+ for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+ {
+ List *filters = NIL;
+ Expr *rfnode;
+
+ if (rfnodes[idx] == NIL)
+ continue;
+
+ foreach(lc, rfnodes[idx])
+ filters = lappend(filters, stringToNode((char *) lfirst(lc)));
+
+ /* combine the row filter and cache the ExprState */
+ rfnode = make_orclause(filters);
+ entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
+ } /* for each pubaction */
+ MemoryContextSwitchTo(oldctx);
+
+ RelationClose(relation);
+ }
+}
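The per-action bookkeeping in pgoutput_row_filter_init() can be reduced to the following sketch. It assumes a simplified model in which each publication carries a flag per action and an optional filter string; `SketchPub` and `sketch_collect_filters` are hypothetical and ignore the syscache lookups, memory contexts, and ExprState preparation done in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum
{
	SK_INSERT, SK_UPDATE, SK_DELETE, SK_NUM_ACTIONS
};

/* Hypothetical, simplified view of one subscribed publication. */
typedef struct SketchPub
{
	bool		publishes[SK_NUM_ACTIONS];
	const char *filter;			/* NULL means "no row filter" */
} SketchPub;

/*
 * Collects the filters that apply to one action. Returns the number of
 * filters stored in out, or 0 when some publication publishes the action
 * without a filter (an unfiltered publication takes precedence).
 */
static size_t
sketch_collect_filters(const SketchPub *pubs, size_t npubs, int action,
					   const char **out, size_t outcap)
{
	size_t		n = 0;

	assert(action >= 0 && action < SK_NUM_ACTIONS);

	for (size_t i = 0; i < npubs; i++)
	{
		if (!pubs[i].publishes[action])
			continue;			/* this publication ignores the action */
		if (pubs[i].filter == NULL)
			return 0;			/* no filter: everything is published */
		if (n < outcap)
			out[n++] = pubs[i].filter;
	}

	return n;
}
```

The filters returned for one action would then be OR'ed together, which is what make_orclause() does for each entry of rfnodes[] in the patch.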
+
+/*
+ * Initialize the slots for storing new and old tuples, and build the map that
+ * will be used to convert the relation's tuples into the ancestor's format.
+ */
+static void
+init_tuple_slot(PGOutputData *data, Relation relation,
+ RelationSyncEntry *entry)
+{
+ MemoryContext oldctx;
+ TupleDesc oldtupdesc;
+ TupleDesc newtupdesc;
+
+ oldctx = MemoryContextSwitchTo(data->cachectx);
+
+ /*
+ * Create tuple table slots. Create a copy of the TupleDesc as it needs to
+ * live as long as the cache remains.
+ */
+ oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+ newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+
+ entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
+ entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
+
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Cache the map that will be used to convert the relation's tuples into
+ * the ancestor's format, if needed.
+ */
+ if (entry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
+ TupleDesc indesc = RelationGetDescr(relation);
+ TupleDesc outdesc = RelationGetDescr(ancestor);
+
+ /* Map must live as long as the session does. */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+ entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc);
+
+ MemoryContextSwitchTo(oldctx);
+ RelationClose(ancestor);
+ }
+}
+
+/*
+ * Change is checked against the row filter if any.
+ *
+ * Returns true if the change is to be replicated, else false.
+ *
+ * For inserts, evaluate the row filter for new tuple.
+ * For deletes, evaluate the row filter for old tuple.
+ * For updates, evaluate the row filter for old and new tuple.
+ *
+ * For updates, if both evaluations are true, the UPDATE is sent; if both are
+ * false, the UPDATE is not replicated. If only one of the tuples matches the
+ * row filter expression, the UPDATE is transformed into a DELETE or INSERT to
+ * avoid any data inconsistency, based on the following rules:
+ *
+ * Case 1: old-row (no match) new-row (no match) -> (drop change)
+ * Case 2: old-row (no match) new-row (match) -> INSERT
+ * Case 3: old-row (match) new-row (no match) -> DELETE
+ * Case 4: old-row (match) new-row (match) -> UPDATE
+ *
+ * The new action is updated in the action parameter.
+ *
+ * The new slot could be updated when transforming the UPDATE into INSERT,
+ * because the original new tuple might not have column values from the replica
+ * identity.
+ *
+ * Examples:
+ * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
+ * Since the old tuple satisfies, the initial table synchronization copied this
+ * row (or another method was used to guarantee that there is data
+ * consistency). However, after the UPDATE the new tuple doesn't satisfy the
+ * row filter, so from a data consistency perspective, that row should be
+ * removed on the subscriber. The UPDATE should be transformed into a DELETE
+ * statement and be sent to the subscriber. Keeping this row on the subscriber
+ * is undesirable because it doesn't reflect what was defined in the row filter
+ * expression on the publisher. This row on the subscriber would likely not be
+ * modified by replication again. If someone inserted a new row with the same
+ * old identifier, replication could stop due to a constraint violation.
+ *
+ * Let's say the old tuple doesn't match the row filter but the new tuple does.
+ * Since the old tuple doesn't satisfy, the initial table synchronization
+ * probably didn't copy this row. However, after the UPDATE the new tuple does
+ * satisfy the row filter, so from a data consistency perspective, that row
+ * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
+ * statements have no effect (it matches no row -- see
+ * apply_handle_update_internal()). So, the UPDATE should be transformed into an
+ * INSERT statement and be sent to the subscriber. However, this might surprise
+ * someone who expects the data set to satisfy the row filter expression on the
+ * provider.
+ */
+static bool
+pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+ TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
+ ReorderBufferChangeType *action)
+{
+ TupleDesc desc;
+ int i;
+ bool old_matched,
+ new_matched,
+ result;
+ TupleTableSlot *tmp_new_slot;
+ TupleTableSlot *new_slot = *new_slot_ptr;
+ ExprContext *ecxt;
+ ExprState *filter_exprstate;
+
+ /*
+ * We need this map to avoid relying on ReorderBufferChangeType enums
+ * having specific values.
+ */
+ static const int map_changetype_pubaction[] = {
+ [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
+ [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
+ [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
+ };
+
+ Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
+ *action == REORDER_BUFFER_CHANGE_UPDATE ||
+ *action == REORDER_BUFFER_CHANGE_DELETE);
+
+ Assert(new_slot || old_slot);
+
+ /* Get the corresponding row filter */
+ filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
+
+ /* Bail out if there is no row filter */
+ if (!filter_exprstate)
+ return true;
+
+ elog(DEBUG3, "table \"%s.%s\" has row filter",
+ get_namespace_name(RelationGetNamespace(relation)),
+ RelationGetRelationName(relation));
+
+ ResetPerTupleExprContext(entry->estate);
+
+ ecxt = GetPerTupleExprContext(entry->estate);
+
+ /*
+ * In the following cases there is only one tuple, so we can evaluate the
+ * row filter for that tuple and return.
+ *
+ * For inserts, we only have the new tuple.
+ *
+ * For updates, we can have only a new tuple when none of the replica
+ * identity columns changed but we still need to evaluate the row filter
+ * for new tuple as the existing values of those columns might not match
+ * the filter. Also, users can use constant expressions in the row filter,
+ * so we need to evaluate it for the new tuple anyway.
+ *
+ * For deletes, we only have the old tuple.
+ */
+ if (!new_slot || !old_slot)
+ {
+ ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
+ result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ return result;
+ }
+
+ /*
+ * Both the old and new tuples can be valid only for updates, and both need
+ * to be checked against the row filter.
+ */
+ Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+
+ slot_getallattrs(new_slot);
+ slot_getallattrs(old_slot);
+
+ tmp_new_slot = NULL;
+ desc = RelationGetDescr(relation);
+
+ /*
+ * The new tuple might not have all the replica identity columns, in which
+ * case it needs to be copied over from the old tuple.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ /*
+ * If the column in the new tuple or old tuple is null, there is nothing to do.
+ */
+ if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
+ continue;
+
+ /*
+ * Unchanged toasted replica identity columns are only logged in the
+ * old tuple. Copy this over to the new tuple. The changed (or WAL
+ * Logged) toast values are always assembled in memory and set as
+ * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+ */
+ if (att->attlen == -1 &&
+ VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
+ !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
+ {
+ if (!tmp_new_slot)
+ {
+ tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
+ ExecClearTuple(tmp_new_slot);
+
+ memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
+ desc->natts * sizeof(Datum));
+ memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
+ desc->natts * sizeof(bool));
+ }
+
+ tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
+ tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+ }
+ }
+
+ ecxt->ecxt_scantuple = old_slot;
+ old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ if (tmp_new_slot)
+ {
+ ExecStoreVirtualTuple(tmp_new_slot);
+ ecxt->ecxt_scantuple = tmp_new_slot;
+ }
+ else
+ ecxt->ecxt_scantuple = new_slot;
+
+ new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+ /*
+ * Case 1: if neither tuple matches the row filter, bail out. Send
+ * nothing.
+ */
+ if (!old_matched && !new_matched)
+ return false;
+
+ /*
+ * Case 2: if the old tuple doesn't satisfy the row filter but the new
+ * tuple does, transform the UPDATE into INSERT.
+ *
+ * Use the newly transformed tuple that must contain the column values for
+ * all the replica identity columns. This is required to ensure that,
+ * while inserting the tuple in the downstream node, we have all the
+ * required column values.
+ */
+ if (!old_matched && new_matched)
+ {
+ *action = REORDER_BUFFER_CHANGE_INSERT;
+
+ if (tmp_new_slot)
+ *new_slot_ptr = tmp_new_slot;
+ }
+
+ /*
+ * Case 3: if the old tuple satisfies the row filter but the new tuple
+ * doesn't, transform the UPDATE into DELETE.
+ *
+ * This transformation does not require another tuple. The old tuple will
+ * be used for DELETE.
+ */
+ else if (old_matched && !new_matched)
+ *action = REORDER_BUFFER_CHANGE_DELETE;
+
+ /*
+ * Case 4: if both tuples match the row filter, transformation isn't
+ * required. (*action is default UPDATE).
+ */
+
+ return true;
+}
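The four UPDATE cases handled by pgoutput_row_filter() reduce to a small decision table. The sketch below assumes the two filter evaluations have already been done; `SketchAction` and `sketch_filter_update` are hypothetical names, and the tuple slot handling (copying unchanged toasted replica identity columns) is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum SketchAction
{
	SKETCH_INSERT,
	SKETCH_UPDATE,
	SKETCH_DELETE
} SketchAction;

/*
 * Decides what to send for an UPDATE given whether the old and new tuples
 * match the row filter. Returns false if nothing should be sent; otherwise
 * returns true and may rewrite *action to INSERT or DELETE.
 */
static bool
sketch_filter_update(bool old_matched, bool new_matched, SketchAction *action)
{
	assert(action != NULL && *action == SKETCH_UPDATE);

	/* Case 1: neither tuple matches, so drop the change. */
	if (!old_matched && !new_matched)
		return false;

	/* Case 2: the row enters the filtered set, so send an INSERT. */
	if (!old_matched && new_matched)
		*action = SKETCH_INSERT;

	/* Case 3: the row leaves the filtered set, so send a DELETE. */
	else if (old_matched && !new_matched)
		*action = SKETCH_DELETE;

	/* Case 4: both match, *action stays UPDATE. */
	return true;
}
```

Rewriting the action keeps the subscriber's copy consistent with the filter: rows that stop matching are removed, and rows that start matching are created.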
+
/*
* Sends the decoded DML over wire.
*
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
Relation ancestor = NULL;
+ Relation targetrel = relation;
+ ReorderBufferChangeType action = change->action;
+ TupleTableSlot *old_slot = NULL;
+ TupleTableSlot *new_slot = NULL;
if (!is_publishable_relation(relation))
return;
if (in_streaming)
xid = change->txn->xid;
- relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
+ relentry = get_rel_sync_entry(data, relation);
/* First check the table filter */
- switch (change->action)
+ switch (action)
{
case REORDER_BUFFER_CHANGE_INSERT:
if (!relentry->pubactions.pubinsert)
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- maybe_send_schema(ctx, change, relation, relentry);
-
/* Send the data */
- switch (change->action)
+ switch (action)
{
case REORDER_BUFFER_CHANGE_INSERT:
- {
- HeapTuple tuple = &change->data.tp.newtuple->tuple;
+ new_slot = relentry->new_slot;
+ ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
+ new_slot, false);
- /* Switch relation if publishing via root. */
- if (relentry->publish_as_relid != RelationGetRelid(relation))
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+ targetrel = ancestor;
+ /* Convert tuple if needed. */
+ if (relentry->attrmap)
{
- Assert(relation->rd_rel->relispartition);
- ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- relation = ancestor;
- /* Convert tuple if needed. */
- if (relentry->map)
- tuple = execute_attr_map_tuple(tuple, relentry->map);
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ new_slot = execute_attr_map_slot(relentry->attrmap,
+ new_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
+ }
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_insert(ctx->out, xid, relation, tuple,
- data->binary);
- OutputPluginWrite(ctx, true);
+ /* Check row filter */
+ if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
+ &action))
break;
- }
+
+ /*
+ * Schema should be sent using the original relation because it
+ * also sends the ancestor's relation.
+ */
+ maybe_send_schema(ctx, change, relation, relentry);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+ data->binary);
+ OutputPluginWrite(ctx, true);
+ break;
case REORDER_BUFFER_CHANGE_UPDATE:
+ if (change->data.tp.oldtuple)
{
- HeapTuple oldtuple = change->data.tp.oldtuple ?
- &change->data.tp.oldtuple->tuple : NULL;
- HeapTuple newtuple = &change->data.tp.newtuple->tuple;
+ old_slot = relentry->old_slot;
+ ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
+ old_slot, false);
+ }
- /* Switch relation if publishing via root. */
- if (relentry->publish_as_relid != RelationGetRelid(relation))
+ new_slot = relentry->new_slot;
+ ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
+ new_slot, false);
+
+ /* Switch relation if publishing via root. */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Assert(relation->rd_rel->relispartition);
+ ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+ targetrel = ancestor;
+ /* Convert tuples if needed. */
+ if (relentry->attrmap)
{
- Assert(relation->rd_rel->relispartition);
- ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- relation = ancestor;
- /* Convert tuples if needed. */
- if (relentry->map)
- {
- if (oldtuple)
- oldtuple = execute_attr_map_tuple(oldtuple,
- relentry->map);
- newtuple = execute_attr_map_tuple(newtuple,
- relentry->map);
- }
+ TupleDesc tupdesc = RelationGetDescr(targetrel);
+
+ if (old_slot)
+ old_slot = execute_attr_map_slot(relentry->attrmap,
+ old_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+
+ new_slot = execute_attr_map_slot(relentry->attrmap,
+ new_slot,
+ MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
}
+ }
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_update(ctx->out, xid, relation, oldtuple,
- newtuple, data->binary);
- OutputPluginWrite(ctx, true);
+ /* Check row filter */
+ if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
+ relentry, &action))
break;
+
+ maybe_send_schema(ctx, change, relation, relentry);
+
+ OutputPluginPrepareWrite(ctx, true);
+
+ /*
+ * Updates could be transformed to inserts or deletes based on the
+ * results of the row filter for the old and new tuples.
+ */
+ switch (action)
+ {
+ case REORDER_BUFFER_CHANGE_INSERT:
+ logicalrep_write_insert(ctx->out, xid, targetrel,
+ new_slot, data->binary);
+ break;
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ logicalrep_write_update(ctx->out, xid, targetrel,
+ old_slot, new_slot, data->binary);
+ break;
+ case REORDER_BUFFER_CHANGE_DELETE:
+ logicalrep_write_delete(ctx->out, xid, targetrel,
+ old_slot, data->binary);
+ break;
+ default:
+ Assert(false);
}
+
+ OutputPluginWrite(ctx, true);
+ break;
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple)
{
- HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
+ old_slot = relentry->old_slot;
+
+ ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
+ old_slot, false);
/* Switch relation if publishing via root. */
if (relentry->publish_as_relid != RelationGetRelid(relation))
{
Assert(relation->rd_rel->relispartition);