summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
Diffstat (limited to 'src/include')
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/indexing.h7
-rw-r--r--src/include/catalog/pg_proc.h5
-rw-r--r--src/include/catalog/pg_subscription_rel.h78
-rw-r--r--src/include/commands/copy.h5
-rw-r--r--src/include/nodes/nodes.h1
-rw-r--r--src/include/nodes/parsenodes.h13
-rw-r--r--src/include/nodes/replnodes.h9
-rw-r--r--src/include/parser/kwlist.h1
-rw-r--r--src/include/pgstat.h4
-rw-r--r--src/include/replication/logical.h13
-rw-r--r--src/include/replication/logicallauncher.h1
-rw-r--r--src/include/replication/snapbuild.h1
-rw-r--r--src/include/replication/walreceiver.h67
-rw-r--r--src/include/replication/walsender.h12
-rw-r--r--src/include/replication/worker_internal.h30
-rw-r--r--src/include/utils/syscache.h1
17 files changed, 228 insertions, 22 deletions
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 315f155b645..d8679f5f591 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201703221
+#define CATALOG_VERSION_NO 201703231
#endif
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 6bce7328a28..5d4190c05eb 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -340,8 +340,8 @@ DECLARE_UNIQUE_INDEX(pg_publication_pubname_index, 6111, on pg_publication using
DECLARE_UNIQUE_INDEX(pg_publication_rel_oid_index, 6112, on pg_publication_rel using btree(oid oid_ops));
#define PublicationRelObjectIndexId 6112
-DECLARE_UNIQUE_INDEX(pg_publication_rel_map_index, 6113, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
-#define PublicationRelMapIndexId 6113
+DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
+#define PublicationRelPrrelidPrpubidIndexId 6113
DECLARE_UNIQUE_INDEX(pg_subscription_oid_index, 6114, on pg_subscription using btree(oid oid_ops));
#define SubscriptionObjectIndexId 6114
@@ -349,6 +349,9 @@ DECLARE_UNIQUE_INDEX(pg_subscription_oid_index, 6114, on pg_subscription using b
DECLARE_UNIQUE_INDEX(pg_subscription_subname_index, 6115, on pg_subscription using btree(subdbid oid_ops, subname name_ops));
#define SubscriptionNameIndexId 6115
+DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops));
+#define SubscriptionRelSrrelidSrsubidIndexId 6117
+
/* last step of initialization script: build the indexes declared above */
BUILD_INDICES
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 22635655f56..78c23e3f5d5 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2021,6 +2021,9 @@ DESCR("is a relation insertable/updatable/deletable");
DATA(insert OID = 3843 ( pg_column_is_updatable PGNSP PGUID 12 10 0 0 0 f f f f t f s s 3 0 16 "2205 21 16" _null_ _null_ _null_ _null_ _null_ pg_column_is_updatable _null_ _null_ _null_ ));
DESCR("is a column updatable");
+DATA(insert OID = 6120 ( pg_get_replica_identity_index PGNSP PGUID 12 10 0 0 0 f f f f t f s s 1 0 2205 "2205" _null_ _null_ _null_ _null_ _null_ pg_get_replica_identity_index _null_ _null_ _null_ ));
+DESCR("oid of replica identity index if any");
+
/* Deferrable unique constraint trigger */
DATA(insert OID = 1250 ( unique_key_recheck PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 2279 "" _null_ _null_ _null_ _null_ _null_ unique_key_recheck _null_ _null_ _null_ ));
DESCR("deferred UNIQUE constraint check");
@@ -2805,7 +2808,7 @@ DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f
DESCR("statistics: information about currently active replication");
DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
DESCR("statistics: information about WAL receiver");
-DATA(insert OID = 6118 ( pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f f s r 1 0 2249 "26" "{26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o}" "{subid,subid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ ));
+DATA(insert OID = 6118 ( pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f f s r 1 0 2249 "26" "{26,26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o,o}" "{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ ));
DESCR("statistics: information about subscription");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 0 f f f f t f s r 0 0 23 "" _null_ _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
new file mode 100644
index 00000000000..129aa99f293
--- /dev/null
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -0,0 +1,78 @@
+/* -------------------------------------------------------------------------
+ *
+ * pg_subscription_rel.h
+ * Local info about tables that come from the publisher of a
+ * subscription (pg_subscription_rel).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * -------------------------------------------------------------------------
+ */
+#ifndef PG_SUBSCRIPTION_REL_H
+#define PG_SUBSCRIPTION_REL_H
+
+#include "catalog/genbki.h"
+
+/* ----------------
+ * pg_subscription_rel definition. cpp turns this into
+ * typedef struct FormData_pg_subscription_rel
+ * ----------------
+ */
+#define SubscriptionRelRelationId 6102
+
+/* Workaround for genbki not knowing about XLogRecPtr */
+#define pg_lsn XLogRecPtr
+
+CATALOG(pg_subscription_rel,6102) BKI_WITHOUT_OIDS
+{
+ Oid srsubid; /* Oid of subscription */
+ Oid srrelid; /* Oid of relation */
+ char srsubstate; /* state of the relation in subscription */
+ pg_lsn srsublsn; /* remote lsn of the state change
+ * used for synchronization coordination */
+} FormData_pg_subscription_rel;
+
+typedef FormData_pg_subscription_rel *Form_pg_subscription_rel;
+
+/* ----------------
+ * compiler constants for pg_subscription_rel
+ * ----------------
+ */
+#define Natts_pg_subscription_rel 4
+#define Anum_pg_subscription_rel_srsubid 1
+#define Anum_pg_subscription_rel_srrelid 2
+#define Anum_pg_subscription_rel_srsubstate 3
+#define Anum_pg_subscription_rel_srsublsn 4
+
+/* ----------------
+ * substate constants
+ * ----------------
+ */
+#define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */
+#define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn NULL) */
+#define SUBREL_STATE_SYNCDONE 's' /* synchronization finished infront of apply (sublsn set) */
+#define SUBREL_STATE_READY 'r' /* ready (sublsn set) */
+
+/* These are never stored in the catalog, we only use them for IPC. */
+#define SUBREL_STATE_UNKNOWN '\0' /* unknown state */
+#define SUBREL_STATE_SYNCWAIT 'w' /* waiting for sync */
+#define SUBREL_STATE_CATCHUP 'c' /* catching up with apply */
+
+typedef struct SubscriptionRelState
+{
+ Oid relid;
+ XLogRecPtr lsn;
+ char state;
+} SubscriptionRelState;
+
+extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
+extern char GetSubscriptionRelState(Oid subid, Oid relid,
+ XLogRecPtr *sublsn, bool missing_ok);
+extern void RemoveSubscriptionRel(Oid subid, Oid relid);
+
+extern List *GetSubscriptionRelations(Oid subid);
+extern List *GetSubscriptionNotReadyRelations(Oid subid);
+
+#endif /* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index d63ca0f5e99..f081f2219f2 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -21,6 +21,7 @@
/* CopyStateData is private in commands/copy.c */
typedef struct CopyStateData *CopyState;
+typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -28,7 +29,7 @@ extern void DoCopy(ParseState *state, const CopyStmt *stmt,
extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
- bool is_program, List *attnamelist, List *options);
+ bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
extern void EndCopyFrom(CopyState cstate);
extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
Datum *values, bool *nulls, Oid *tupleOid);
@@ -36,6 +37,8 @@ extern bool NextCopyFromRawFields(CopyState cstate,
char ***fields, int *nfields);
extern void CopyFromErrorCallback(void *arg);
+extern uint64 CopyFrom(CopyState cstate);
+
extern DestReceiver *CreateCopyDestReceiver(void);
#endif /* COPY_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 2cbd6d77b8d..9a4221a9e7b 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -488,6 +488,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_SQLCmd,
/*
* TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index a15df229a49..582e0e0ebe9 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3319,10 +3319,23 @@ typedef struct CreateSubscriptionStmt
List *options; /* List of DefElem nodes */
} CreateSubscriptionStmt;
+typedef enum AlterSubscriptionType
+{
+ ALTER_SUBSCRIPTION_OPTIONS,
+ ALTER_SUBSCRIPTION_CONNECTION,
+ ALTER_SUBSCRIPTION_PUBLICATION,
+ ALTER_SUBSCRIPTION_PUBLICATION_REFRESH,
+ ALTER_SUBSCRIPTION_REFRESH,
+ ALTER_SUBSCRIPTION_ENABLED
+} AlterSubscriptionType;
+
typedef struct AlterSubscriptionStmt
{
NodeTag type;
+ AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
char *subname; /* Name of of the subscription */
+ char *conninfo; /* Connection string to publisher */
+ List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
} AlterSubscriptionStmt;
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 996da3c02ea..92ada41b6d5 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -96,4 +96,13 @@ typedef struct TimeLineHistoryCmd
TimeLineID timeline;
} TimeLineHistoryCmd;
+/* ----------------------
+ * SQL commands
+ * ----------------------
+ */
+typedef struct SQLCmd
+{
+ NodeTag type;
+} SQLCmd;
+
#endif /* REPLNODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 28c4dab2586..6cd36c7fe30 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -258,6 +258,7 @@ PG_KEYWORD("new", NEW, UNRESERVED_KEYWORD)
PG_KEYWORD("next", NEXT, UNRESERVED_KEYWORD)
PG_KEYWORD("no", NO, UNRESERVED_KEYWORD)
PG_KEYWORD("none", NONE, COL_NAME_KEYWORD)
+PG_KEYWORD("norefresh", NOREFRESH, UNRESERVED_KEYWORD)
PG_KEYWORD("not", NOT, RESERVED_KEYWORD)
PG_KEYWORD("nothing", NOTHING, UNRESERVED_KEYWORD)
PG_KEYWORD("notify", NOTIFY, UNRESERVED_KEYWORD)
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f2daf32e1ab..a6752429711 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -790,7 +790,9 @@ typedef enum
WAIT_EVENT_PARALLEL_FINISH,
WAIT_EVENT_PARALLEL_BITMAP_SCAN,
WAIT_EVENT_SAFE_SNAPSHOT,
- WAIT_EVENT_SYNC_REP
+ WAIT_EVENT_SYNC_REP,
+ WAIT_EVENT_LOGICAL_SYNC_DATA,
+ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
} WaitEventIPC;
/* ----------
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index fd34964bad3..d10dd2c90af 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -31,9 +31,11 @@ typedef struct LogicalDecodingContext
/* memory context this is all allocated in */
MemoryContext context;
- /* infrastructure pieces */
- XLogReaderState *reader;
+ /* The associated replication slot */
ReplicationSlot *slot;
+
+ /* infrastructure pieces for decoding */
+ XLogReaderState *reader;
struct ReorderBuffer *reorder;
struct SnapBuild *snapshot_builder;
@@ -75,6 +77,7 @@ typedef struct LogicalDecodingContext
TransactionId write_xid;
} LogicalDecodingContext;
+
extern void CheckLogicalDecodingRequirements(void);
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
@@ -92,6 +95,12 @@ extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
+extern LogicalDecodingContext *CreateCopyDecodingContext(
+ List *output_plugin_options,
+ LogicalOutputPluginWriterPrepareWrite prepare_write,
+ LogicalOutputPluginWriterWrite do_write);
+extern List *DecodingContextGetTableList(LogicalDecodingContext *ctx);
+
extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index cfe3db10dd0..060946a0964 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -13,6 +13,7 @@
#define LOGICALLAUNCHER_H
extern int max_logical_replication_workers;
+extern int max_sync_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 5e824ae6fc8..091a9f91e36 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -59,6 +59,7 @@ extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap);
+extern Snapshot SnapBuildInitalSnapshot(SnapBuild *builder);
extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
extern void SnapBuildClearExportedSnapshot(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 78e577c89b1..fb55c30fa19 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -15,9 +15,12 @@
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "fmgr.h"
+#include "replication/logicalproto.h"
+#include "replication/walsender.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
+#include "utils/tuplestore.h"
/* user-settable parameters */
extern int wal_receiver_status_interval;
@@ -160,6 +163,33 @@ typedef struct
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
+/*
+ * Status of walreceiver query execution.
+ *
+ * We only define statuses that are currently used.
+ */
+typedef enum
+{
+ WALRCV_ERROR, /* There was error when executing the query. */
+ WALRCV_OK_COMMAND, /* Query executed utility or replication command. */
+ WALRCV_OK_TUPLES, /* Query returned tuples. */
+ WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
+ WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
+ WALRCV_OK_COPY_BOTH, /* Query started COPY BOTH replication protocol. */
+} WalRcvExecStatus;
+
+/*
+ * Return value for walrcv_query, returns the status of the execution and
+ * tuples if any.
+ */
+typedef struct WalRcvExecResult
+{
+ WalRcvExecStatus status;
+ char *err;
+ Tuplestorestate *tuplestore;
+ TupleDesc tupledesc;
+} WalRcvExecResult;
+
/* libpqwalreceiver hooks */
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
const char *appname,
@@ -183,9 +213,12 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
int nbytes);
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname, bool temporary,
- bool export_snapshot, XLogRecPtr *lsn);
-typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
- char **err);
+ CRSSnapshotAction snapshot_action,
+ XLogRecPtr *lsn);
+typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
+ const char *query,
+ const int nRetTypes,
+ const Oid *retTypes);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
typedef struct WalReceiverFunctionsType
@@ -200,7 +233,7 @@ typedef struct WalReceiverFunctionsType
walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send;
walrcv_create_slot_fn walrcv_create_slot;
- walrcv_command_fn walrcv_command;
+ walrcv_exec_fn walrcv_exec;
walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;
@@ -224,13 +257,31 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
- WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
-#define walrcv_command(conn, cmd, err) \
- WalReceiverFunctions->walrcv_command(conn, cmd, err)
+#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
+#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
+ WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn) \
WalReceiverFunctions->walrcv_disconnect(conn)
+static inline void
+walrcv_clear_result(WalRcvExecResult *walres)
+{
+ if (!walres)
+ return;
+
+ if (walres->err)
+ pfree(walres->err);
+
+ if (walres->tuplestore)
+ tuplestore_end(walres->tuplestore);
+
+ if (walres->tupledesc)
+ FreeTupleDesc(walres->tupledesc);
+
+ pfree(walres);
+}
+
/* prototypes for functions in walreceiver.c */
extern void WalReceiverMain(void) pg_attribute_noreturn();
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index fe23f6619fa..2ca903872e4 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -16,6 +16,16 @@
#include "fmgr.h"
+/*
+ * What to do with a snapshot in create replication slot command.
+ */
+typedef enum
+{
+ CRS_EXPORT_SNAPSHOT,
+ CRS_NOEXPORT_SNAPSHOT,
+ CRS_USE_SNAPSHOT
+} CRSSnapshotAction;
+
/* global state */
extern bool am_walsender;
extern bool am_cascading_walsender;
@@ -28,7 +38,7 @@ extern int wal_sender_timeout;
extern bool log_replication_commands;
extern void InitWalSender(void);
-extern void exec_replication_command(const char *query_string);
+extern bool exec_replication_command(const char *query_string);
extern void WalSndErrorCleanup(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 8cbf2687a9c..bf96d340caa 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -33,6 +33,9 @@ typedef struct LogicalRepWorker
/* Used for initial table synchronization. */
Oid relid;
+ char relstate;
+ XLogRecPtr relstate_lsn;
+ slock_t relmutex;
/* Stats. */
XLogRecPtr last_lsn;
@@ -42,6 +45,9 @@ typedef struct LogicalRepWorker
TimestampTz reply_time;
} LogicalRepWorker;
+/* Memory context for cached variables in apply worker. */
+MemoryContext ApplyCacheContext;
+
/* libpqreceiver connection */
extern struct WalReceiverConn *wrconn;
@@ -53,12 +59,26 @@ extern bool in_remote_transaction;
extern bool got_SIGTERM;
extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid);
-extern int logicalrep_worker_count(Oid subid);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid);
-extern void logicalrep_worker_stop(Oid subid);
-extern void logicalrep_worker_wakeup(Oid subid);
+extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+ bool only_running);
+extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+ Oid userid, Oid relid);
+extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+
+extern int logicalrep_sync_worker_count(Oid subid);
extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
+extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+void process_syncing_tables(XLogRecPtr current_lsn);
+void invalidate_syncing_table_states(Datum arg, int cacheid,
+ uint32 hashvalue);
+
+static inline bool
+am_tablesync_worker(void)
+{
+ return OidIsValid(MyLogicalRepWorker->relid);
+}
#endif /* WORKER_INTERNAL_H */
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index 66f60d271e2..b35faf81b9e 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -89,6 +89,7 @@ enum SysCacheIdentifier
STATRELATTINH,
SUBSCRIPTIONOID,
SUBSCRIPTIONNAME,
+ SUBSCRIPTIONRELMAP,
TABLESPACEOID,
TRFOID,
TRFTYPELANG,