summaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
authorPeter Eisentraut2017-03-23 12:36:36 +0000
committerPeter Eisentraut2017-03-23 12:55:37 +0000
commit7c4f52409a8c7d85ed169bbbc1f6092274d03920 (patch)
treefa3dc592bb2855e5cc0a200f4c408b4c8d299be5 /src/include
parent707576b571f05ec5b89adb65964d55f3ccccbd1b (diff)
Logical replication support for initial data copy
Add functionality for a new subscription to copy the initial data in the tables and then sync with the ongoing apply process. For the copying, add a new internal COPY option to have the COPY source data provided by a callback function. The initial data copy works on the subscriber by receiving COPY data from the publisher and then providing it locally into a COPY that writes to the destination table. A WAL receiver can now execute full SQL commands. This is used here to obtain information about tables and publications. Several new options were added to CREATE and ALTER SUBSCRIPTION to control whether and when initial table syncing happens. Change pg_dump option --no-create-subscription-slots to --no-subscription-connect and use the new CREATE SUBSCRIPTION ... NOCONNECT option for that. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com> Tested-by: Erik Rijkers <er@xs4all.nl>
Diffstat (limited to 'src/include')
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/indexing.h7
-rw-r--r--src/include/catalog/pg_proc.h5
-rw-r--r--src/include/catalog/pg_subscription_rel.h78
-rw-r--r--src/include/commands/copy.h5
-rw-r--r--src/include/nodes/nodes.h1
-rw-r--r--src/include/nodes/parsenodes.h13
-rw-r--r--src/include/nodes/replnodes.h9
-rw-r--r--src/include/parser/kwlist.h1
-rw-r--r--src/include/pgstat.h4
-rw-r--r--src/include/replication/logical.h13
-rw-r--r--src/include/replication/logicallauncher.h1
-rw-r--r--src/include/replication/snapbuild.h1
-rw-r--r--src/include/replication/walreceiver.h67
-rw-r--r--src/include/replication/walsender.h12
-rw-r--r--src/include/replication/worker_internal.h30
-rw-r--r--src/include/utils/syscache.h1
17 files changed, 228 insertions, 22 deletions
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 315f155b645..d8679f5f591 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201703221
+#define CATALOG_VERSION_NO 201703231
#endif
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 6bce7328a28..5d4190c05eb 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -340,8 +340,8 @@ DECLARE_UNIQUE_INDEX(pg_publication_pubname_index, 6111, on pg_publication using
DECLARE_UNIQUE_INDEX(pg_publication_rel_oid_index, 6112, on pg_publication_rel using btree(oid oid_ops));
#define PublicationRelObjectIndexId 6112
-DECLARE_UNIQUE_INDEX(pg_publication_rel_map_index, 6113, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
-#define PublicationRelMapIndexId 6113
+DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
+#define PublicationRelPrrelidPrpubidIndexId 6113
DECLARE_UNIQUE_INDEX(pg_subscription_oid_index, 6114, on pg_subscription using btree(oid oid_ops));
#define SubscriptionObjectIndexId 6114
@@ -349,6 +349,9 @@ DECLARE_UNIQUE_INDEX(pg_subscription_oid_index, 6114, on pg_subscription using b
DECLARE_UNIQUE_INDEX(pg_subscription_subname_index, 6115, on pg_subscription using btree(subdbid oid_ops, subname name_ops));
#define SubscriptionNameIndexId 6115
+DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops));
+#define SubscriptionRelSrrelidSrsubidIndexId 6117
+
/* last step of initialization script: build the indexes declared above */
BUILD_INDICES
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 22635655f56..78c23e3f5d5 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2021,6 +2021,9 @@ DESCR("is a relation insertable/updatable/deletable");
DATA(insert OID = 3843 ( pg_column_is_updatable PGNSP PGUID 12 10 0 0 0 f f f f t f s s 3 0 16 "2205 21 16" _null_ _null_ _null_ _null_ _null_ pg_column_is_updatable _null_ _null_ _null_ ));
DESCR("is a column updatable");
+DATA(insert OID = 6120 ( pg_get_replica_identity_index PGNSP PGUID 12 10 0 0 0 f f f f t f s s 1 0 2205 "2205" _null_ _null_ _null_ _null_ _null_ pg_get_replica_identity_index _null_ _null_ _null_ ));
+DESCR("oid of replica identity index if any");
+
/* Deferrable unique constraint trigger */
DATA(insert OID = 1250 ( unique_key_recheck PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 2279 "" _null_ _null_ _null_ _null_ _null_ unique_key_recheck _null_ _null_ _null_ ));
DESCR("deferred UNIQUE constraint check");
@@ -2805,7 +2808,7 @@ DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f
DESCR("statistics: information about currently active replication");
DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
DESCR("statistics: information about WAL receiver");
-DATA(insert OID = 6118 ( pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f f s r 1 0 2249 "26" "{26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o}" "{subid,subid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ ));
+DATA(insert OID = 6118 ( pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f f s r 1 0 2249 "26" "{26,26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o,o}" "{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ ));
DESCR("statistics: information about subscription");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 0 f f f f t f s r 0 0 23 "" _null_ _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
new file mode 100644
index 00000000000..129aa99f293
--- /dev/null
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -0,0 +1,78 @@
+/* -------------------------------------------------------------------------
+ *
+ * pg_subscription_rel.h
+ * Local info about tables that come from the publisher of a
+ * subscription (pg_subscription_rel).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * -------------------------------------------------------------------------
+ */
+#ifndef PG_SUBSCRIPTION_REL_H
+#define PG_SUBSCRIPTION_REL_H
+
+#include "catalog/genbki.h"
+
+/* ----------------
+ * pg_subscription_rel definition. cpp turns this into
+ * typedef struct FormData_pg_subscription_rel
+ * ----------------
+ */
+#define SubscriptionRelRelationId 6102
+
+/* Workaround for genbki not knowing about XLogRecPtr */
+#define pg_lsn XLogRecPtr
+
+CATALOG(pg_subscription_rel,6102) BKI_WITHOUT_OIDS
+{
+ Oid srsubid; /* Oid of subscription */
+ Oid srrelid; /* Oid of relation */
+ char srsubstate; /* state of the relation in subscription */
+ pg_lsn srsublsn; /* remote lsn of the state change
+ * used for synchronization coordination */
+} FormData_pg_subscription_rel;
+
+typedef FormData_pg_subscription_rel *Form_pg_subscription_rel;
+
+/* ----------------
+ * compiler constants for pg_subscription_rel
+ * ----------------
+ */
+#define Natts_pg_subscription_rel 4
+#define Anum_pg_subscription_rel_srsubid 1
+#define Anum_pg_subscription_rel_srrelid 2
+#define Anum_pg_subscription_rel_srsubstate 3
+#define Anum_pg_subscription_rel_srsublsn 4
+
+/* ----------------
+ * substate constants
+ * ----------------
+ */
+#define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */
+#define SUBREL_STATE_DATASYNC 'd' /* data is being synchronized (sublsn NULL) */
+#define SUBREL_STATE_SYNCDONE 's' /* synchronization finished infront of apply (sublsn set) */
+#define SUBREL_STATE_READY 'r' /* ready (sublsn set) */
+
+/* These are never stored in the catalog, we only use them for IPC. */
+#define SUBREL_STATE_UNKNOWN '\0' /* unknown state */
+#define SUBREL_STATE_SYNCWAIT 'w' /* waiting for sync */
+#define SUBREL_STATE_CATCHUP 'c' /* catching up with apply */
+
+typedef struct SubscriptionRelState
+{
+ Oid relid;
+ XLogRecPtr lsn;
+ char state;
+} SubscriptionRelState;
+
+extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn);
+extern char GetSubscriptionRelState(Oid subid, Oid relid,
+ XLogRecPtr *sublsn, bool missing_ok);
+extern void RemoveSubscriptionRel(Oid subid, Oid relid);
+
+extern List *GetSubscriptionRelations(Oid subid);
+extern List *GetSubscriptionNotReadyRelations(Oid subid);
+
+#endif /* PG_SUBSCRIPTION_REL_H */
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index d63ca0f5e99..f081f2219f2 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -21,6 +21,7 @@
/* CopyStateData is private in commands/copy.c */
typedef struct CopyStateData *CopyState;
+typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -28,7 +29,7 @@ extern void DoCopy(ParseState *state, const CopyStmt *stmt,
extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
- bool is_program, List *attnamelist, List *options);
+ bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
extern void EndCopyFrom(CopyState cstate);
extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
Datum *values, bool *nulls, Oid *tupleOid);
@@ -36,6 +37,8 @@ extern bool NextCopyFromRawFields(CopyState cstate,
char ***fields, int *nfields);
extern void CopyFromErrorCallback(void *arg);
+extern uint64 CopyFrom(CopyState cstate);
+
extern DestReceiver *CreateCopyDestReceiver(void);
#endif /* COPY_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 2cbd6d77b8d..9a4221a9e7b 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -488,6 +488,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_SQLCmd,
/*
* TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index a15df229a49..582e0e0ebe9 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3319,10 +3319,23 @@ typedef struct CreateSubscriptionStmt
List *options; /* List of DefElem nodes */
} CreateSubscriptionStmt;
+typedef enum AlterSubscriptionType
+{
+ ALTER_SUBSCRIPTION_OPTIONS,
+ ALTER_SUBSCRIPTION_CONNECTION,
+ ALTER_SUBSCRIPTION_PUBLICATION,
+ ALTER_SUBSCRIPTION_PUBLICATION_REFRESH,
+ ALTER_SUBSCRIPTION_REFRESH,
+ ALTER_SUBSCRIPTION_ENABLED
+} AlterSubscriptionType;
+
typedef struct AlterSubscriptionStmt
{
NodeTag type;
+ AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
char *subname; /* Name of of the subscription */
+ char *conninfo; /* Connection string to publisher */
+ List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
} AlterSubscriptionStmt;
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 996da3c02ea..92ada41b6d5 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -96,4 +96,13 @@ typedef struct TimeLineHistoryCmd
TimeLineID timeline;
} TimeLineHistoryCmd;
+/* ----------------------
+ * SQL commands
+ * ----------------------
+ */
+typedef struct SQLCmd
+{
+ NodeTag type;
+} SQLCmd;
+
#endif /* REPLNODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 28c4dab2586..6cd36c7fe30 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -258,6 +258,7 @@ PG_KEYWORD("new", NEW, UNRESERVED_KEYWORD)
PG_KEYWORD("next", NEXT, UNRESERVED_KEYWORD)
PG_KEYWORD("no", NO, UNRESERVED_KEYWORD)
PG_KEYWORD("none", NONE, COL_NAME_KEYWORD)
+PG_KEYWORD("norefresh", NOREFRESH, UNRESERVED_KEYWORD)
PG_KEYWORD("not", NOT, RESERVED_KEYWORD)
PG_KEYWORD("nothing", NOTHING, UNRESERVED_KEYWORD)
PG_KEYWORD("notify", NOTIFY, UNRESERVED_KEYWORD)
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f2daf32e1ab..a6752429711 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -790,7 +790,9 @@ typedef enum
WAIT_EVENT_PARALLEL_FINISH,
WAIT_EVENT_PARALLEL_BITMAP_SCAN,
WAIT_EVENT_SAFE_SNAPSHOT,
- WAIT_EVENT_SYNC_REP
+ WAIT_EVENT_SYNC_REP,
+ WAIT_EVENT_LOGICAL_SYNC_DATA,
+ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
} WaitEventIPC;
/* ----------
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index fd34964bad3..d10dd2c90af 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -31,9 +31,11 @@ typedef struct LogicalDecodingContext
/* memory context this is all allocated in */
MemoryContext context;
- /* infrastructure pieces */
- XLogReaderState *reader;
+ /* The associated replication slot */
ReplicationSlot *slot;
+
+ /* infrastructure pieces for decoding */
+ XLogReaderState *reader;
struct ReorderBuffer *reorder;
struct SnapBuild *snapshot_builder;
@@ -75,6 +77,7 @@ typedef struct LogicalDecodingContext
TransactionId write_xid;
} LogicalDecodingContext;
+
extern void CheckLogicalDecodingRequirements(void);
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
@@ -92,6 +95,12 @@ extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
+extern LogicalDecodingContext *CreateCopyDecodingContext(
+ List *output_plugin_options,
+ LogicalOutputPluginWriterPrepareWrite prepare_write,
+ LogicalOutputPluginWriterWrite do_write);
+extern List *DecodingContextGetTableList(LogicalDecodingContext *ctx);
+
extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index cfe3db10dd0..060946a0964 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -13,6 +13,7 @@
#define LOGICALLAUNCHER_H
extern int max_logical_replication_workers;
+extern int max_sync_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 5e824ae6fc8..091a9f91e36 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -59,6 +59,7 @@ extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap);
+extern Snapshot SnapBuildInitalSnapshot(SnapBuild *builder);
extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
extern void SnapBuildClearExportedSnapshot(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 78e577c89b1..fb55c30fa19 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -15,9 +15,12 @@
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "fmgr.h"
+#include "replication/logicalproto.h"
+#include "replication/walsender.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
+#include "utils/tuplestore.h"
/* user-settable parameters */
extern int wal_receiver_status_interval;
@@ -160,6 +163,33 @@ typedef struct
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
+/*
+ * Status of walreceiver query execution.
+ *
+ * We only define statuses that are currently used.
+ */
+typedef enum
+{
+ WALRCV_ERROR, /* There was error when executing the query. */
+ WALRCV_OK_COMMAND, /* Query executed utility or replication command. */
+ WALRCV_OK_TUPLES, /* Query returned tuples. */
+ WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
+ WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
+ WALRCV_OK_COPY_BOTH, /* Query started COPY BOTH replication protocol. */
+} WalRcvExecStatus;
+
+/*
+ * Return value for walrcv_query, returns the status of the execution and
+ * tuples if any.
+ */
+typedef struct WalRcvExecResult
+{
+ WalRcvExecStatus status;
+ char *err;
+ Tuplestorestate *tuplestore;
+ TupleDesc tupledesc;
+} WalRcvExecResult;
+
/* libpqwalreceiver hooks */
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
const char *appname,
@@ -183,9 +213,12 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
int nbytes);
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname, bool temporary,
- bool export_snapshot, XLogRecPtr *lsn);
-typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
- char **err);
+ CRSSnapshotAction snapshot_action,
+ XLogRecPtr *lsn);
+typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
+ const char *query,
+ const int nRetTypes,
+ const Oid *retTypes);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
typedef struct WalReceiverFunctionsType
@@ -200,7 +233,7 @@ typedef struct WalReceiverFunctionsType
walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send;
walrcv_create_slot_fn walrcv_create_slot;
- walrcv_command_fn walrcv_command;
+ walrcv_exec_fn walrcv_exec;
walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;
@@ -224,13 +257,31 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
- WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
-#define walrcv_command(conn, cmd, err) \
- WalReceiverFunctions->walrcv_command(conn, cmd, err)
+#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
+#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
+ WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn) \
WalReceiverFunctions->walrcv_disconnect(conn)
+static inline void
+walrcv_clear_result(WalRcvExecResult *walres)
+{
+ if (!walres)
+ return;
+
+ if (walres->err)
+ pfree(walres->err);
+
+ if (walres->tuplestore)
+ tuplestore_end(walres->tuplestore);
+
+ if (walres->tupledesc)
+ FreeTupleDesc(walres->tupledesc);
+
+ pfree(walres);
+}
+
/* prototypes for functions in walreceiver.c */
extern void WalReceiverMain(void) pg_attribute_noreturn();
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index fe23f6619fa..2ca903872e4 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -16,6 +16,16 @@
#include "fmgr.h"
+/*
+ * What to do with a snapshot in create replication slot command.
+ */
+typedef enum
+{
+ CRS_EXPORT_SNAPSHOT,
+ CRS_NOEXPORT_SNAPSHOT,
+ CRS_USE_SNAPSHOT
+} CRSSnapshotAction;
+
/* global state */
extern bool am_walsender;
extern bool am_cascading_walsender;
@@ -28,7 +38,7 @@ extern int wal_sender_timeout;
extern bool log_replication_commands;
extern void InitWalSender(void);
-extern void exec_replication_command(const char *query_string);
+extern bool exec_replication_command(const char *query_string);
extern void WalSndErrorCleanup(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 8cbf2687a9c..bf96d340caa 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -33,6 +33,9 @@ typedef struct LogicalRepWorker
/* Used for initial table synchronization. */
Oid relid;
+ char relstate;
+ XLogRecPtr relstate_lsn;
+ slock_t relmutex;
/* Stats. */
XLogRecPtr last_lsn;
@@ -42,6 +45,9 @@ typedef struct LogicalRepWorker
TimestampTz reply_time;
} LogicalRepWorker;
+/* Memory context for cached variables in apply worker. */
+MemoryContext ApplyCacheContext;
+
/* libpqreceiver connection */
extern struct WalReceiverConn *wrconn;
@@ -53,12 +59,26 @@ extern bool in_remote_transaction;
extern bool got_SIGTERM;
extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid);
-extern int logicalrep_worker_count(Oid subid);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid);
-extern void logicalrep_worker_stop(Oid subid);
-extern void logicalrep_worker_wakeup(Oid subid);
+extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+ bool only_running);
+extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+ Oid userid, Oid relid);
+extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+
+extern int logicalrep_sync_worker_count(Oid subid);
extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
+extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+void process_syncing_tables(XLogRecPtr current_lsn);
+void invalidate_syncing_table_states(Datum arg, int cacheid,
+ uint32 hashvalue);
+
+static inline bool
+am_tablesync_worker(void)
+{
+ return OidIsValid(MyLogicalRepWorker->relid);
+}
#endif /* WORKER_INTERNAL_H */
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index 66f60d271e2..b35faf81b9e 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -89,6 +89,7 @@ enum SysCacheIdentifier
STATRELATTINH,
SUBSCRIPTIONOID,
SUBSCRIPTIONNAME,
+ SUBSCRIPTIONRELMAP,
TABLESPACEOID,
TRFOID,
TRFTYPELANG,