Include replication origins in SQL functions for commit timestamp
authorMichael Paquier <michael@paquier.xyz>
Sun, 12 Jul 2020 11:47:15 +0000 (20:47 +0900)
committerMichael Paquier <michael@paquier.xyz>
Sun, 12 Jul 2020 11:47:15 +0000 (20:47 +0900)
This includes two changes:
- Addition of a new function pg_xact_commit_timestamp_origin() able, for
a given transaction ID, to return the commit timestamp and replication
origin of this transaction.  An equivalent function existed in
pglogical.
- Addition of the replication origin to pg_last_committed_xact().

The commit timestamp manager includes already APIs able to return the
replication origin of a transaction on top of its commit timestamp, but
the code paths for replication origins were never stressed as those
functions have never looked for a replication origin, and the SQL
functions available have never included this information since their
introduction in 73c986a.

While on it, refactor a test of modules/commit_ts/ to use tstzrange() to
check that a transaction timestamp is within the wanted range, making
the test a bit easier to read.

Bump catalog version.

Author: Movead Li
Reviewed-by: Madan Kumar, Michael Paquier
Discussion: https://postgr.es/m/2020051116430836450630@highgo.ca

doc/src/sgml/func.sgml
src/backend/access/transam/commit_ts.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/test/modules/commit_ts/expected/commit_timestamp.out
src/test/modules/commit_ts/expected/commit_timestamp_1.out
src/test/modules/commit_ts/sql/commit_timestamp.sql

index f06585653503dae593747c8f687f3c25197653a0..cc83d6652e4f082b02ff4c8a95eee27cebde9cef 100644 (file)
@@ -23397,6 +23397,21 @@ SELECT collation for ('foo' COLLATE "de_DE");
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_xact_commit_timestamp_origin</primary>
+        </indexterm>
+        <function>pg_xact_commit_timestamp_origin</function> ( <type>xid</type> )
+        <returnvalue>record</returnvalue>
+        ( <parameter>timestamp</parameter> <type>timestamp with time zone</type>,
+         <parameter>roident</parameter> <type>oid</type>)
+       </para>
+       <para>
+         Returns the commit timestamp and replication origin of a transaction.
+       </para></entry>
+      </row>
+
       <row>
        <entry role="func_table_entry"><para role="func_signature">
         <indexterm>
@@ -23405,11 +23420,12 @@ SELECT collation for ('foo' COLLATE "de_DE");
         <function>pg_last_committed_xact</function> ()
         <returnvalue>record</returnvalue>
         ( <parameter>xid</parameter> <type>xid</type>,
-        <parameter>timestamp</parameter> <type>timestamp with time zone</type> )
+        <parameter>timestamp</parameter> <type>timestamp with time zone</type>,
+        <parameter>roident</parameter> <type>oid</type> )
        </para>
        <para>
-        Returns the transaction ID and commit timestamp of the latest
-        committed transaction.
+        Returns the transaction ID, commit timestamp and replication origin
+        of the latest committed transaction.
        </para></entry>
       </row>
      </tbody>
index 182e5391f7b7769fde0acfdd80b086f7117bd73d..903280ae92d09b1320be15d73c002f3a46966e8b 100644 (file)
@@ -361,7 +361,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
  * is concerned, anyway; it's up to the caller to ensure the value is useful
  * for its purposes.)
  *
- * ts and extra are filled with the corresponding data; they can be passed
+ * ts and nodeid are filled with the corresponding data; they can be passed
  * as NULL if not wanted.
  */
 TransactionId
@@ -417,28 +417,38 @@ pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * pg_last_committed_xact
+ *
+ * SQL-callable wrapper to obtain some information about the latest
+ * committed transaction: transaction ID, timestamp and replication
+ * origin.
+ */
 Datum
 pg_last_committed_xact(PG_FUNCTION_ARGS)
 {
    TransactionId xid;
+   RepOriginId nodeid;
    TimestampTz ts;
-   Datum       values[2];
-   bool        nulls[2];
+   Datum       values[3];
+   bool        nulls[3];
    TupleDesc   tupdesc;
    HeapTuple   htup;
 
    /* and construct a tuple with our data */
-   xid = GetLatestCommitTsData(&ts, NULL);
+   xid = GetLatestCommitTsData(&ts, &nodeid);
 
    /*
     * Construct a tuple descriptor for the result row.  This must match this
     * function's pg_proc entry!
     */
-   tupdesc = CreateTemplateTupleDesc(2);
+   tupdesc = CreateTemplateTupleDesc(3);
    TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
                       XIDOID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
                       TIMESTAMPTZOID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
+                      OIDOID, -1, 0);
    tupdesc = BlessTupleDesc(tupdesc);
 
    if (!TransactionIdIsNormal(xid))
@@ -452,6 +462,9 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)
 
        values[1] = TimestampTzGetDatum(ts);
        nulls[1] = false;
+
+       values[2] = ObjectIdGetDatum((Oid) nodeid);
+       nulls[2] = false;
    }
 
    htup = heap_form_tuple(tupdesc, values, nulls);
@@ -459,6 +472,54 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)
    PG_RETURN_DATUM(HeapTupleGetDatum(htup));
 }
 
+/*
+ * pg_xact_commit_timestamp_origin
+ *
+ * SQL-callable wrapper to obtain commit timestamp and replication origin
+ * of a given transaction.
+ */
+Datum
+pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
+{
+   TransactionId xid = PG_GETARG_UINT32(0);
+   RepOriginId nodeid;
+   TimestampTz ts;
+   Datum       values[2];
+   bool        nulls[2];
+   TupleDesc   tupdesc;
+   HeapTuple   htup;
+   bool        found;
+
+   found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
+
+   /*
+    * Construct a tuple descriptor for the result row.  This must match this
+    * function's pg_proc entry!
+    */
+   tupdesc = CreateTemplateTupleDesc(2);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
+                      TIMESTAMPTZOID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
+                      OIDOID, -1, 0);
+   tupdesc = BlessTupleDesc(tupdesc);
+
+   if (!found)
+   {
+       memset(nulls, true, sizeof(nulls));
+   }
+   else
+   {
+       values[0] = TimestampTzGetDatum(ts);
+       nulls[0] = false;
+
+       values[1] = ObjectIdGetDatum((Oid) nodeid);
+       nulls[1] = false;
+   }
+
+   htup = heap_form_tuple(tupdesc, values, nulls);
+
+   PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+}
 
 /*
  * Number of shared CommitTS buffers.
index 60e5361af66e0fbd44afa16612dfcec0eecba28e..ee5858656964d628bf7e8a24bc207303b762c0f7 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202007072
+#define CATALOG_VERSION_NO 202007121
 
 #endif
index d951b4a36f240cbb623b85a480e1113e41757a9b..d81467af198f58a549bb498ecc8393f501fcac40 100644 (file)
   prorettype => 'timestamptz', proargtypes => 'xid',
   prosrc => 'pg_xact_commit_timestamp' },
 
+{ oid => '8456',
+  descr => 'get commit timestamp and replication origin of a transaction',
+  proname => 'pg_xact_commit_timestamp_origin', provolatile => 'v',
+  prorettype => 'record', proargtypes => 'xid',
+  proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{i,o,o}',
+  proargnames => '{xid,timestamp,roident}',
+  prosrc => 'pg_xact_commit_timestamp_origin' },
+
 { oid => '3583',
-  descr => 'get transaction Id and commit timestamp of latest transaction commit',
+  descr => 'get transaction Id, commit timestamp and replication origin of latest transaction commit',
   proname => 'pg_last_committed_xact', provolatile => 'v',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{xid,timestamptz}', proargmodes => '{o,o}',
-  proargnames => '{xid,timestamp}', prosrc => 'pg_last_committed_xact' },
+  proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{o,o,o}',
+  proargnames => '{xid,timestamp,roident}',
+  prosrc => 'pg_last_committed_xact' },
 
 { oid => '3537', descr => 'get identification of SQL object',
   proname => 'pg_describe_object', provolatile => 's', prorettype => 'text',
index 5b7783b58f3d8d4b1699e4236ec39daf82339e86..addd55bfd44acee86eb6e9b04e93242c9abf3cd0 100644 (file)
@@ -39,9 +39,94 @@ SELECT pg_xact_commit_timestamp('2'::xid);
  
 (1 row)
 
-SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
- ?column? | ?column? | ?column? 
-----------+----------+----------
- t        | t        | t
+SELECT x.xid::text::bigint > 0 as xid_valid,
+    x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+    roident != 0 AS valid_roident
+  FROM pg_last_committed_xact() x;
+ xid_valid | ts_in_range | valid_roident 
+-----------+-------------+---------------
+ t         | t           | f
+(1 row)
+
+-- Test non-normal transaction ids.
+SELECT * FROM pg_xact_commit_timestamp_origin(NULL); -- ok, NULL
+ timestamp | roident 
+-----------+---------
+           |        
+(1 row)
+
+SELECT * FROM pg_xact_commit_timestamp_origin('0'::xid); -- error
+ERROR:  cannot retrieve commit timestamp for transaction 0
+SELECT * FROM pg_xact_commit_timestamp_origin('1'::xid); -- ok, NULL
+ timestamp | roident 
+-----------+---------
+           |        
+(1 row)
+
+SELECT * FROM pg_xact_commit_timestamp_origin('2'::xid); -- ok, NULL
+ timestamp | roident 
+-----------+---------
+           |        
+(1 row)
+
+-- Test transaction without replication origin
+SELECT txid_current() as txid_no_origin \gset
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+       roident != 0 AS valid_roident
+  FROM pg_last_committed_xact() x;
+ ts_in_range | valid_roident 
+-------------+---------------
+ t           | f
+(1 row)
+
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+       roident != 0 AS valid_roident
+  FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
+ ts_in_range | valid_roident 
+-------------+---------------
+ t           | f
+(1 row)
+
+-- Test transaction with replication origin
+SELECT pg_replication_origin_create('test_commit_ts: get_origin') != 0
+  AS valid_roident;
+ valid_roident 
+---------------
+ t
+(1 row)
+
+SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
+ pg_replication_origin_session_setup 
+-------------------------------------
+(1 row)
+
+SELECT txid_current() as txid_with_origin \gset
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
+  FROM pg_last_committed_xact() x, pg_replication_origin r
+  WHERE r.roident = x.roident;
+ ts_in_range |           roname           
+-------------+----------------------------
+ t           | test_commit_ts: get_origin
+(1 row)
+
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
+  FROM pg_xact_commit_timestamp_origin(:'txid_with_origin') x, pg_replication_origin r
+  WHERE r.roident = x.roident;
+ ts_in_range |           roname           
+-------------+----------------------------
+ t           | test_commit_ts: get_origin
+(1 row)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+(1 row)
+
+SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
+ pg_replication_origin_drop 
+----------------------------
 (1 row)
 
index c10b0abc2b75d3630515a17492f1bcbbeab50c41..02cd651ed9323ea4487bfbc7fa0683977e0cf7e1 100644 (file)
@@ -34,6 +34,79 @@ SELECT pg_xact_commit_timestamp('2'::xid);
  
 (1 row)
 
-SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
+SELECT x.xid::text::bigint > 0 as xid_valid,
+    x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+    roident != 0 AS valid_roident
+  FROM pg_last_committed_xact() x;
 ERROR:  could not get commit timestamp data
 HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+-- Test non-normal transaction ids.
+SELECT * FROM pg_xact_commit_timestamp_origin(NULL); -- ok, NULL
+ timestamp | roident 
+-----------+---------
+           |        
+(1 row)
+
+SELECT * FROM pg_xact_commit_timestamp_origin('0'::xid); -- error
+ERROR:  cannot retrieve commit timestamp for transaction 0
+SELECT * FROM pg_xact_commit_timestamp_origin('1'::xid); -- ok, NULL
+ timestamp | roident 
+-----------+---------
+           |        
+(1 row)
+
+SELECT * FROM pg_xact_commit_timestamp_origin('2'::xid); -- ok, NULL
+ timestamp | roident 
+-----------+---------
+           |        
+(1 row)
+
+-- Test transaction without replication origin
+SELECT txid_current() as txid_no_origin \gset
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+       roident != 0 AS valid_roident
+  FROM pg_last_committed_xact() x;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+       roident != 0 AS valid_roident
+  FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+-- Test transaction with replication origin
+SELECT pg_replication_origin_create('test_commit_ts: get_origin') != 0
+  AS valid_roident;
+ valid_roident 
+---------------
+ t
+(1 row)
+
+SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
+ pg_replication_origin_session_setup 
+-------------------------------------
+(1 row)
+
+SELECT txid_current() as txid_with_origin \gset
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
+  FROM pg_last_committed_xact() x, pg_replication_origin r
+  WHERE r.roident = x.roident;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
+  FROM pg_xact_commit_timestamp_origin(:'txid_with_origin') x, pg_replication_origin r
+  WHERE r.roident = x.roident;
+ERROR:  could not get commit timestamp data
+HINT:  Make sure the configuration parameter "track_commit_timestamp" is set.
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+(1 row)
+
+SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
+ pg_replication_origin_drop 
+----------------------------
+(1 row)
+
index 4e041a534737135bd74837462edf054c6a35f28c..4a8a6aa56ef6b2d56af2d87505d100663598988b 100644 (file)
@@ -21,4 +21,37 @@ SELECT pg_xact_commit_timestamp('0'::xid);
 SELECT pg_xact_commit_timestamp('1'::xid);
 SELECT pg_xact_commit_timestamp('2'::xid);
 
-SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
+SELECT x.xid::text::bigint > 0 as xid_valid,
+    x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+    roident != 0 AS valid_roident
+  FROM pg_last_committed_xact() x;
+
+-- Test non-normal transaction ids.
+SELECT * FROM pg_xact_commit_timestamp_origin(NULL); -- ok, NULL
+SELECT * FROM pg_xact_commit_timestamp_origin('0'::xid); -- error
+SELECT * FROM pg_xact_commit_timestamp_origin('1'::xid); -- ok, NULL
+SELECT * FROM pg_xact_commit_timestamp_origin('2'::xid); -- ok, NULL
+
+-- Test transaction without replication origin
+SELECT txid_current() as txid_no_origin \gset
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+       roident != 0 AS valid_roident
+  FROM pg_last_committed_xact() x;
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
+       roident != 0 AS valid_roident
+  FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
+
+-- Test transaction with replication origin
+SELECT pg_replication_origin_create('test_commit_ts: get_origin') != 0
+  AS valid_roident;
+SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
+SELECT txid_current() as txid_with_origin \gset
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
+  FROM pg_last_committed_xact() x, pg_replication_origin r
+  WHERE r.roident = x.roident;
+SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
+  FROM pg_xact_commit_timestamp_origin(:'txid_with_origin') x, pg_replication_origin r
+  WHERE r.roident = x.roident;
+
+SELECT pg_replication_origin_session_reset();
+SELECT pg_replication_origin_drop('test_commit_ts: get_origin');