diff options
Diffstat (limited to 'src/include')
| -rw-r--r-- | src/include/catalog/pg_proc.dat | 5 | ||||
| -rw-r--r-- | src/include/catalog/pg_publication.h | 26 | ||||
| -rw-r--r-- | src/include/catalog/pg_publication_namespace.h | 10 | ||||
| -rw-r--r-- | src/include/commands/sequence.h | 1 | ||||
| -rw-r--r-- | src/include/nodes/parsenodes.h | 8 | ||||
| -rw-r--r-- | src/include/replication/logicalproto.h | 19 | ||||
| -rw-r--r-- | src/include/replication/pgoutput.h | 1 |
7 files changed, 62 insertions, 8 deletions
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d8e8715ed1c..699bd0aa3e3 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11540,6 +11540,11 @@ provolatile => 's', prorettype => 'oid', proargtypes => 'text', proallargtypes => '{text,oid}', proargmodes => '{i,o}', proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' }, +{ oid => '8000', descr => 'get OIDs of sequences in a publication', + proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'oid', proargtypes => 'text', + proallargtypes => '{text,oid}', proargmodes => '{i,o}', + proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' }, { oid => '6121', descr => 'returns whether a relation can be part of a publication', proname => 'pg_relation_is_publishable', provolatile => 's', diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index fe773cf9b7d..97f26208e1d 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId) */ bool puballtables; + /* + * indicates that this is special publication which should encompass all + * sequences in the database (except for the unlogged and temp ones) + */ + bool puballsequences; + /* true if inserts are published */ bool pubinsert; @@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if truncates are published */ bool pubtruncate; + /* true if sequences are published */ + bool pubsequence; + /* true if partition changes are published using root schema */ bool pubviaroot; } FormData_pg_publication; @@ -72,6 +81,7 @@ typedef struct PublicationActions bool pubupdate; bool pubdelete; bool pubtruncate; + bool pubsequence; } PublicationActions; typedef struct PublicationDesc @@ -92,6 +102,7 @@ typedef struct Publication Oid oid; char *name; bool alltables; + bool allsequences; bool pubviaroot; PublicationActions pubactions; } Publication; @@ -122,14 +133,15 @@ typedef enum PublicationPartOpt PUBLICATION_PART_ALL, } PublicationPartOpt; -extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetPublicationRelations(Oid pubid, char objectType, + PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); -extern List *GetPublicationSchemas(Oid pubid); -extern List *GetSchemaPublications(Oid schemaid); -extern List *GetSchemaPublicationRelations(Oid schemaid, +extern List *GetPublicationSchemas(Oid pubid, char objectType); +extern List *GetSchemaPublications(Oid schemaid, char objectType); +extern List *GetSchemaPublicationRelations(Oid schemaid, char objectType, PublicationPartOpt pub_partopt); -extern List *GetAllSchemaPublicationRelations(Oid puboid, +extern List *GetAllSchemaPublicationRelations(Oid puboid, char objectType, PublicationPartOpt pub_partopt); extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, @@ -137,11 +149,15 @@ extern List *GetPubPartitionOptionRelations(List *result, extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level); +extern List *GetAllSequencesPublications(void); +extern List *GetAllSequencesPublicationRelations(void); + extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, + char objectType, bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); diff --git a/src/include/catalog/pg_publication_namespace.h b/src/include/catalog/pg_publication_namespace.h index e4306da02e7..7340a1ec646 100644 --- a/src/include/catalog/pg_publication_namespace.h +++ b/src/include/catalog/pg_publication_namespace.h @@ -32,6 +32,7 @@ CATALOG(pg_publication_namespace,8901,PublicationNamespaceRelationId) Oid oid; /* oid */ Oid pnpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid pnnspid BKI_LOOKUP(pg_namespace); /* Oid of the schema */ + char pntype; /* object type to include */ } FormData_pg_publication_namespace; /* ---------------- @@ -42,6 +43,13 @@ CATALOG(pg_publication_namespace,8901,PublicationNamespaceRelationId) typedef FormData_pg_publication_namespace *Form_pg_publication_namespace; DECLARE_UNIQUE_INDEX_PKEY(pg_publication_namespace_oid_index, 8902, PublicationNamespaceObjectIndexId, on pg_publication_namespace using btree(oid oid_ops)); -DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_index, 8903, PublicationNamespacePnnspidPnpubidIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_pntype_index, 8903, PublicationNamespacePnnspidPnpubidPntypeIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops, pntype char_ops)); + +/* object type to include from a schema, maps to relkind */ +#define PUB_OBJTYPE_TABLE 't' /* table (regular or partitioned) */ +#define PUB_OBJTYPE_SEQUENCE 's' /* sequence object */ +#define PUB_OBJTYPE_UNSUPPORTED 'u' /* used for non-replicated types */ + +extern char pub_get_object_type_for_relkind(char relkind); #endif /* PG_PUBLICATION_NAMESPACE_H */ diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 9fecc41954e..5bab90db8e0 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -60,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt); extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *rptr); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 2f618cb2292..cb1fcc0ee31 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3665,6 +3665,10 @@ typedef enum PublicationObjSpecType PUBLICATIONOBJ_TABLES_IN_SCHEMA, /* All tables in schema */ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA, /* All tables in first element of * search_path */ + PUBLICATIONOBJ_SEQUENCE, /* Sequence type */ + PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA, /* Sequences in schema type */ + PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA, /* Get the first element of + * search_path */ PUBLICATIONOBJ_CONTINUATION /* Continuation of previous type */ } PublicationObjSpecType; @@ -3683,7 +3687,7 @@ typedef struct CreatePublicationStmt char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ List *pubobjects; /* Optional list of publication objects */ - bool for_all_tables; /* Special publication for all tables in db */ + List *for_all_objects; /* Special publication for all objects in db */ } CreatePublicationStmt; typedef enum AlterPublicationAction @@ -3706,7 +3710,7 @@ typedef struct AlterPublicationStmt * objects. */ List *pubobjects; /* Optional list of publication objects */ - bool for_all_tables; /* Special publication for all tables in db */ + List *for_all_objects; /* Special publication for all objects in db */ AlterPublicationAction action; /* What action to perform with the given * objects */ } AlterPublicationStmt; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 4d2c881644a..fb86ca022d2 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_SEQUENCE = 'Q', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', @@ -118,6 +119,18 @@ typedef struct LogicalRepTyp char *typname; /* name of the remote type */ } LogicalRepTyp; +/* Sequence info */ +typedef struct LogicalRepSequence +{ + Oid remoteid; /* unique id of the remote sequence */ + char *nspname; /* schema name of remote sequence */ + char *seqname; /* name of the remote sequence */ + bool transactional; + int64 last_value; + int64 log_cnt; + bool is_called; +} LogicalRepSequence; + /* Transaction info */ typedef struct LogicalRepBeginData { @@ -230,6 +243,12 @@ extern List *logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs); extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); +extern void logicalrep_write_sequence(StringInfo out, Relation rel, + TransactionId xid, XLogRecPtr lsn, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); +extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index eafedd610a5..f4e9f35d09d 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -29,6 +29,7 @@ typedef struct PGOutputData bool streaming; bool messages; bool two_phase; + bool sequences; } PGOutputData; #endif /* PGOUTPUT_H */ |
