summaryrefslogtreecommitdiff
path: root/src/include/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/logical.h13
-rw-r--r--src/include/replication/logicallauncher.h1
-rw-r--r--src/include/replication/snapbuild.h1
-rw-r--r--src/include/replication/walreceiver.h67
-rw-r--r--src/include/replication/walsender.h12
-rw-r--r--src/include/replication/worker_internal.h30
6 files changed, 108 insertions, 16 deletions
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index fd34964bad3..d10dd2c90af 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -31,9 +31,11 @@ typedef struct LogicalDecodingContext
/* memory context this is all allocated in */
MemoryContext context;
- /* infrastructure pieces */
- XLogReaderState *reader;
+ /* The associated replication slot */
ReplicationSlot *slot;
+
+ /* infrastructure pieces for decoding */
+ XLogReaderState *reader;
struct ReorderBuffer *reorder;
struct SnapBuild *snapshot_builder;
@@ -75,6 +77,7 @@ typedef struct LogicalDecodingContext
TransactionId write_xid;
} LogicalDecodingContext;
+
extern void CheckLogicalDecodingRequirements(void);
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
@@ -92,6 +95,12 @@ extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
+extern LogicalDecodingContext *CreateCopyDecodingContext(
+ List *output_plugin_options,
+ LogicalOutputPluginWriterPrepareWrite prepare_write,
+ LogicalOutputPluginWriterWrite do_write);
+extern List *DecodingContextGetTableList(LogicalDecodingContext *ctx);
+
extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index cfe3db10dd0..060946a0964 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -13,6 +13,7 @@
#define LOGICALLAUNCHER_H
extern int max_logical_replication_workers;
+extern int max_sync_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 5e824ae6fc8..091a9f91e36 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -59,6 +59,7 @@ extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap);
+extern Snapshot SnapBuildInitalSnapshot(SnapBuild *builder);
extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
extern void SnapBuildClearExportedSnapshot(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 78e577c89b1..fb55c30fa19 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -15,9 +15,12 @@
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "fmgr.h"
+#include "replication/logicalproto.h"
+#include "replication/walsender.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
+#include "utils/tuplestore.h"
/* user-settable parameters */
extern int wal_receiver_status_interval;
@@ -160,6 +163,33 @@ typedef struct
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
+/*
+ * Status of walreceiver query execution.
+ *
+ * We only define statuses that are currently used.
+ */
+typedef enum
+{
+ WALRCV_ERROR, /* There was error when executing the query. */
+ WALRCV_OK_COMMAND, /* Query executed utility or replication command. */
+ WALRCV_OK_TUPLES, /* Query returned tuples. */
+ WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
+ WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
+ WALRCV_OK_COPY_BOTH, /* Query started COPY BOTH replication protocol. */
+} WalRcvExecStatus;
+
+/*
+ * Return value for walrcv_query, returns the status of the execution and
+ * tuples if any.
+ */
+typedef struct WalRcvExecResult
+{
+ WalRcvExecStatus status;
+ char *err;
+ Tuplestorestate *tuplestore;
+ TupleDesc tupledesc;
+} WalRcvExecResult;
+
/* libpqwalreceiver hooks */
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
const char *appname,
@@ -183,9 +213,12 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
int nbytes);
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
const char *slotname, bool temporary,
- bool export_snapshot, XLogRecPtr *lsn);
-typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
- char **err);
+ CRSSnapshotAction snapshot_action,
+ XLogRecPtr *lsn);
+typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
+ const char *query,
+ const int nRetTypes,
+ const Oid *retTypes);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
typedef struct WalReceiverFunctionsType
@@ -200,7 +233,7 @@ typedef struct WalReceiverFunctionsType
walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send;
walrcv_create_slot_fn walrcv_create_slot;
- walrcv_command_fn walrcv_command;
+ walrcv_exec_fn walrcv_exec;
walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;
@@ -224,13 +257,31 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \
- WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn)
-#define walrcv_command(conn, cmd, err) \
- WalReceiverFunctions->walrcv_command(conn, cmd, err)
+#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
+#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
+ WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn) \
WalReceiverFunctions->walrcv_disconnect(conn)
+static inline void
+walrcv_clear_result(WalRcvExecResult *walres)
+{
+ if (!walres)
+ return;
+
+ if (walres->err)
+ pfree(walres->err);
+
+ if (walres->tuplestore)
+ tuplestore_end(walres->tuplestore);
+
+ if (walres->tupledesc)
+ FreeTupleDesc(walres->tupledesc);
+
+ pfree(walres);
+}
+
/* prototypes for functions in walreceiver.c */
extern void WalReceiverMain(void) pg_attribute_noreturn();
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index fe23f6619fa..2ca903872e4 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -16,6 +16,16 @@
#include "fmgr.h"
+/*
+ * What to do with a snapshot in create replication slot command.
+ */
+typedef enum
+{
+ CRS_EXPORT_SNAPSHOT,
+ CRS_NOEXPORT_SNAPSHOT,
+ CRS_USE_SNAPSHOT
+} CRSSnapshotAction;
+
/* global state */
extern bool am_walsender;
extern bool am_cascading_walsender;
@@ -28,7 +38,7 @@ extern int wal_sender_timeout;
extern bool log_replication_commands;
extern void InitWalSender(void);
-extern void exec_replication_command(const char *query_string);
+extern bool exec_replication_command(const char *query_string);
extern void WalSndErrorCleanup(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 8cbf2687a9c..bf96d340caa 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -33,6 +33,9 @@ typedef struct LogicalRepWorker
/* Used for initial table synchronization. */
Oid relid;
+ char relstate;
+ XLogRecPtr relstate_lsn;
+ slock_t relmutex;
/* Stats. */
XLogRecPtr last_lsn;
@@ -42,6 +45,9 @@ typedef struct LogicalRepWorker
TimestampTz reply_time;
} LogicalRepWorker;
+/* Memory context for cached variables in apply worker. */
+MemoryContext ApplyCacheContext;
+
/* libpqreceiver connection */
extern struct WalReceiverConn *wrconn;
@@ -53,12 +59,26 @@ extern bool in_remote_transaction;
extern bool got_SIGTERM;
extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid);
-extern int logicalrep_worker_count(Oid subid);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid);
-extern void logicalrep_worker_stop(Oid subid);
-extern void logicalrep_worker_wakeup(Oid subid);
+extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+ bool only_running);
+extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+ Oid userid, Oid relid);
+extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+
+extern int logicalrep_sync_worker_count(Oid subid);
extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
+extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+void process_syncing_tables(XLogRecPtr current_lsn);
+void invalidate_syncing_table_states(Datum arg, int cacheid,
+ uint32 hashvalue);
+
+static inline bool
+am_tablesync_worker(void)
+{
+ return OidIsValid(MyLogicalRepWorker->relid);
+}
#endif /* WORKER_INTERNAL_H */