Add flush option to pg_logical_emit_message()
authorMichael Paquier <michael@paquier.xyz>
Wed, 18 Oct 2023 02:24:59 +0000 (11:24 +0900)
committerMichael Paquier <michael@paquier.xyz>
Wed, 18 Oct 2023 02:24:59 +0000 (11:24 +0900)
Since its introduction, LogLogicalMessage() (via the SQL interface
pg_logical_emit_message()) has never included a call to XLogFlush(),
causing it to potentially lose messages on a crash when used in
non-transactional mode.  This has come up to me as a problem while
playing with ideas to design a test suite for what has become
039_end_of_wal.pl introduced in bae868caf222 by Thomas Munro, because
there are no direct ways to force a WAL flush via SQL.

The default is false, to not flush messages and influence existing
use-cases where this function could be used.  If set to true, the
message emitted is flushed before returning back to the caller, making
the message durable on crash.  This new option has no effect when using
pg_logical_emit_message() in transactional mode, as the record's flush
is guaranteed by the WAL record generated by the transaction committed.

Two queries of test_decoding are tweaked to cover the new code path for
the flush.

Bump catalog version.

Author: Michael Paquier
Reviewed-by: Andres Freund, Amit Kapila, Fujii Masao, Tung Nguyen, Tomas
Vondra
Discussion: https://postgr.es/m/ZNsdThSe2qgsfs7R@paquier.xyz

contrib/test_decoding/expected/messages.out
contrib/test_decoding/sql/messages.sql
doc/src/sgml/func.sgml
src/backend/catalog/system_functions.sql
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/logical/message.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/replication/message.h

index 0fd70036bd5fc2cc6ba67fdfe3a019bbf5bdc77c..84baf8af3ee876a07f561a619720b2cbf01fdbf7 100644 (file)
@@ -6,13 +6,14 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+-- These two cover the path for the flush variant.
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
  ?column? 
 ----------
  msg1
 (1 row)
 
-SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
  ?column? 
 ----------
  msg2
index 3d8500f99cb5bf7647e819db67195a54a1db2bfe..1f3dcb63ee72a2f5422507bf3a90a36ed559287c 100644 (file)
@@ -3,8 +3,9 @@ SET synchronous_commit = on;
 
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
-SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
-SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+-- These two cover the path for the flush variant.
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
 
 BEGIN;
 SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
index affd1254bb7892fae399e0dd3503dc7889d41c38..7c3e940afef83f0e4bc725726b85b28491945348 100644 (file)
@@ -27740,11 +27740,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_logical_emit_message</primary>
         </indexterm>
-        <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> )
+        <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
         <returnvalue>pg_lsn</returnvalue>
        </para>
        <para role="func_signature">
-        <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> )
+        <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
         <returnvalue>pg_lsn</returnvalue>
        </para>
        <para>
@@ -27758,6 +27758,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         recognize messages that are interesting for them.
         The <parameter>content</parameter> parameter is the content of the
         message, given either in text or binary form.
+        The <parameter>flush</parameter> parameter (default set to
+        <literal>false</literal>) controls if the message is immediately
+        flushed to WAL or not. <parameter>flush</parameter> has no effect
+        with <parameter>transactional</parameter>, as the message's WAL
+        record is flushed along with its transaction.
        </para></entry>
       </row>
      </tbody>
index 07c0d89c4f8648aa2f9b7e93bd994b4986359682..35d738d5763e2abb25d940fbcf4a7ffcc639b8f5 100644 (file)
@@ -446,6 +446,26 @@ LANGUAGE INTERNAL
 VOLATILE ROWS 1000 COST 1000
 AS 'pg_logical_slot_peek_binary_changes';
 
+CREATE OR REPLACE FUNCTION pg_logical_emit_message(
+    transactional boolean,
+    prefix text,
+    message text,
+    flush boolean DEFAULT false)
+RETURNS pg_lsn
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_logical_emit_message_text';
+
+CREATE OR REPLACE FUNCTION pg_logical_emit_message(
+    transactional boolean,
+    prefix text,
+    message bytea,
+    flush boolean DEFAULT false)
+RETURNS pg_lsn
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_logical_emit_message_bytea';
+
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
     IN temporary boolean DEFAULT false,
index 197169d6b0da1694d91380efab075ff0ed269051..1067aca08fc73b47cf907ef5caaa0def15280cb0 100644 (file)
@@ -362,10 +362,11 @@ pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
    bool        transactional = PG_GETARG_BOOL(0);
    char       *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
    bytea      *data = PG_GETARG_BYTEA_PP(2);
+   bool        flush = PG_GETARG_BOOL(3);
    XLogRecPtr  lsn;
 
    lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
-                           transactional);
+                           transactional, flush);
    PG_RETURN_LSN(lsn);
 }
 
index c5de14afc6569c1db1599f7769fd3c37a9a1df9c..b5d29382f5428c776c87a17d6f35657d890b50e9 100644 (file)
  */
 XLogRecPtr
 LogLogicalMessage(const char *prefix, const char *message, size_t size,
-                 bool transactional)
+                 bool transactional, bool flush)
 {
    xl_logical_message xlrec;
+   XLogRecPtr  lsn;
 
    /*
     * Force xid to be allocated if we're emitting a transactional message.
@@ -71,7 +72,15 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
    /* allow origin filtering */
    XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-   return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+   lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+
+   /*
+    * Make sure that the message hits disk before leaving if emitting a
+    * non-transactional message when flush is requested.
+    */
+   if (!transactional && flush)
+       XLogFlush(lsn);
+   return lsn;
 }
 
 /*
index c5f4af24dc1434a53582e45aa2d06028bbc50a73..2f46fdc7391d1bdbcf99758ee4549e1acb593b5d 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202310161
+#define CATALOG_VERSION_NO 202310181
 
 #endif
index 72ea4aa8b8c3ed4616bb758d1e7fa03f6fb0f221..c92d0631a01110673bc3e129e1fac1124e0d4976 100644 (file)
   prosrc => 'pg_replication_slot_advance' },
 { oid => '3577', descr => 'emit a textual logical decoding message',
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
-  prorettype => 'pg_lsn', proargtypes => 'bool text text',
+  prorettype => 'pg_lsn', proargtypes => 'bool text text bool',
   prosrc => 'pg_logical_emit_message_text' },
 { oid => '3578', descr => 'emit a binary logical decoding message',
   proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
-  prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
+  prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
   prosrc => 'pg_logical_emit_message_bytea' },
 
 # event triggers
index 6ce7f2038b226a5a644068317bac8cbec6341826..0f168d572c1afec246878ae7d42c6f1a952cf8b5 100644 (file)
@@ -30,7 +30,8 @@ typedef struct xl_logical_message
 #define SizeOfLogicalMessage   (offsetof(xl_logical_message, message))
 
 extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
-                                   size_t size, bool transactional);
+                                   size_t size, bool transactional,
+                                   bool flush);
 
 /* RMGR API */
 #define XLOG_LOGICAL_MESSAGE   0x00