logical decoding: fix decoding of a commit's commit time.
authorAndres Freund <andres@anarazel.de>
Thu, 3 Mar 2016 07:42:21 +0000 (23:42 -0800)
committerAndres Freund <andres@anarazel.de>
Thu, 3 Mar 2016 07:42:21 +0000 (23:42 -0800)
When adding replication origins in 5aa235042, I somehow managed to set
the timestamp of decoded transactions to InvalidXLogRecptr when decoding
one made without a replication origin. Fix that, and the wrong type of
the new commit_time variable.

This didn't trigger a regression test failure because we explicitly
don't show commit timestamps in the regression tests, as they obviously
are variable. Add a test that checks that a decoded commit's timestamp
is within minutes of NOW() from before the commit.

Reported-By: Weiping Qu
Diagnosed-By: Artur Zakirov
Discussion: 56D4197E.9050706@informatik.uni-kl.de,
    56D42918.1010108@postgrespro.ru
Backpatch: 9.5, where 5aa235042 originates.

contrib/test_decoding/Makefile
contrib/test_decoding/expected/time.out [new file with mode: 0644]
contrib/test_decoding/sql/time.sql [new file with mode: 0644]
src/backend/replication/logical/decode.c

index a362e696916a093f5d1abacf616eef349d68b1c8..78816bfe2f8b3472c1337cb8925d13644c0ec4b1 100644 (file)
@@ -38,7 +38,7 @@ submake-test_decoding:
    $(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
-   binary prepared replorigin
+   binary prepared replorigin time
 
 regresscheck: | submake-regress submake-test_decoding temp-install
    $(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/time.out b/contrib/test_decoding/expected/time.out
new file mode 100644 (file)
index 0000000..3b06849
--- /dev/null
@@ -0,0 +1,40 @@
+SET synchronous_commit = on;
+CREATE TABLE test_time(data text);
+-- remember the current time
+SELECT set_config('test.time_before', NOW()::text, false) IS NOT NULL;
+ ?column? 
+----------
+ t
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- a single transaction, to get the commit time
+INSERT INTO test_time(data) VALUES ('');
+-- parse the commit time from the changeset
+SELECT set_config('test.time_after', regexp_replace(data, '^COMMIT \(at (.*)\)$', '\1'), false) IS NOT NULL
+FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '1')
+WHERE data ~ 'COMMIT' LIMIT 1;
+ ?column? 
+----------
+ t
+(1 row)
+
+-- ensure commit time is sane in relation to the previous time
+SELECT (time_after - time_before) <= '10 minutes'::interval, time_after >= time_before
+FROM (SELECT current_setting('test.time_after')::timestamptz AS time_after, (SELECT current_setting('test.time_before')::timestamptz) AS time_before) AS d;
+ ?column? | ?column? 
+----------+----------
+ t        | t
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
diff --git a/contrib/test_decoding/sql/time.sql b/contrib/test_decoding/sql/time.sql
new file mode 100644 (file)
index 0000000..a47c973
--- /dev/null
@@ -0,0 +1,22 @@
+SET synchronous_commit = on;
+
+CREATE TABLE test_time(data text);
+
+-- remember the current time
+SELECT set_config('test.time_before', NOW()::text, false) IS NOT NULL;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- a single transaction, to get the commit time
+INSERT INTO test_time(data) VALUES ('');
+
+-- parse the commit time from the changeset
+SELECT set_config('test.time_after', regexp_replace(data, '^COMMIT \(at (.*)\)$', '\1'), false) IS NOT NULL
+FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '1')
+WHERE data ~ 'COMMIT' LIMIT 1;
+
+-- ensure commit time is sane in relation to the previous time
+SELECT (time_after - time_before) <= '10 minutes'::interval, time_after >= time_before
+FROM (SELECT current_setting('test.time_after')::timestamptz AS time_after, (SELECT current_setting('test.time_before')::timestamptz) AS time_before) AS d;
+
+SELECT pg_drop_replication_slot('regression_slot');
index 88c3a4985a57041fa432f26fe5ad37074c6ef8d7..56be1ed554b15845a4c574c4c8116a5b48840f01 100644 (file)
@@ -449,7 +449,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
             xl_xact_parsed_commit *parsed, TransactionId xid)
 {
    XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
-   XLogRecPtr  commit_time = InvalidXLogRecPtr;
+   TimestampTz commit_time = parsed->xact_time;
    XLogRecPtr  origin_id = XLogRecGetOrigin(buf->record);
    int         i;