Fix the misuse of origin filter across multiple pg_logical_slot_get_changes() calls.
authorAmit Kapila <akapila@postgresql.org>
Wed, 27 Sep 2023 09:02:51 +0000 (14:32 +0530)
committerAmit Kapila <akapila@postgresql.org>
Wed, 27 Sep 2023 09:02:51 +0000 (14:32 +0530)
The pgoutput module uses a global variable (publish_no_origin) to cache
the action for the origin filter, but we didn't reset the flag when
shutting down the output plugin, so subsequent retries may access the
previous publish_no_origin value.

We fix this by storing the flag in the output plugin's private data.
Additionally, the patch removes the currently unused origin string from the
structure.

For the back branch, to avoid changing the exposed structure, we eliminated the
global variable and instead directly used the origin string for change
filtering.

Author: Hou Zhijie
Reviewed-by: Amit Kapila, Michael Paquier
Backpatch-through: 16
Discussion: http://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com

contrib/test_decoding/expected/replorigin.out
contrib/test_decoding/sql/replorigin.sql
src/backend/replication/pgoutput/pgoutput.c
src/include/replication/pgoutput.h

index 49ffaeea2da8bafaceeab811b7ca658779b4ea03..c85e1a01b231c64c0c948f8ad3c1ab59a8178add 100644 (file)
@@ -267,3 +267,59 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn
  
 (1 row)
 
+-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE PUBLICATION pub FOR TABLE target_tbl;
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+ pg_replication_origin_create 
+------------------------------
+                            1
+(1 row)
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+ pg_replication_origin_session_setup 
+-------------------------------------
+(1 row)
+
+INSERT INTO target_tbl(data) VALUES ('test data');
+-- The replayed change will be filtered.
+SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
+ ?column? 
+----------
+ t
+(1 row)
+
+-- The replayed change will be output if the origin value is not specified.
+SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
+ ?column? 
+----------
+ t
+(1 row)
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+(1 row)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+ pg_replication_origin_drop 
+----------------------------
+(1 row)
+
+DROP PUBLICATION pub;
index db06541f56559d9ee2cccd976fd5fe29e05a550b..e71ee02d050a0d52ead24de4796c6a8a4fe29ca7 100644 (file)
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
 SELECT pg_replication_origin_session_reset();
 SELECT pg_drop_replication_slot('regression_slot_no_lsn');
 SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
+
+-- Test that the pgoutput correctly filters changes corresponding to the provided origin value.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
+CREATE PUBLICATION pub FOR TABLE target_tbl;
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+
+INSERT INTO target_tbl(data) VALUES ('test data');
+
+-- The replayed change will be filtered.
+SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origin', 'none');
+
+-- The replayed change will be output if the origin value is not specified.
+SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
+
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+DROP PUBLICATION pub;
index 3d2becb45cfbf0501226cbdcb892a82a44013e19..251ba46da5e2ceb5db00865a7ec2f83b8e1879f7 100644 (file)
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 
 static bool publications_valid;
 static bool in_streaming;
-static bool publish_no_origin;
 
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -381,21 +380,23 @@ parse_output_parameters(List *options, PGOutputData *data)
                }
                else if (strcmp(defel->defname, "origin") == 0)
                {
+                       char       *origin;
+
                        if (origin_option_given)
                                ereport(ERROR,
                                                errcode(ERRCODE_SYNTAX_ERROR),
                                                errmsg("conflicting or redundant options"));
                        origin_option_given = true;
 
-                       data->origin = defGetString(defel);
-                       if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
-                               publish_no_origin = true;
-                       else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
-                               publish_no_origin = false;
+                       origin = defGetString(defel);
+                       if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
+                               data->publish_no_origin = true;
+                       else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
+                               data->publish_no_origin = false;
                        else
                                ereport(ERROR,
                                                errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                               errmsg("unrecognized origin value: \"%s\"", data->origin));
+                                               errmsg("unrecognized origin value: \"%s\"", origin));
                }
                else
                        elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -1673,7 +1674,9 @@ static bool
 pgoutput_origin_filter(LogicalDecodingContext *ctx,
                                           RepOriginId origin_id)
 {
-       if (publish_no_origin && origin_id != InvalidRepOriginId)
+       PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+       if (data->publish_no_origin && origin_id != InvalidRepOriginId)
                return true;
 
        return false;
index b4a8015403b5022a1879bde8228e944dbd1b9c72..b3f9a016293e2b51b1150a9f7f5e6d910eb0227d 100644 (file)
@@ -29,7 +29,7 @@ typedef struct PGOutputData
        char            streaming;
        bool            messages;
        bool            two_phase;
-       char       *origin;
+       bool            publish_no_origin;
 } PGOutputData;
 
 #endif                                                 /* PGOUTPUT_H */