SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
- opening a streamed block for transaction
streaming message: transactional: 1 prefix: test, sz: 50
- closing a streamed block for transaction
- aborting streamed (sub)transaction
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(27 rows)
+(24 rows)
-- streaming test for toast changes
ALTER TABLE stream_test ALTER COLUMN data set storage external;
--- /dev/null
+# Test decoding of in-progress transaction containing dml and a concurrent
+# transaction with ddl operation. The transaction containing ddl operation
+# should not get streamed as it doesn't have any changes.
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+
+ -- consume DDL
+ SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS stream_test;
+ DROP TABLE IF EXISTS stream_test1;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_ddl" {CREATE TABLE stream_test1(data text);}
+
+# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
+# the currently running s0_ddl and we want to test that s0_ddl should not get
+# streamed when user asked to skip-empty-xacts.
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_ddl" { CREATE TABLE stream_test(data text); }
+step "s1_begin" { BEGIN; }
+step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
+step "s1_commit" { COMMIT; }
+step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
+
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+ TestDecodingData *data,
+ ReorderBufferTXN *txn,
+ bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
- OutputPluginPrepareWrite(ctx, true);
+ data->xact_wrote_changes = false;
+ if (data->skip_empty_xacts)
+ return;
+ pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
+ OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "opening a streamed block for transaction");
- OutputPluginWrite(ctx, true);
+ OutputPluginWrite(ctx, last_write);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
{
TestDecodingData *data = ctx->output_plugin_private;
+ /* output stream start if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);