summaryrefslogtreecommitdiff
path: root/contrib/test_decoding
diff options
context:
space:
mode:
authorAndres Freund2015-04-29 17:30:53 +0000
committerAndres Freund2015-04-29 17:30:53 +0000
commit5aa2350426c4fdb3d04568b65aadac397012bbcb (patch)
tree954c3123dc58905bbda6407565383c65850204e7 /contrib/test_decoding
parentc6e96a2f986e4dad72c14b14d4cc17d02b2a6aad (diff)
Introduce replication progress tracking infrastructure.
When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r--contrib/test_decoding/Makefile3
-rw-r--r--contrib/test_decoding/expected/replorigin.out141
-rw-r--r--contrib/test_decoding/sql/replorigin.sql64
-rw-r--r--contrib/test_decoding/test_decoding.c28
4 files changed, 235 insertions, 1 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 613e9c387b7..656eabfa005 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -37,7 +37,8 @@ submake-isolation:
submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
+REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
+ binary prepared replorigin
regresscheck: all | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
new file mode 100644
index 00000000000..c0f512579ca
--- /dev/null
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -0,0 +1,141 @@
+-- predictability
+SET synchronous_commit = on;
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+ pg_replication_origin_create
+------------------------------
+ 1
+(1 row)
+
+-- ensure duplicate creations fail
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
+DETAIL: Key (roname)=(test_decoding: regression_slot) already exists.
+--ensure deletions work (once)
+SELECT pg_replication_origin_create('test_decoding: temp');
+ pg_replication_origin_create
+------------------------------
+ 2
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: temp');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: temp');
+ERROR: cache lookup failed for replication origin 'test_decoding: temp'
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
+ table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
+ table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
+ COMMIT
+(5 rows)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ERROR: cannot setup replication origin when one is already setup
+BEGIN;
+-- setup transaction origin
+SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+-- check replication progress for the session is correct
+SELECT pg_replication_origin_session_progress(false);
+ pg_replication_origin_session_progress
+----------------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_session_progress(true);
+ pg_replication_origin_session_progress
+----------------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
+ local_id | external_id | remote_lsn | ?column?
+----------+--------------------------------+------------+----------
+ 1 | test_decoding: regression_slot | 0/AABBCCDD | t
+(1 row)
+
+-- check replication progress identified by name is correct
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
+ pg_replication_origin_progress
+--------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
+ pg_replication_origin_progress
+--------------------------------
+ 0/AABBCCDD
+(1 row)
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_origin_session_reset();
+ERROR: no replication origin is configured
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+ data
+------
+(0 rows)
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+ data
+--------------------------------------------------------------------------------
+ BEGIN
+ table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
+ COMMIT
+(3 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: regression_slot');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
new file mode 100644
index 00000000000..e12404e106e
--- /dev/null
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -0,0 +1,64 @@
+-- predictability
+SET synchronous_commit = on;
+
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+-- ensure duplicate creations fail
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+
+--ensure deletions work (once)
+SELECT pg_replication_origin_create('test_decoding: temp');
+SELECT pg_replication_origin_drop('test_decoding: temp');
+SELECT pg_replication_origin_drop('test_decoding: temp');
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+
+BEGIN;
+-- setup transaction origin
+SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+
+-- check replication progress for the session is correct
+SELECT pg_replication_origin_session_progress(false);
+SELECT pg_replication_origin_session_progress(true);
+
+SELECT pg_replication_origin_session_reset();
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
+
+-- check replication progress identified by name is correct
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_origin_session_reset();
+
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 963d5df9dae..bca03ee21b4 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
#include "replication/output_plugin.h"
#include "replication/logical.h"
+#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -43,6 +44,7 @@ typedef struct
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
+ bool only_local;
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
+static bool pg_decode_filter(LogicalDecodingContext *ctx,
+ RepOriginId origin_id);
void
_PG_init(void)
@@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change;
cb->commit_cb = pg_decode_commit_txn;
+ cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
}
@@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_xids = true;
data->include_timestamp = false;
data->skip_empty_xacts = false;
+ data->only_local = false;
ctx->output_plugin_private = data;
@@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "only-local") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->only_local = true;
+ else if (!parse_bool(strVal(elem->arg), &data->only_local))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static bool
+pg_decode_filter(LogicalDecodingContext *ctx,
+ RepOriginId origin_id)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (data->only_local && origin_id != InvalidRepOriginId)
+ return true;
+ return false;
+}
+
/*
* Print literal `outputstr' already represented as string of type `typid'
* into stringbuf `s'.