Skip empty transaction stream in test_decoding.
authorAmit Kapila <akapila@postgresql.org>
Fri, 11 Sep 2020 04:30:01 +0000 (10:00 +0530)
committerAmit Kapila <akapila@postgresql.org>
Fri, 11 Sep 2020 04:30:01 +0000 (10:00 +0530)
We were decoding empty transactions via streaming APIs added in commit
45fdc9738b even when the user used the option 'skip-empty-xacts'. The APIs
makes no effort to skip empty xacts under the assumption that we will
never try to stream such transactions. However, that is not true because
we can pick to stream a transaction that has change messages for
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT and we don't send such messages to
downstream rather they are just to update the internal state. So, we need
to skip such xacts when plugin uses the option 'skip-empty-xacts'.

Diagnosed-By: Amit Kapila
Author: Dilip Kumar
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1+OqgFNZkf7=ETe_y5ntjgDk3T0wcdkd4Sot_u1hySGfw@mail.gmail.com

contrib/test_decoding/Makefile
contrib/test_decoding/expected/concurrent_stream.out [new file with mode: 0644]
contrib/test_decoding/expected/stream.out
contrib/test_decoding/specs/concurrent_stream.spec [new file with mode: 0644]
contrib/test_decoding/test_decoding.c

index ed9a3d6c0edeef621eac73eef591eb82ad149cf4..f23f15b04d4af77b5500d0dfeb5729e20dbacf61 100644 (file)
@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
        decoding_into_rel binary prepared replorigin time messages \
        spill slot truncate stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
-       oldest_xmin snapshot_transfer subxact_without_top
+       oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out
new file mode 100644 (file)
index 0000000..e731d13
--- /dev/null
@@ -0,0 +1,19 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+step s0_begin: BEGIN;
+step s0_ddl: CREATE TABLE stream_test1(data text);
+step s1_ddl: CREATE TABLE stream_test(data text);
+step s1_begin: BEGIN;
+step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s1_commit: COMMIT;
+step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+data           
+
+opening a streamed block for transaction
+streaming change for transaction
+closing a streamed block for transaction
+committing streamed transaction
+?column?       
+
+stop           
index d7e32f8185469b998ac96af2884d13262a8a9d53..e1c3bc838d5e35398b2e1002b0208b48095df312 100644 (file)
@@ -29,10 +29,7 @@ COMMIT;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
- opening a streamed block for transaction
  streaming message: transactional: 1 prefix: test, sz: 50
- closing a streamed block for transaction
- aborting streamed (sub)transaction
  opening a streamed block for transaction
  streaming change for transaction
  streaming change for transaction
@@ -56,7 +53,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  closing a streamed block for transaction
  committing streamed transaction
-(27 rows)
+(24 rows)
 
 -- streaming test for toast changes
 ALTER TABLE stream_test ALTER COLUMN data set storage external;
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
new file mode 100644 (file)
index 0000000..ad9fde9
--- /dev/null
@@ -0,0 +1,37 @@
+# Test decoding of in-progress transaction containing dml and a concurrent
+# transaction with ddl operation. The transaction containing ddl operation
+# should not get streamed as it doesn't have any changes.
+
+setup
+{
+  SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+
+  -- consume DDL
+  SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+  CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS stream_test;
+    DROP TABLE IF EXISTS stream_test1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_ddl"   {CREATE TABLE stream_test1(data text);}
+
+# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
+# the currently running s0_ddl and we want to test that s0_ddl should not get
+# streamed when user asked to skip-empty-xacts.
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_ddl"   { CREATE TABLE stream_test(data text); }
+step "s1_begin" { BEGIN; }
+step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
+step "s1_commit" { COMMIT; }
+step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
+
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
index 34745150e9ba42eab689feba58a05e32ac27ed7c..e60ab34a5a7cd6bfe8d1867c250d61b9c92d69d9 100644 (file)
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
                                                          Size sz, const char *message);
 static void pg_decode_stream_start(LogicalDecodingContext *ctx,
                                                                   ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+                                                                  TestDecodingData *data,
+                                                                  ReorderBufferTXN *txn,
+                                                                  bool last_write);
 static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
                                                                  ReorderBufferTXN *txn);
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
        OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_start(LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn)
 {
        TestDecodingData *data = ctx->output_plugin_private;
 
-       OutputPluginPrepareWrite(ctx, true);
+       data->xact_wrote_changes = false;
+       if (data->skip_empty_xacts)
+               return;
+       pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
+       OutputPluginPrepareWrite(ctx, last_write);
        if (data->include_xids)
                appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
        else
                appendStringInfo(ctx->out, "opening a streamed block for transaction");
-       OutputPluginWrite(ctx, true);
+       OutputPluginWrite(ctx, last_write);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_stop(LogicalDecodingContext *ctx,
                                          ReorderBufferTXN *txn)
 {
        TestDecodingData *data = ctx->output_plugin_private;
 
+       if (data->skip_empty_xacts && !data->xact_wrote_changes)
+               return;
+
        OutputPluginPrepareWrite(ctx, true);
        if (data->include_xids)
                appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
        OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn,
@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 {
        TestDecodingData *data = ctx->output_plugin_private;
 
+       if (data->skip_empty_xacts && !data->xact_wrote_changes)
+               return;
+
        OutputPluginPrepareWrite(ctx, true);
        if (data->include_xids)
                appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
        OutputPluginWrite(ctx, true);
 }
 
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
 static void
 pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                                ReorderBufferTXN *txn,
@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
 {
        TestDecodingData *data = ctx->output_plugin_private;
 
+       if (data->skip_empty_xacts && !data->xact_wrote_changes)
+               return;
+
        OutputPluginPrepareWrite(ctx, true);
 
        if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
 {
        TestDecodingData *data = ctx->output_plugin_private;
 
+       /* output stream start if we haven't yet */
+       if (data->skip_empty_xacts && !data->xact_wrote_changes)
+       {
+               pg_output_stream_start(ctx, data, txn, false);
+       }
+       data->xact_wrote_changes = true;
+
        OutputPluginPrepareWrite(ctx, true);
        if (data->include_xids)
                appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 {
        TestDecodingData *data = ctx->output_plugin_private;
 
+       if (data->skip_empty_xacts && !data->xact_wrote_changes)
+       {
+               pg_output_stream_start(ctx, data, txn, false);
+       }
+       data->xact_wrote_changes = true;
+
        OutputPluginPrepareWrite(ctx, true);
        if (data->include_xids)
                appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);