These encapsulate a relation when referred from replication DDL.
Currently they don't do anything useful (they're just wrappers around
RangeVar and Relation respectively) but in the future they'll be used to
carry column lists.
Extracted from a larger patch by Rahila Syed.
Author: Rahila Syed <rahilasyed90@gmail.com>
Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektNRH8NJ1jf95g@mail.gmail.com
* Insert new publication / relation mapping.
*/
ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
bool if_not_exists)
{
Relation rel;
HeapTuple tup;
Datum values[Natts_pg_publication_rel];
bool nulls[Natts_pg_publication_rel];
- Oid relid = RelationGetRelid(targetrel);
+ Oid relid = RelationGetRelid(targetrel->relation);
Oid prrelid;
Publication *pub = GetPublication(pubid);
ObjectAddress myself,
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("relation \"%s\" is already member of publication \"%s\"",
- RelationGetRelationName(targetrel), pub->name)));
+ RelationGetRelationName(targetrel->relation), pub->name)));
}
- check_publication_add_relation(targetrel);
+ check_publication_add_relation(targetrel->relation);
/* Form a tuple. */
memset(values, 0, sizeof(values));
table_close(rel, RowExclusiveLock);
/* Invalidate relcache so that publication info is rebuilt. */
- CacheInvalidateRelcache(targetrel);
+ CacheInvalidateRelcache(targetrel->relation);
return myself;
}
foreach(newlc, rels)
{
- Relation newrel = (Relation) lfirst(newlc);
+ PublicationRelInfo *newpubrel;
- if (RelationGetRelid(newrel) == oldrelid)
+ newpubrel = (PublicationRelInfo *) lfirst(newlc);
+ if (RelationGetRelid(newpubrel->relation) == oldrelid)
{
found = true;
break;
}
}
-
+ /* Not yet in the list, open it and add to the list */
if (!found)
{
- Relation oldrel = table_open(oldrelid,
- ShareUpdateExclusiveLock);
+ Relation oldrel;
+ PublicationRelInfo *pubrel;
+
+ /* Wrap relation into PublicationRelInfo */
+ oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
+
+ pubrel = palloc(sizeof(PublicationRelInfo));
+ pubrel->relation = oldrel;
- delrels = lappend(delrels, oldrel);
+ delrels = lappend(delrels, pubrel);
}
}
}
/*
- * Open relations specified by a RangeVar list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
- * add them to a publication.
+ * Open relations specified by a PublicationTable list.
+ * In the returned list of PublicationRelInfo, tables are locked
+ * in ShareUpdateExclusiveLock mode in order to add them to a publication.
*/
static List *
OpenTableList(List *tables)
*/
foreach(lc, tables)
{
- RangeVar *rv = lfirst_node(RangeVar, lc);
- bool recurse = rv->inh;
+ PublicationTable *t = lfirst_node(PublicationTable, lc);
+ bool recurse = t->relation->inh;
Relation rel;
Oid myrelid;
+ PublicationRelInfo *pub_rel;
/* Allow query cancel in case this takes a long time */
CHECK_FOR_INTERRUPTS();
- rel = table_openrv(rv, ShareUpdateExclusiveLock);
+ rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
myrelid = RelationGetRelid(rel);
/*
continue;
}
- rels = lappend(rels, rel);
+ pub_rel = palloc(sizeof(PublicationRelInfo));
+ pub_rel->relation = rel;
+ rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, myrelid);
/*
/* find_all_inheritors already got lock */
rel = table_open(childrelid, NoLock);
- rels = lappend(rels, rel);
+ pub_rel = palloc(sizeof(PublicationRelInfo));
+ pub_rel->relation = rel;
+ rels = lappend(rels, pub_rel);
relids = lappend_oid(relids, childrelid);
}
}
foreach(lc, rels)
{
- Relation rel = (Relation) lfirst(lc);
+ PublicationRelInfo *pub_rel;
- table_close(rel, NoLock);
+ pub_rel = (PublicationRelInfo *) lfirst(lc);
+ table_close(pub_rel->relation, NoLock);
}
}
foreach(lc, rels)
{
- Relation rel = (Relation) lfirst(lc);
+ PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
+ Relation rel = pub_rel->relation;
ObjectAddress obj;
/* Must be owner of the table or superuser. */
aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
RelationGetRelationName(rel));
- obj = publication_add_relation(pubid, rel, if_not_exists);
+ obj = publication_add_relation(pubid, pub_rel, if_not_exists);
if (stmt)
{
EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
foreach(lc, rels)
{
- Relation rel = (Relation) lfirst(lc);
+ PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
+ Relation rel = pubrel->relation;
Oid relid = RelationGetRelid(rel);
prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
return newnode;
}
+static PublicationTable *
+_copyPublicationTable(const PublicationTable *from)
+{
+ PublicationTable *newnode = makeNode(PublicationTable);
+
+ COPY_NODE_FIELD(relation);
+
+ return newnode;
+}
/*
* copyObjectImpl -- implementation of copyObject(); see nodes/nodes.h
case T_PartitionCmd:
retval = _copyPartitionCmd(from);
break;
+ case T_PublicationTable:
+ retval = _copyPublicationTable(from);
+ break;
/*
* MISCELLANEOUS NODES
return true;
}
+static bool
+_equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
+{
+ COMPARE_NODE_FIELD(relation);
+
+ return true;
+}
+
/*
* equal
* returns whether two nodes are equal
case T_PartitionCmd:
retval = _equalPartitionCmd(a, b);
break;
+ case T_PublicationTable:
+ retval = _equalPublicationTable(a, b);
+ break;
default:
elog(ERROR, "unrecognized node type: %d",
transform_element_list transform_type_list
TriggerTransitions TriggerReferencing
vacuum_relation_list opt_vacuum_relation_list
- drop_option_list
+ drop_option_list publication_table_list
%type <node> opt_routine_body
%type <groupclause> group_clause
%type <list> group_by_list
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
%type <node> grouping_sets_clause
-%type <node> opt_publication_for_tables publication_for_tables
+%type <node> opt_publication_for_tables publication_for_tables publication_table
%type <list> opt_fdw_options fdw_options
%type <defelt> fdw_option
;
publication_for_tables:
- FOR TABLE relation_expr_list
+ FOR TABLE publication_table_list
{
$$ = (Node *) $3;
}
}
;
+publication_table_list:
+ publication_table
+ { $$ = list_make1($1); }
+ | publication_table_list ',' publication_table
+ { $$ = lappend($1, $3); }
+ ;
+
+publication_table: relation_expr
+ {
+ PublicationTable *n = makeNode(PublicationTable);
+ n->relation = $1;
+ $$ = (Node *) n;
+ }
+ ;
/*****************************************************************************
*
n->options = $5;
$$ = (Node *)n;
}
- | ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+ | ALTER PUBLICATION name ADD_P TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tableAction = DEFELEM_ADD;
$$ = (Node *)n;
}
- | ALTER PUBLICATION name SET TABLE relation_expr_list
+ | ALTER PUBLICATION name SET TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
n->tableAction = DEFELEM_SET;
$$ = (Node *)n;
}
- | ALTER PUBLICATION name DROP TABLE relation_expr_list
+ | ALTER PUBLICATION name DROP TABLE publication_table_list
{
AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
n->pubname = $3;
PublicationActions pubactions;
} Publication;
+typedef struct PublicationRelInfo
+{
+ Relation relation;
+} PublicationRelInfo;
+
extern Publication *GetPublication(Oid pubid);
extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
extern List *GetRelationPublications(Oid relid);
extern List *GetAllTablesPublicationRelations(bool pubviaroot);
extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
bool if_not_exists);
extern Oid get_publication_oid(const char *pubname, bool missing_ok);
T_PartitionRangeDatum,
T_PartitionCmd,
T_VacuumRelation,
+ T_PublicationTable,
/*
* TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
bool missing_ok; /* for DROP - skip error if missing? */
} AlterTSConfigurationStmt;
+typedef struct PublicationTable
+{
+ NodeTag type;
+ RangeVar *relation; /* relation to be published */
+} PublicationTable;
typedef struct CreatePublicationStmt
{
PublicationInfo
PublicationPartOpt
PublicationRelInfo
+PublicationTable
PullFilter
PullFilterOps
PushFilter