summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/decoding_into_rel.out84
-rw-r--r--contrib/test_decoding/sql/decoding_into_rel.sql27
-rw-r--r--src/backend/replication/logical/reorderbuffer.c81
4 files changed, 152 insertions, 42 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 2e5a01bd730..438be44afcc 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -37,7 +37,7 @@ submake-isolation:
submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact binary prepared
+REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
regresscheck: all | submake-regress submake-test_decoding
$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out
new file mode 100644
index 00000000000..2671258f5d9
--- /dev/null
+++ b/contrib/test_decoding/expected/decoding_into_rel.out
@@ -0,0 +1,84 @@
+-- test that we can insert the result of a get_changes call into a
+-- logged relation. That's really not a good idea in practical terms,
+-- but provides a nice test.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+-- create some changes
+CREATE TABLE somechange(id serial primary key);
+INSERT INTO somechange DEFAULT VALUES;
+CREATE TABLE changeresult AS
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT * FROM changeresult;
+ data
+------------------------------------------------
+ BEGIN
+ table public.somechange: INSERT: id[integer]:1
+ COMMIT
+(3 rows)
+
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT * FROM changeresult;
+ data
+--------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.somechange: INSERT: id[integer]:1
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.somechange: INSERT: id[integer]:1'
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(20 rows)
+
+DROP TABLE changeresult;
+DROP TABLE somechange;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(14 rows)
+
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
+ ?column?
+----------
+ stop
+(1 row)
+
diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql
new file mode 100644
index 00000000000..3704821bcc3
--- /dev/null
+++ b/contrib/test_decoding/sql/decoding_into_rel.sql
@@ -0,0 +1,27 @@
+-- test that we can insert the result of a get_changes call into a
+-- logged relation. That's really not a good idea in practical terms,
+-- but provides a nice test.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- create some changes
+CREATE TABLE somechange(id serial primary key);
+INSERT INTO somechange DEFAULT VALUES;
+
+CREATE TABLE changeresult AS
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT * FROM changeresult;
+
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT * FROM changeresult;
+DROP TABLE changeresult;
+DROP TABLE somechange;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index ece1bc80640..7d8f40738d4 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1264,8 +1264,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
volatile CommandId command_id = FirstCommandId;
volatile Snapshot snapshot_now = NULL;
- volatile bool txn_started = false;
- volatile bool subtxn_started = false;
+ volatile bool using_subtxn = false;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);
@@ -1305,7 +1304,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
PG_TRY();
{
- txn_started = false;
/*
* Decoding needs access to syscaches et al., which in turn use
@@ -1317,16 +1315,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
* When we're called via the SQL SRF there's already a transaction
* started, so start an explicit subtransaction there.
*/
- if (IsTransactionOrTransactionBlock())
- {
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ if (using_subtxn)
BeginInternalSubTransaction("replay");
- subtxn_started = true;
- }
else
- {
StartTransactionCommand();
- txn_started = true;
- }
rb->begin(rb, txn);
@@ -1489,22 +1483,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
elog(ERROR, "output plugin used XID %u",
GetCurrentTransactionId());
- /* make sure there's no cache pollution */
- ReorderBufferExecuteInvalidations(rb, txn);
-
/* cleanup */
TeardownHistoricSnapshot(false);
/*
- * Abort subtransaction or the transaction as a whole has the right
+ * Aborting the current (sub-)transaction as a whole has the right
* semantics. We want all locks acquired in here to be released, not
* reassigned to the parent and we do not want any database access
* have persistent effects.
*/
- if (subtxn_started)
+ AbortCurrentTransaction();
+
+ /* make sure there's no cache pollution */
+ ReorderBufferExecuteInvalidations(rb, txn);
+
+ if (using_subtxn)
RollbackAndReleaseCurrentSubTransaction();
- else if (txn_started)
- AbortCurrentTransaction();
if (snapshot_now->copied)
ReorderBufferFreeSnap(rb, snapshot_now);
@@ -1520,20 +1514,21 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
TeardownHistoricSnapshot(true);
- if (snapshot_now->copied)
- ReorderBufferFreeSnap(rb, snapshot_now);
-
- if (subtxn_started)
- RollbackAndReleaseCurrentSubTransaction();
- else if (txn_started)
- AbortCurrentTransaction();
-
/*
- * Invalidations in an aborted transactions aren't allowed to do
- * catalog access, so we don't need to still have the snapshot setup.
+ * Force cache invalidation to happen outside of a valid transaction
+ * to prevent catalog access as we just caught an error.
*/
+ AbortCurrentTransaction();
+
+ /* make sure there's no cache pollution */
ReorderBufferExecuteInvalidations(rb, txn);
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ if (snapshot_now->copied)
+ ReorderBufferFreeSnap(rb, snapshot_now);
+
/* remove potential on-disk data, and deallocate */
ReorderBufferCleanupTXN(rb, txn);
@@ -1645,20 +1640,24 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
{
- /* setup snapshot to perform the invalidations in */
- SetupHistoricSnapshot(txn->base_snapshot, txn->tuplecid_hash);
- PG_TRY();
- {
- ReorderBufferExecuteInvalidations(rb, txn);
- TeardownHistoricSnapshot(false);
- }
- PG_CATCH();
- {
- /* cleanup */
- TeardownHistoricSnapshot(true);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ bool use_subtxn = IsTransactionOrTransactionBlock();
+
+ if (use_subtxn)
+ BeginInternalSubTransaction("replay");
+
+ /*
+ * Force invalidations to happen outside of a valid transaction - that
+ * way entries will just be marked as invalid without accessing the
+ * catalog. That's advantageous because we don't need to setup the
+ * full state necessary for catalog access.
+ */
+ if (use_subtxn)
+ AbortCurrentTransaction();
+
+ ReorderBufferExecuteInvalidations(rb, txn);
+
+ if (use_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
}
else
Assert(txn->ninvalidations == 0);