summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
Diffstat (limited to 'src/include')
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat2
-rw-r--r--src/include/catalog/pg_subscription_rel.h23
-rw-r--r--src/include/commands/sequence.h1
-rw-r--r--src/include/replication/logicalworker.h3
-rw-r--r--src/include/replication/worker_internal.h22
6 files changed, 47 insertions, 6 deletions
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 18e95179ab6..593aed7fe21 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202510281
+#define CATALOG_VERSION_NO 202511051
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..34b7fddb0e7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -3433,7 +3433,7 @@
proname => 'pg_sequence_last_value', provolatile => 'v', proparallel => 'u',
prorettype => 'int8', proargtypes => 'regclass',
prosrc => 'pg_sequence_last_value' },
-{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump',
+{ oid => '6427', descr => 'return sequence tuple, for use by pg_dump and sequence synchronization',
proname => 'pg_get_sequence_data', provolatile => 'v', proparallel => 'u',
prorettype => 'record', proargtypes => 'regclass',
proallargtypes => '{regclass,int8,bool,pg_lsn}', proargmodes => '{i,o,o,o}',
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9f88498ecd3..b1e7e75c6b8 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -82,6 +82,29 @@ typedef struct SubscriptionRelState
char state;
} SubscriptionRelState;
+/*
+ * Holds local sequence identity and corresponding publisher values used during
+ * sequence synchronization.
+ */
+typedef struct LogicalRepSequenceInfo
+{
+ /* Sequence information retrieved from the local node */
+ char *seqname;
+ char *nspname;
+ Oid localrelid;
+
+ /* Sequence information retrieved from the publisher node */
+ XLogRecPtr page_lsn;
+ int64 last_value;
+ bool is_called;
+
+ /*
+ * True if the sequence identified by nspname + seqname exists on the
+ * publisher.
+ */
+ bool found_on_pub;
+} LogicalRepSequenceInfo;
+
extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn, bool retain_lock);
extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 9ac0b67683d..46b4d89dd6e 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -60,6 +60,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
+extern void SetSequence(Oid relid, int64 next, bool is_called);
extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *record);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 88912606e4d..56fa79b648e 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -18,7 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
extern void ParallelApplyWorkerMain(Datum main_arg);
-extern void TablesyncWorkerMain(Datum main_arg);
+extern void TableSyncWorkerMain(Datum main_arg);
+extern void SequenceSyncWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
extern bool IsLogicalParallelApplyWorker(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index e23fa9a4514..f081619f151 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -30,6 +30,7 @@ typedef enum LogicalRepWorkerType
{
WORKERTYPE_UNKNOWN = 0,
WORKERTYPE_TABLESYNC,
+ WORKERTYPE_SEQUENCESYNC,
WORKERTYPE_APPLY,
WORKERTYPE_PARALLEL_APPLY,
} LogicalRepWorkerType;
@@ -106,6 +107,8 @@ typedef struct LogicalRepWorker
TimestampTz last_recv_time;
XLogRecPtr reply_lsn;
TimestampTz reply_time;
+
+ TimestampTz last_seqsync_start_time;
} LogicalRepWorker;
/*
@@ -271,6 +274,7 @@ extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_reset_seqsync_start_time(void);
extern int logicalrep_sync_worker_count(Oid subid);
extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
@@ -282,11 +286,15 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
+extern void ProcessSequencesForSync(void);
pg_noreturn extern void FinishSyncWorker(void);
extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
+extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
+ Oid relid, TimestampTz *last_start_time);
extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
-extern bool FetchRelationStates(bool *started_tx);
+extern void FetchRelationStates(bool *has_pending_subtables,
+ bool *has_pending_sequences, bool *started_tx);
extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);
@@ -353,13 +361,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->in_use && \
+#define isTableSyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+#define isSequenceSyncWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_SEQUENCESYNC)
static inline bool
am_tablesync_worker(void)
{
- return isTablesyncWorker(MyLogicalRepWorker);
+ return isTableSyncWorker(MyLogicalRepWorker);
+}
+
+static inline bool
+am_sequencesync_worker(void)
+{
+ return isSequenceSyncWorker(MyLogicalRepWorker);
}
static inline bool