diff options
| author | Peter Eisentraut | 2017-01-19 17:00:00 +0000 |
|---|---|---|
| committer | Peter Eisentraut | 2017-01-20 14:04:49 +0000 |
| commit | 665d1fad99e7b11678b0d5fa24d2898424243cd6 (patch) | |
| tree | eefe3eb528f840780aef6c09939a1844dbabb30a /src/include/replication | |
| parent | ba61a04bc7fefeee03416d9911eb825c4897c223 (diff) | |
Logical replication
- Add PUBLICATION catalogs and DDL
- Add SUBSCRIPTION catalog and DDL
- Define logical replication protocol and output plugin
- Add logical replication workers
From: Petr Jelinek <petr@2ndquadrant.com>
Reviewed-by: Steve Singer <steve@ssinger.info>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Erik Rijkers <er@xs4all.nl>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Diffstat (limited to 'src/include/replication')
| -rw-r--r-- | src/include/replication/logicallauncher.h | 27 | ||||
| -rw-r--r-- | src/include/replication/logicalproto.h | 104 | ||||
| -rw-r--r-- | src/include/replication/logicalrelation.h | 43 | ||||
| -rw-r--r-- | src/include/replication/logicalworker.h | 17 | ||||
| -rw-r--r-- | src/include/replication/pgoutput.h | 29 | ||||
| -rw-r--r-- | src/include/replication/walreceiver.h | 60 | ||||
| -rw-r--r-- | src/include/replication/worker_internal.h | 62 |
7 files changed, 331 insertions, 11 deletions
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h new file mode 100644 index 00000000000..715ac7f24c9 --- /dev/null +++ b/src/include/replication/logicallauncher.h @@ -0,0 +1,27 @@ +/*------------------------------------------------------------------------- + * + * logicallauncher.h + * Exports for logical replication launcher. + * + * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group + * + * src/include/replication/logicallauncher.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALLAUNCHER_H +#define LOGICALLAUNCHER_H + +extern int max_logical_replication_workers; + +extern void ApplyLauncherRegister(void); +extern void ApplyLauncherMain(Datum main_arg); + +extern Size ApplyLauncherShmemSize(void); +extern void ApplyLauncherShmemInit(void); + +extern void ApplyLauncherWakeup(void); +extern void ApplyLauncherWakeupAtCommit(void); +extern void AtCommit_ApplyLauncher(void); + +#endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h new file mode 100644 index 00000000000..0d8153c39d9 --- /dev/null +++ b/src/include/replication/logicalproto.h @@ -0,0 +1,104 @@ +/*------------------------------------------------------------------------- + * + * logicalproto.h + * logical replication protocol + * + * Copyright (c) 2015, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/replication/logicalproto.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICAL_PROTO_H +#define LOGICAL_PROTO_H + +#include "replication/reorderbuffer.h" +#include "utils/rel.h" + +/* + * Protocol capabilities + * + * LOGICAL_PROTO_VERSION_NUM is our native protocol and the greatest version + * we can support. PGLOGICAL_PROTO_MIN_VERSION_NUM is the oldest version we + * have backwards compatibility for. The client requests protocol version at + * connect time. + */ +#define LOGICALREP_PROTO_MIN_VERSION_NUM 1 +#define LOGICALREP_PROTO_VERSION_NUM 1 + +/* Tuple coming via logical replication. */ +typedef struct LogicalRepTupleData +{ + char *values[MaxTupleAttributeNumber]; /* value in out function format or NULL if values is NULL */ + bool changed[MaxTupleAttributeNumber]; /* marker for changed/unchanged values */ +} LogicalRepTupleData; + +typedef uint32 LogicalRepRelId; + +/* Relation information */ +typedef struct LogicalRepRelation +{ + /* Info coming from the remote side. */ + LogicalRepRelId remoteid; /* unique id of the relation */ + char *nspname; /* schema name */ + char *relname; /* relation name */ + int natts; /* number of columns */ + char **attnames; /* column names */ + Oid *atttyps; /* column types */ + char replident; /* replica identity */ + Bitmapset *attkeys; /* Bitmap of key columns */ +} LogicalRepRelation; + +/* Type mapping info */ +typedef struct LogicalRepTyp +{ + Oid remoteid; /* unique id of the type */ + char *nspname; /* schema name */ + char *typname; /* name of the type */ + Oid typoid; /* local type Oid */ +} LogicalRepTyp; + +/* Transaction info */ +typedef struct LogicalRepBeginData +{ + XLogRecPtr final_lsn; + TimestampTz committime; + TransactionId xid; +} LogicalRepBeginData; + +typedef struct LogicalRepCommitData +{ + XLogRecPtr commit_lsn; + XLogRecPtr end_lsn; + TimestampTz committime; +} LogicalRepCommitData; + +extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); +extern void logicalrep_read_begin(StringInfo in, + LogicalRepBeginData *begin_data); +extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern void logicalrep_read_commit(StringInfo in, + LogicalRepCommitData *commit_data); +extern void logicalrep_write_origin(StringInfo out, const char *origin, + XLogRecPtr origin_lsn); +extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); +extern void logicalrep_write_insert(StringInfo out, Relation rel, + HeapTuple newtuple); +extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); +extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, + HeapTuple newtuple); +extern LogicalRepRelId logicalrep_read_update(StringInfo in, + bool *has_oldtuple, LogicalRepTupleData *oldtup, + LogicalRepTupleData *newtup); +extern void logicalrep_write_delete(StringInfo out, Relation rel, + HeapTuple oldtuple); +extern LogicalRepRelId logicalrep_read_delete(StringInfo in, + LogicalRepTupleData *oldtup); +extern void logicalrep_write_rel(StringInfo out, Relation rel); +extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); +extern void logicalrep_write_typ(StringInfo out, Oid typoid); +extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp); + +#endif /* LOGICALREP_PROTO_H */ diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h new file mode 100644 index 00000000000..8f9f4a094d2 --- /dev/null +++ b/src/include/replication/logicalrelation.h @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * logicalrelation.h + * Relation definitions for logical replication relation mapping. + * + * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group + * + * src/include/replication/logicalrelation.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALRELATION_H +#define LOGICALRELATION_H + +#include "replication/logicalproto.h" + +typedef struct LogicalRepRelMapEntry +{ + LogicalRepRelation remoterel; /* key is remoterel.remoteid */ + + /* Mapping to local relation, filled as needed. */ + Oid localreloid; /* local relation id */ + Relation localrel; /* relcache entry */ + AttrNumber *attrmap; /* map of local attributes to + * remote ones */ + bool updatable; /* Can apply updates/detetes? */ + + /* Sync state. */ + char state; + XLogRecPtr statelsn; +} LogicalRepRelMapEntry; + +extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); + +extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid, + LOCKMODE lockmode); +extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, + LOCKMODE lockmode); + +extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp); +extern Oid logicalrep_typmap_getid(Oid remoteid); + +#endif /* LOGICALRELATION_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h new file mode 100644 index 00000000000..93cb25f4380 --- /dev/null +++ b/src/include/replication/logicalworker.h @@ -0,0 +1,17 @@ +/*------------------------------------------------------------------------- + * + * logicalworker.h + * Exports for logical replication workers. + * + * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group + * + * src/include/replication/logicalworker.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALWORKER_H +#define LOGICALWORKER_H + +extern void ApplyWorkerMain(Datum main_arg); + +#endif /* LOGICALWORKER_H */ diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h new file mode 100644 index 00000000000..c20451d1f29 --- /dev/null +++ b/src/include/replication/pgoutput.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * pgoutput.h + * Logical Replication output plugin + * + * Copyright (c) 2015, PostgreSQL Global Development Group + * + * IDENTIFICATION + * pgoutput.h + * + *------------------------------------------------------------------------- + */ +#ifndef PGOUTPUT_H +#define PGOUTPUT_H + + +typedef struct PGOutputData +{ + MemoryContext context; /* private memory context for transient + * allocations */ + + /* client info */ + uint32 protocol_version; + + List *publication_names; + List *publications; +} PGOutputData; + +#endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 6ab2c6f9a52..0857bdc5566 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -134,34 +134,64 @@ typedef struct extern WalRcvData *WalRcv; +typedef struct +{ + bool logical; /* True if this is logical + replication stream, false if + physical stream. */ + char *slotname; /* Name of the replication slot + or NULL. */ + XLogRecPtr startpoint; /* LSN of starting point. */ + + union + { + struct + { + TimeLineID startpointTLI; /* Starting timeline */ + } physical; + struct + { + uint32 proto_version; /* Logical protocol version */ + List *publication_names; /* String list of publications */ + } logical; + } proto; +} WalRcvStreamOptions; + struct WalReceiverConn; typedef struct WalReceiverConn WalReceiverConn; /* libpqwalreceiver hooks */ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical, - const char *appname); + const char *appname, + char **err); +typedef void (*walrcv_check_conninfo_fn) (const char *conninfo); typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn); typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, - TimeLineID *primary_tli); + TimeLineID *primary_tli, + int *server_version); typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *size); typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn, - TimeLineID tli, - XLogRecPtr startpoint, - const char *slotname); + const WalRcvStreamOptions *options); typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn, TimeLineID *next_tli); typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer, pgsocket *wait_fd); typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer, int nbytes); +typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, + const char *slotname, bool temporary, + XLogRecPtr *lsn); +typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd, + char **err); typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); typedef struct WalReceiverFunctionsType { walrcv_connect_fn walrcv_connect; + walrcv_check_conninfo_fn walrcv_check_conninfo; walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_identify_system_fn walrcv_identify_system; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; @@ -169,27 +199,35 @@ typedef struct WalReceiverFunctionsType walrcv_endstreaming_fn walrcv_endstreaming; walrcv_receive_fn walrcv_receive; walrcv_send_fn walrcv_send; + walrcv_create_slot_fn walrcv_create_slot; + walrcv_command_fn walrcv_command; walrcv_disconnect_fn walrcv_disconnect; } WalReceiverFunctionsType; extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -#define walrcv_connect(conninfo, logical, appname) \ - WalReceiverFunctions->walrcv_connect(conninfo, logical, appname) +#define walrcv_connect(conninfo, logical, appname, err) \ + WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err) +#define walrcv_check_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_check_conninfo(conninfo) #define walrcv_get_conninfo(conn) \ WalReceiverFunctions->walrcv_get_conninfo(conn) -#define walrcv_identify_system(conn, primary_tli) \ - WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_identify_system(conn, primary_tli, server_version) \ + WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) -#define walrcv_startstreaming(conn, tli, startpoint, slotname) \ - WalReceiverFunctions->walrcv_startstreaming(conn, tli, startpoint, slotname) +#define walrcv_startstreaming(conn, options) \ + WalReceiverFunctions->walrcv_startstreaming(conn, options) #define walrcv_endstreaming(conn, next_tli) \ WalReceiverFunctions->walrcv_endstreaming(conn, next_tli) #define walrcv_receive(conn, buffer, wait_fd) \ WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) +#define walrcv_create_slot(conn, slotname, temporary, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn) +#define walrcv_command(conn, cmd, err) \ + WalReceiverFunctions->walrcv_command(conn, cmd, err) #define walrcv_disconnect(conn) \ WalReceiverFunctions->walrcv_disconnect(conn) diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h new file mode 100644 index 00000000000..cecd2b8a1ce --- /dev/null +++ b/src/include/replication/worker_internal.h @@ -0,0 +1,62 @@ +/*------------------------------------------------------------------------- + * + * worker_internal.h + * Internal headers shared by logical replication workers. + * + * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group + * + * src/include/replication/worker_internal.h + * + *------------------------------------------------------------------------- + */ +#ifndef WORKER_INTERNAL_H +#define WORKER_INTERNAL_H + +#include "catalog/pg_subscription.h" +#include "storage/lock.h" + +typedef struct LogicalRepWorker +{ + /* Pointer to proc array. NULL if not running. */ + PGPROC *proc; + + /* Database id to connect to. */ + Oid dbid; + + /* User to use for connection (will be same as owner of subscription). */ + Oid userid; + + /* Subscription id for the worker. */ + Oid subid; + + /* Used for initial table synchronization. */ + Oid relid; + + /* Stats. */ + XLogRecPtr last_lsn; + TimestampTz last_send_time; + TimestampTz last_recv_time; + XLogRecPtr reply_lsn; + TimestampTz reply_time; +} LogicalRepWorker; + +/* libpqreceiver connection */ +extern struct WalReceiverConn *wrconn; + +/* Worker and subscription objects. */ +extern Subscription *MySubscription; +extern LogicalRepWorker *MyLogicalRepWorker; + +extern bool in_remote_transaction; +extern bool got_SIGTERM; + +extern void logicalrep_worker_attach(int slot); +extern LogicalRepWorker *logicalrep_worker_find(Oid subid); +extern int logicalrep_worker_count(Oid subid); +extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid); +extern void logicalrep_worker_stop(Oid subid); +extern void logicalrep_worker_wakeup(Oid subid); + +extern void logicalrep_worker_sigterm(SIGNAL_ARGS); + +#endif /* WORKER_INTERNAL_H */ |
