summaryrefslogtreecommitdiff
path: root/src/include/replication
diff options
context:
space:
mode:
authorPeter Eisentraut2017-01-19 17:00:00 +0000
committerPeter Eisentraut2017-01-20 14:04:49 +0000
commit665d1fad99e7b11678b0d5fa24d2898424243cd6 (patch)
treeeefe3eb528f840780aef6c09939a1844dbabb30a /src/include/replication
parentba61a04bc7fefeee03416d9911eb825c4897c223 (diff)
Logical replication
- Add PUBLICATION catalogs and DDL - Add SUBSCRIPTION catalog and DDL - Define logical replication protocol and output plugin - Add logical replication workers From: Petr Jelinek <petr@2ndquadrant.com> Reviewed-by: Steve Singer <steve@ssinger.info> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Erik Rijkers <er@xs4all.nl> Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/logicallauncher.h27
-rw-r--r--src/include/replication/logicalproto.h104
-rw-r--r--src/include/replication/logicalrelation.h43
-rw-r--r--src/include/replication/logicalworker.h17
-rw-r--r--src/include/replication/pgoutput.h29
-rw-r--r--src/include/replication/walreceiver.h60
-rw-r--r--src/include/replication/worker_internal.h62
7 files changed, 331 insertions, 11 deletions
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
new file mode 100644
index 00000000000..715ac7f24c9
--- /dev/null
+++ b/src/include/replication/logicallauncher.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicallauncher.h
+ * Exports for logical replication launcher.
+ *
+ * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logicallauncher.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALLAUNCHER_H
+#define LOGICALLAUNCHER_H
+
+extern int max_logical_replication_workers;
+
+extern void ApplyLauncherRegister(void);
+extern void ApplyLauncherMain(Datum main_arg);
+
+extern Size ApplyLauncherShmemSize(void);
+extern void ApplyLauncherShmemInit(void);
+
+extern void ApplyLauncherWakeup(void);
+extern void ApplyLauncherWakeupAtCommit(void);
+extern void AtCommit_ApplyLauncher(void);
+
+#endif /* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
new file mode 100644
index 00000000000..0d8153c39d9
--- /dev/null
+++ b/src/include/replication/logicalproto.h
@@ -0,0 +1,104 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalproto.h
+ * logical replication protocol
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/include/replication/logicalproto.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICAL_PROTO_H
+#define LOGICAL_PROTO_H
+
+#include "replication/reorderbuffer.h"
+#include "utils/rel.h"
+
+/*
+ * Protocol capabilities
+ *
+ * LOGICAL_PROTO_VERSION_NUM is our native protocol and the greatest version
+ * we can support. PGLOGICAL_PROTO_MIN_VERSION_NUM is the oldest version we
+ * have backwards compatibility for. The client requests protocol version at
+ * connect time.
+ */
+#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
+#define LOGICALREP_PROTO_VERSION_NUM 1
+
+/* Tuple coming via logical replication. */
+typedef struct LogicalRepTupleData
+{
+ char *values[MaxTupleAttributeNumber]; /* value in out function format or NULL if values is NULL */
+ bool changed[MaxTupleAttributeNumber]; /* marker for changed/unchanged values */
+} LogicalRepTupleData;
+
+typedef uint32 LogicalRepRelId;
+
+/* Relation information */
+typedef struct LogicalRepRelation
+{
+ /* Info coming from the remote side. */
+ LogicalRepRelId remoteid; /* unique id of the relation */
+ char *nspname; /* schema name */
+ char *relname; /* relation name */
+ int natts; /* number of columns */
+ char **attnames; /* column names */
+ Oid *atttyps; /* column types */
+ char replident; /* replica identity */
+ Bitmapset *attkeys; /* Bitmap of key columns */
+} LogicalRepRelation;
+
+/* Type mapping info */
+typedef struct LogicalRepTyp
+{
+ Oid remoteid; /* unique id of the type */
+ char *nspname; /* schema name */
+ char *typname; /* name of the type */
+ Oid typoid; /* local type Oid */
+} LogicalRepTyp;
+
+/* Transaction info */
+typedef struct LogicalRepBeginData
+{
+ XLogRecPtr final_lsn;
+ TimestampTz committime;
+ TransactionId xid;
+} LogicalRepBeginData;
+
+typedef struct LogicalRepCommitData
+{
+ XLogRecPtr commit_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz committime;
+} LogicalRepCommitData;
+
+extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
+extern void logicalrep_read_begin(StringInfo in,
+ LogicalRepBeginData *begin_data);
+extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+extern void logicalrep_read_commit(StringInfo in,
+ LogicalRepCommitData *commit_data);
+extern void logicalrep_write_origin(StringInfo out, const char *origin,
+ XLogRecPtr origin_lsn);
+extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
+extern void logicalrep_write_insert(StringInfo out, Relation rel,
+ HeapTuple newtuple);
+extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
+extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
+ HeapTuple newtuple);
+extern LogicalRepRelId logicalrep_read_update(StringInfo in,
+ bool *has_oldtuple, LogicalRepTupleData *oldtup,
+ LogicalRepTupleData *newtup);
+extern void logicalrep_write_delete(StringInfo out, Relation rel,
+ HeapTuple oldtuple);
+extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
+ LogicalRepTupleData *oldtup);
+extern void logicalrep_write_rel(StringInfo out, Relation rel);
+extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
+extern void logicalrep_write_typ(StringInfo out, Oid typoid);
+extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
+
+#endif /* LOGICALREP_PROTO_H */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
new file mode 100644
index 00000000000..8f9f4a094d2
--- /dev/null
+++ b/src/include/replication/logicalrelation.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalrelation.h
+ * Relation definitions for logical replication relation mapping.
+ *
+ * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logicalrelation.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALRELATION_H
+#define LOGICALRELATION_H
+
+#include "replication/logicalproto.h"
+
+typedef struct LogicalRepRelMapEntry
+{
+ LogicalRepRelation remoterel; /* key is remoterel.remoteid */
+
+ /* Mapping to local relation, filled as needed. */
+ Oid localreloid; /* local relation id */
+ Relation localrel; /* relcache entry */
+ AttrNumber *attrmap; /* map of local attributes to
+ * remote ones */
+ bool updatable; /* Can apply updates/detetes? */
+
+ /* Sync state. */
+ char state;
+ XLogRecPtr statelsn;
+} LogicalRepRelMapEntry;
+
+extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
+
+extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
+ LOCKMODE lockmode);
+extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
+ LOCKMODE lockmode);
+
+extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
+extern Oid logicalrep_typmap_getid(Oid remoteid);
+
+#endif /* LOGICALRELATION_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
new file mode 100644
index 00000000000..93cb25f4380
--- /dev/null
+++ b/src/include/replication/logicalworker.h
@@ -0,0 +1,17 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalworker.h
+ * Exports for logical replication workers.
+ *
+ * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logicalworker.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALWORKER_H
+#define LOGICALWORKER_H
+
+extern void ApplyWorkerMain(Datum main_arg);
+
+#endif /* LOGICALWORKER_H */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
new file mode 100644
index 00000000000..c20451d1f29
--- /dev/null
+++ b/src/include/replication/pgoutput.h
@@ -0,0 +1,29 @@
+/*-------------------------------------------------------------------------
+ *
+ * pgoutput.h
+ * Logical Replication output plugin
+ *
+ * Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * pgoutput.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGOUTPUT_H
+#define PGOUTPUT_H
+
+
+typedef struct PGOutputData
+{
+ MemoryContext context; /* private memory context for transient
+ * allocations */
+
+ /* client info */
+ uint32 protocol_version;
+
+ List *publication_names;
+ List *publications;
+} PGOutputData;
+
+#endif /* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 6ab2c6f9a52..0857bdc5566 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -134,34 +134,64 @@ typedef struct
extern WalRcvData *WalRcv;
+typedef struct
+{
+ bool logical; /* True if this is logical
+ replication stream, false if
+ physical stream. */
+ char *slotname; /* Name of the replication slot
+ or NULL. */
+ XLogRecPtr startpoint; /* LSN of starting point. */
+
+ union
+ {
+ struct
+ {
+ TimeLineID startpointTLI; /* Starting timeline */
+ } physical;
+ struct
+ {
+ uint32 proto_version; /* Logical protocol version */
+ List *publication_names; /* String list of publications */
+ } logical;
+ } proto;
+} WalRcvStreamOptions;
+
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
/* libpqwalreceiver hooks */
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
- const char *appname);
+ const char *appname,
+ char **err);
+typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
- TimeLineID *primary_tli);
+ TimeLineID *primary_tli,
+ int *server_version);
typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
TimeLineID tli,
char **filename,
char **content, int *size);
typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
- TimeLineID tli,
- XLogRecPtr startpoint,
- const char *slotname);
+ const WalRcvStreamOptions *options);
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
TimeLineID *next_tli);
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer,
pgsocket *wait_fd);
typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
int nbytes);
+typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
+ const char *slotname, bool temporary,
+ XLogRecPtr *lsn);
+typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd,
+ char **err);
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
typedef struct WalReceiverFunctionsType
{
walrcv_connect_fn walrcv_connect;
+ walrcv_check_conninfo_fn walrcv_check_conninfo;
walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_identify_system_fn walrcv_identify_system;
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
@@ -169,27 +199,35 @@ typedef struct WalReceiverFunctionsType
walrcv_endstreaming_fn walrcv_endstreaming;
walrcv_receive_fn walrcv_receive;
walrcv_send_fn walrcv_send;
+ walrcv_create_slot_fn walrcv_create_slot;
+ walrcv_command_fn walrcv_command;
walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
-#define walrcv_connect(conninfo, logical, appname) \
- WalReceiverFunctions->walrcv_connect(conninfo, logical, appname)
+#define walrcv_connect(conninfo, logical, appname, err) \
+ WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
+#define walrcv_check_conninfo(conninfo) \
+ WalReceiverFunctions->walrcv_check_conninfo(conninfo)
#define walrcv_get_conninfo(conn) \
WalReceiverFunctions->walrcv_get_conninfo(conn)
-#define walrcv_identify_system(conn, primary_tli) \
- WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_identify_system(conn, primary_tli, server_version) \
+ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
-#define walrcv_startstreaming(conn, tli, startpoint, slotname) \
- WalReceiverFunctions->walrcv_startstreaming(conn, tli, startpoint, slotname)
+#define walrcv_startstreaming(conn, options) \
+ WalReceiverFunctions->walrcv_startstreaming(conn, options)
#define walrcv_endstreaming(conn, next_tli) \
WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
#define walrcv_receive(conn, buffer, wait_fd) \
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
+#define walrcv_create_slot(conn, slotname, temporary, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn)
+#define walrcv_command(conn, cmd, err) \
+ WalReceiverFunctions->walrcv_command(conn, cmd, err)
#define walrcv_disconnect(conn) \
WalReceiverFunctions->walrcv_disconnect(conn)
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
new file mode 100644
index 00000000000..cecd2b8a1ce
--- /dev/null
+++ b/src/include/replication/worker_internal.h
@@ -0,0 +1,62 @@
+/*-------------------------------------------------------------------------
+ *
+ * worker_internal.h
+ * Internal headers shared by logical replication workers.
+ *
+ * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/worker_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WORKER_INTERNAL_H
+#define WORKER_INTERNAL_H
+
+#include "catalog/pg_subscription.h"
+#include "storage/lock.h"
+
+typedef struct LogicalRepWorker
+{
+ /* Pointer to proc array. NULL if not running. */
+ PGPROC *proc;
+
+ /* Database id to connect to. */
+ Oid dbid;
+
+ /* User to use for connection (will be same as owner of subscription). */
+ Oid userid;
+
+ /* Subscription id for the worker. */
+ Oid subid;
+
+ /* Used for initial table synchronization. */
+ Oid relid;
+
+ /* Stats. */
+ XLogRecPtr last_lsn;
+ TimestampTz last_send_time;
+ TimestampTz last_recv_time;
+ XLogRecPtr reply_lsn;
+ TimestampTz reply_time;
+} LogicalRepWorker;
+
+/* libpqreceiver connection */
+extern struct WalReceiverConn *wrconn;
+
+/* Worker and subscription objects. */
+extern Subscription *MySubscription;
+extern LogicalRepWorker *MyLogicalRepWorker;
+
+extern bool in_remote_transaction;
+extern bool got_SIGTERM;
+
+extern void logicalrep_worker_attach(int slot);
+extern LogicalRepWorker *logicalrep_worker_find(Oid subid);
+extern int logicalrep_worker_count(Oid subid);
+extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid);
+extern void logicalrep_worker_stop(Oid subid);
+extern void logicalrep_worker_wakeup(Oid subid);
+
+extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
+
+#endif /* WORKER_INTERNAL_H */