diff options
Diffstat (limited to 'src/include')
| -rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
| -rw-r--r-- | src/include/catalog/pg_proc.dat | 2 | ||||
| -rw-r--r-- | src/include/catalog/pg_subscription_rel.h | 23 | ||||
| -rw-r--r-- | src/include/commands/sequence.h | 1 | ||||
| -rw-r--r-- | src/include/replication/logicalworker.h | 3 | ||||
| -rw-r--r-- | src/include/replication/worker_internal.h | 22 |
6 files changed, 47 insertions, 6 deletions
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 18e95179ab6..593aed7fe21 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202510281 +#define CATALOG_VERSION_NO 202511051 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 9121a382f76..34b7fddb0e7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -3433,7 +3433,7 @@ proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u', prorettype => 'int8', proargtypes => 'regclass', prosrc => 'pg_sequence_last_value' }, -{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump', +{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump and sequence synchronization', proname => 'pg_get_sequence_data', provolatile => 'v', proparallel => 'u', prorettype => 'record', proargtypes => 'regclass', proallargtypes => '{regclass,int8,bool,pg_lsn}', proargmodes => '{i,o,o,o}', diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 9f88498ecd3..b1e7e75c6b8 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -82,6 +82,29 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; +/* + * Holds local sequence identity and corresponding publisher values used during + * sequence synchronization. + */ +typedef struct LogicalRepSequenceInfo +{ + /* Sequence information retrieved from the local node */ + char *seqname; + char *nspname; + Oid localrelid; + + /* Sequence information retrieved from the publisher node */ + XLogRecPtr page_lsn; + int64 last_value; + bool is_called; + + /* + * True if the sequence identified by nspname + seqname exists on the + * publisher. + */ + bool found_on_pub; +} LogicalRepSequenceInfo; + extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 9ac0b67683d..46b4d89dd6e 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -60,6 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); +extern void SetSequence(Oid relid, int64 next, bool is_called); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 88912606e4d..56fa79b648e 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); -extern void TablesyncWorkerMain(Datum main_arg); +extern void TableSyncWorkerMain(Datum main_arg); +extern void SequenceSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index e23fa9a4514..f081619f151 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType { WORKERTYPE_UNKNOWN = 0, WORKERTYPE_TABLESYNC, + WORKERTYPE_SEQUENCESYNC, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; @@ -106,6 +107,8 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + TimestampTz last_seqsync_start_time; } LogicalRepWorker; /* @@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); +extern void logicalrep_reset_seqsync_start_time(void); extern int logicalrep_sync_worker_count(Oid subid); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, @@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn); extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn); +extern void ProcessSequencesForSync(void); pg_noreturn extern void FinishSyncWorker(void); extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue); +extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, + Oid relid, TimestampTz *last_start_time); extern void ProcessSyncingRelations(XLogRecPtr current_lsn); -extern bool FetchRelationStates(bool *started_tx); +extern void FetchRelationStates(bool *has_pending_subtables, + bool *has_pending_sequences, bool *started_tx); extern void stream_start_internal(TransactionId xid, bool first_segment); extern void stream_stop_internal(TransactionId xid); @@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); +} + +static inline bool +am_sequencesync_worker(void) +{ + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool |
