diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/catalog/system_views.sql | 4 | ||||
| -rw-r--r-- | src/backend/replication/logical/logical.c | 12 | ||||
| -rw-r--r-- | src/backend/replication/slot.c | 10 | ||||
| -rw-r--r-- | src/backend/replication/slotfuncs.c | 14 | ||||
| -rw-r--r-- | src/backend/replication/walsender.c | 6 | ||||
| -rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
| -rw-r--r-- | src/include/catalog/pg_proc.dat | 14 | ||||
| -rw-r--r-- | src/include/nodes/replnodes.h | 1 | ||||
| -rw-r--r-- | src/include/replication/slot.h | 7 | ||||
| -rw-r--r-- | src/test/regress/expected/rules.out | 5 |
10 files changed, 56 insertions, 19 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fa58afd9d7..fc94a73a54 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -894,7 +894,8 @@ CREATE VIEW pg_replication_slots AS L.restart_lsn, L.confirmed_flush_lsn, L.wal_status, - L.safe_wal_size + L.safe_wal_size, + L.two_phase FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); @@ -1318,6 +1319,7 @@ AS 'pg_create_physical_replication_slot'; CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, + IN twophase boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3f6d723d09..37b75deb72 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -431,6 +431,12 @@ CreateInitDecodingContext(const char *plugin, startup_cb_wrapper(ctx, &ctx->options, true); MemoryContextSwitchTo(old_context); + /* + * We allow decoding of prepared transactions iff the two_phase option is + * enabled at the time of slot creation. + */ + ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; return ctx; @@ -531,6 +537,12 @@ CreateDecodingContext(XLogRecPtr start_lsn, startup_cb_wrapper(ctx, &ctx->options, false); MemoryContextSwitchTo(old_context); + /* + * We allow decoding of prepared transactions iff the two_phase option is + * enabled at the time of slot creation. + */ + ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; ereport(LOG, diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fb4af2ef52..75a087c2f9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -216,10 +216,17 @@ ReplicationSlotValidateName(const char *name, int elevel) * name: Name of the slot * db_specific: logical decoding is db specific; if the slot is going to * be used for that pass true, otherwise false. + * two_phase: Allows decoding of prepared transactions. We allow this option + * to be enabled only at the slot creation time. If we allow this option + * to be changed during decoding then it is quite possible that we skip + * prepare first time because this option was not enabled. Now next time + * during getting changes, if the two_phase option is enabled it can skip + * prepare because by that time start decoding point has been moved. So the + * user will only get commit prepared. */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency) + ReplicationSlotPersistency persistency, bool two_phase) { ReplicationSlot *slot = NULL; int i; @@ -277,6 +284,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, namestrcpy(&slot->data.name, name); slot->data.database = db_specific ? MyDatabaseId : InvalidOid; slot->data.persistency = persistency; + slot->data.two_phase = two_phase; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d24bb5b0b5..9817b44113 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -50,7 +50,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false); if (immediately_reserve) { @@ -124,7 +124,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) */ static void create_logical_replication_slot(char *name, char *plugin, - bool temporary, XLogRecPtr restart_lsn, + bool temporary, bool two_phase, + XLogRecPtr restart_lsn, bool find_startpoint) { LogicalDecodingContext *ctx = NULL; @@ -140,7 +141,7 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); /* * Create logical decoding context to find start point or, if we don't @@ -177,6 +178,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name name = PG_GETARG_NAME(0); Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); + bool two_phase = PG_GETARG_BOOL(3); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -193,6 +195,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) create_logical_replication_slot(NameStr(*name), NameStr(*plugin), temporary, + two_phase, InvalidXLogRecPtr, true); @@ -236,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 13 +#define PG_GET_REPLICATION_SLOTS_COLS 14 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -432,6 +435,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = Int64GetDatum(failLSN - currlsn); } + values[i++] = BoolGetDatum(slot_contents.data.two_phase); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -796,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) create_logical_replication_slot(NameStr(*dst_name), plugin, temporary, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index eb3f18ed48..23baa4498a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -938,7 +938,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, - cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT); + cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, + false); } else { @@ -952,7 +953,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) * they get dropped on error as well. */ ReplicationSlotCreate(cmd->slotname, true, - cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL); + cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, + cmd->two_phase); } if (cmd->kind == REPLICATION_KIND_LOGICAL) diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 4cc94de224..b19975c5c8 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202102191 +#define CATALOG_VERSION_NO 202103031 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 1487710d59..3d3974f467 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10496,16 +10496,16 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', - proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool', - proallargtypes => '{name,name,bool,name,pg_lsn}', - proargmodes => '{i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,slot_name,lsn}', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool bool', + proallargtypes => '{name,name,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index faa3a251f2..ebc43a0293 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd ReplicationKind kind; char *plugin; bool temporary; + bool two_phase; List *options; } CreateReplicationSlotCmd; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 5c3fde20c6..1ad5e6c50d 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -98,6 +98,11 @@ typedef struct ReplicationSlotPersistentData */ XLogRecPtr initial_consistent_point; + /* + * Allow decoding of prepared transactions? + */ + bool two_phase; + /* plugin name */ NameData plugin; } ReplicationSlotPersistentData; @@ -199,7 +204,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency p); + ReplicationSlotPersistency p, bool two_phase); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 10a1f34ebc..b1c9b7bdfe 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1477,8 +1477,9 @@ pg_replication_slots| SELECT l.slot_name, l.restart_lsn, l.confirmed_flush_lsn, l.wal_status, - l.safe_wal_size - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size) + l.safe_wal_size, + l.two_phase + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, |
