summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_views.sql4
-rw-r--r--src/backend/replication/logical/logical.c12
-rw-r--r--src/backend/replication/slot.c10
-rw-r--r--src/backend/replication/slotfuncs.c14
-rw-r--r--src/backend/replication/walsender.c6
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat14
-rw-r--r--src/include/nodes/replnodes.h1
-rw-r--r--src/include/replication/slot.h7
-rw-r--r--src/test/regress/expected/rules.out5
10 files changed, 56 insertions, 19 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fa58afd9d7..fc94a73a54 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -894,7 +894,8 @@ CREATE VIEW pg_replication_slots AS
L.restart_lsn,
L.confirmed_flush_lsn,
L.wal_status,
- L.safe_wal_size
+ L.safe_wal_size,
+ L.two_phase
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
@@ -1318,6 +1319,7 @@ AS 'pg_create_physical_replication_slot';
CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
IN slot_name name, IN plugin name,
IN temporary boolean DEFAULT false,
+ IN twophase boolean DEFAULT false,
OUT slot_name name, OUT lsn pg_lsn)
RETURNS RECORD
LANGUAGE INTERNAL
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3f6d723d09..37b75deb72 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -431,6 +431,12 @@ CreateInitDecodingContext(const char *plugin,
startup_cb_wrapper(ctx, &ctx->options, true);
MemoryContextSwitchTo(old_context);
+ /*
+ * We allow decoding of prepared transactions iff the two_phase option is
+ * enabled at the time of slot creation.
+ */
+ ctx->twophase &= MyReplicationSlot->data.two_phase;
+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
return ctx;
@@ -531,6 +537,12 @@ CreateDecodingContext(XLogRecPtr start_lsn,
startup_cb_wrapper(ctx, &ctx->options, false);
MemoryContextSwitchTo(old_context);
+ /*
+ * We allow decoding of prepared transactions iff the two_phase option is
+ * enabled at the time of slot creation.
+ */
+ ctx->twophase &= MyReplicationSlot->data.two_phase;
+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
ereport(LOG,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index fb4af2ef52..75a087c2f9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -216,10 +216,17 @@ ReplicationSlotValidateName(const char *name, int elevel)
* name: Name of the slot
* db_specific: logical decoding is db specific; if the slot is going to
* be used for that pass true, otherwise false.
+ * two_phase: Allows decoding of prepared transactions. We allow this option
+ * to be enabled only at the slot creation time. If we allow this option
+ * to be changed during decoding then it is quite possible that we skip
+ * prepare first time because this option was not enabled. Now next time
+ * during getting changes, if the two_phase option is enabled it can skip
+ * prepare because by that time start decoding point has been moved. So the
+ * user will only get commit prepared.
*/
void
ReplicationSlotCreate(const char *name, bool db_specific,
- ReplicationSlotPersistency persistency)
+ ReplicationSlotPersistency persistency, bool two_phase)
{
ReplicationSlot *slot = NULL;
int i;
@@ -277,6 +284,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
namestrcpy(&slot->data.name, name);
slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
slot->data.persistency = persistency;
+ slot->data.two_phase = two_phase;
/* and then data only present in shared memory */
slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d24bb5b0b5..9817b44113 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -50,7 +50,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
- temporary ? RS_TEMPORARY : RS_PERSISTENT);
+ temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
if (immediately_reserve)
{
@@ -124,7 +124,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
*/
static void
create_logical_replication_slot(char *name, char *plugin,
- bool temporary, XLogRecPtr restart_lsn,
+ bool temporary, bool two_phase,
+ XLogRecPtr restart_lsn,
bool find_startpoint)
{
LogicalDecodingContext *ctx = NULL;
@@ -140,7 +141,7 @@ create_logical_replication_slot(char *name, char *plugin,
* error as well.
*/
ReplicationSlotCreate(name, true,
- temporary ? RS_TEMPORARY : RS_EPHEMERAL);
+ temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
/*
* Create logical decoding context to find start point or, if we don't
@@ -177,6 +178,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Name name = PG_GETARG_NAME(0);
Name plugin = PG_GETARG_NAME(1);
bool temporary = PG_GETARG_BOOL(2);
+ bool two_phase = PG_GETARG_BOOL(3);
Datum result;
TupleDesc tupdesc;
HeapTuple tuple;
@@ -193,6 +195,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
create_logical_replication_slot(NameStr(*name),
NameStr(*plugin),
temporary,
+ two_phase,
InvalidXLogRecPtr,
true);
@@ -236,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 13
+#define PG_GET_REPLICATION_SLOTS_COLS 14
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -432,6 +435,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = Int64GetDatum(failLSN - currlsn);
}
+ values[i++] = BoolGetDatum(slot_contents.data.two_phase);
+
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -796,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
create_logical_replication_slot(NameStr(*dst_name),
plugin,
temporary,
+ false,
src_restart_lsn,
false);
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index eb3f18ed48..23baa4498a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -938,7 +938,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
ReplicationSlotCreate(cmd->slotname, false,
- cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
+ cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
+ false);
}
else
{
@@ -952,7 +953,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
* they get dropped on error as well.
*/
ReplicationSlotCreate(cmd->slotname, true,
- cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
+ cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
+ cmd->two_phase);
}
if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 4cc94de224..b19975c5c8 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202102191
+#define CATALOG_VERSION_NO 202103031
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1487710d59..3d3974f467 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10496,16 +10496,16 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
- proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}',
+ proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
- proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
- proallargtypes => '{name,name,bool,name,pg_lsn}',
- proargmodes => '{i,i,i,o,o}',
- proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool bool',
+ proallargtypes => '{name,name,bool,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,i,o,o}',
+ proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' },
{ oid => '4222',
descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f2..ebc43a0293 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
ReplicationKind kind;
char *plugin;
bool temporary;
+ bool two_phase;
List *options;
} CreateReplicationSlotCmd;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 5c3fde20c6..1ad5e6c50d 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -98,6 +98,11 @@ typedef struct ReplicationSlotPersistentData
*/
XLogRecPtr initial_consistent_point;
+ /*
+ * Allow decoding of prepared transactions?
+ */
+ bool two_phase;
+
/* plugin name */
NameData plugin;
} ReplicationSlotPersistentData;
@@ -199,7 +204,7 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
- ReplicationSlotPersistency p);
+ ReplicationSlotPersistency p, bool two_phase);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 10a1f34ebc..b1c9b7bdfe 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1477,8 +1477,9 @@ pg_replication_slots| SELECT l.slot_name,
l.restart_lsn,
l.confirmed_flush_lsn,
l.wal_status,
- l.safe_wal_size
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size)
+ l.safe_wal_size,
+ l.two_phase
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,