diff options
Diffstat (limited to 'src/include/replication')
| -rw-r--r-- | src/include/replication/logical.h | 13 | ||||
| -rw-r--r-- | src/include/replication/logicallauncher.h | 1 | ||||
| -rw-r--r-- | src/include/replication/snapbuild.h | 1 | ||||
| -rw-r--r-- | src/include/replication/walreceiver.h | 67 | ||||
| -rw-r--r-- | src/include/replication/walsender.h | 12 | ||||
| -rw-r--r-- | src/include/replication/worker_internal.h | 30 |
6 files changed, 108 insertions, 16 deletions
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index fd34964bad3..d10dd2c90af 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -31,9 +31,11 @@ typedef struct LogicalDecodingContext /* memory context this is all allocated in */ MemoryContext context; - /* infrastructure pieces */ - XLogReaderState *reader; + /* The associated replication slot */ ReplicationSlot *slot; + + /* infrastructure pieces for decoding */ + XLogReaderState *reader; struct ReorderBuffer *reorder; struct SnapBuild *snapshot_builder; @@ -75,6 +77,7 @@ typedef struct LogicalDecodingContext TransactionId write_xid; } LogicalDecodingContext; + extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, @@ -92,6 +95,12 @@ extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); +extern LogicalDecodingContext *CreateCopyDecodingContext( + List *output_plugin_options, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write); +extern List *DecodingContextGetTableList(LogicalDecodingContext *ctx); + extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index cfe3db10dd0..060946a0964 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -13,6 +13,7 @@ #define LOGICALLAUNCHER_H extern int max_logical_replication_workers; +extern int max_sync_workers_per_subscription; extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 5e824ae6fc8..091a9f91e36 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -59,6 +59,7 @@ extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap); +extern Snapshot SnapBuildInitalSnapshot(SnapBuild *builder); extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate); extern void SnapBuildClearExportedSnapshot(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 78e577c89b1..fb55c30fa19 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -15,9 +15,12 @@ #include "access/xlog.h" #include "access/xlogdefs.h" #include "fmgr.h" +#include "replication/logicalproto.h" +#include "replication/walsender.h" #include "storage/latch.h" #include "storage/spin.h" #include "pgtime.h" +#include "utils/tuplestore.h" /* user-settable parameters */ extern int wal_receiver_status_interval; @@ -160,6 +163,33 @@ typedef struct struct WalReceiverConn; typedef struct WalReceiverConn WalReceiverConn; +/* + * Status of walreceiver query execution. + * + * We only define statuses that are currently used. + */ +typedef enum +{ + WALRCV_ERROR, /* There was error when executing the query. */ + WALRCV_OK_COMMAND, /* Query executed utility or replication command. */ + WALRCV_OK_TUPLES, /* Query returned tuples. */ + WALRCV_OK_COPY_IN, /* Query started COPY FROM. */ + WALRCV_OK_COPY_OUT, /* Query started COPY TO. */ + WALRCV_OK_COPY_BOTH, /* Query started COPY BOTH replication protocol. */ +} WalRcvExecStatus; + +/* + * Return value for walrcv_query, returns the status of the execution and + * tuples if any. + */ +typedef struct WalRcvExecResult +{ + WalRcvExecStatus status; + char *err; + Tuplestorestate *tuplestore; + TupleDesc tupledesc; +} WalRcvExecResult; + /* libpqwalreceiver hooks */ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical, const char *appname, @@ -183,9 +213,12 @@ typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer, int nbytes); typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, - bool export_snapshot, XLogRecPtr *lsn); -typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd, - char **err); + CRSSnapshotAction snapshot_action, + XLogRecPtr *lsn); +typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn, + const char *query, + const int nRetTypes, + const Oid *retTypes); typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); typedef struct WalReceiverFunctionsType @@ -200,7 +233,7 @@ typedef struct WalReceiverFunctionsType walrcv_receive_fn walrcv_receive; walrcv_send_fn walrcv_send; walrcv_create_slot_fn walrcv_create_slot; - walrcv_command_fn walrcv_command; + walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; } WalReceiverFunctionsType; @@ -224,13 +257,31 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, export_snapshot, lsn) -#define walrcv_command(conn, cmd, err) \ - WalReceiverFunctions->walrcv_command(conn, cmd, err) +#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn) +#define walrcv_exec(conn, exec, nRetTypes, retTypes) \ + WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes) #define walrcv_disconnect(conn) \ WalReceiverFunctions->walrcv_disconnect(conn) +static inline void +walrcv_clear_result(WalRcvExecResult *walres) +{ + if (!walres) + return; + + if (walres->err) + pfree(walres->err); + + if (walres->tuplestore) + tuplestore_end(walres->tuplestore); + + if (walres->tupledesc) + FreeTupleDesc(walres->tupledesc); + + pfree(walres); +} + /* prototypes for functions in walreceiver.c */ extern void WalReceiverMain(void) pg_attribute_noreturn(); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index fe23f6619fa..2ca903872e4 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -16,6 +16,16 @@ #include "fmgr.h" +/* + * What to do with a snapshot in create replication slot command. + */ +typedef enum +{ + CRS_EXPORT_SNAPSHOT, + CRS_NOEXPORT_SNAPSHOT, + CRS_USE_SNAPSHOT +} CRSSnapshotAction; + /* global state */ extern bool am_walsender; extern bool am_cascading_walsender; @@ -28,7 +38,7 @@ extern int wal_sender_timeout; extern bool log_replication_commands; extern void InitWalSender(void); -extern void exec_replication_command(const char *query_string); +extern bool exec_replication_command(const char *query_string); extern void WalSndErrorCleanup(void); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 8cbf2687a9c..bf96d340caa 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -33,6 +33,9 @@ typedef struct LogicalRepWorker /* Used for initial table synchronization. */ Oid relid; + char relstate; + XLogRecPtr relstate_lsn; + slock_t relmutex; /* Stats. */ XLogRecPtr last_lsn; @@ -42,6 +45,9 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; +/* Memory context for cached variables in apply worker. */ +MemoryContext ApplyCacheContext; + /* libpqreceiver connection */ extern struct WalReceiverConn *wrconn; @@ -53,12 +59,26 @@ extern bool in_remote_transaction; extern bool got_SIGTERM; extern void logicalrep_worker_attach(int slot); -extern LogicalRepWorker *logicalrep_worker_find(Oid subid); -extern int logicalrep_worker_count(Oid subid); -extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid); -extern void logicalrep_worker_stop(Oid subid); -extern void logicalrep_worker_wakeup(Oid subid); +extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, + bool only_running); +extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, + Oid userid, Oid relid); +extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_wakeup(Oid subid, Oid relid); +extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); + +extern int logicalrep_sync_worker_count(Oid subid); extern void logicalrep_worker_sigterm(SIGNAL_ARGS); +extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); +void process_syncing_tables(XLogRecPtr current_lsn); +void invalidate_syncing_table_states(Datum arg, int cacheid, + uint32 hashvalue); + +static inline bool +am_tablesync_worker(void) +{ + return OidIsValid(MyLogicalRepWorker->relid); +} #endif /* WORKER_INTERNAL_H */ |
