pgoutput: Fix memory leak due to RelationSyncEntry.map.
authorAmit Kapila <akapila@postgresql.org>
Tue, 1 Jun 2021 08:44:02 +0000 (14:14 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 1 Jun 2021 08:57:14 +0000 (14:27 +0530)
Release memory allocated when creating the tuple-conversion map and its
component TupleDescs when its owning sync entry is invalidated.
TupleDescs must also be freed when no map is deemed necessary, to begin
with.

Reported-by: Andres Freund
Author: Amit Langote
Reviewed-by: Takamichi Osumi, Amit Kapila
Backpatch-through: 13, where it was introduced
Discussion: https://postgr.es/m/MEYP282MB166933B1AB02B4FE56E82453B64D9@MEYP282MB1669.AUSP282.PROD.OUTLOOK.COM

src/backend/replication/pgoutput/pgoutput.c
src/test/subscription/t/013_partition.pl

index f68348dcf45975f305b2a7bcbf7c95f01732ee08..fe12d08a946daace8434a7ced030120d7e91eba1 100644 (file)
@@ -74,7 +74,8 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 /*
  * Entry in the map used to remember which relation schemas we sent.
  *
- * The schema_sent flag determines if the current schema record was already
+ * The schema_sent flag determines if the current schema record for the
+ * relation (and for its ancestor if publish_as_relid is set) was already
  * sent to the subscriber (in which case we don't need to send it again).
  *
  * The schema cache on downstream is however updated only at commit time,
@@ -92,10 +93,6 @@ typedef struct RelationSyncEntry
 {
        Oid                     relid;                  /* relation oid */
 
-       /*
-        * Did we send the schema?  If ancestor relid is set, its schema must also
-        * have been sent for this to be true.
-        */
        bool            schema_sent;
        List       *streamed_txns;      /* streamed toplevel transactions with this
                                                                 * schema */
@@ -437,10 +434,17 @@ maybe_send_schema(LogicalDecodingContext *ctx,
        else
                schema_sent = relentry->schema_sent;
 
+       /* Nothing to do if we already sent the schema. */
        if (schema_sent)
                return;
 
-       /* If needed, send the ancestor's schema first. */
+       /*
+        * Nope, so send the schema.  If the changes will be published using an
+        * ancestor's schema, not the relation's own, send that ancestor's schema
+        * before sending relation's own (XXX - maybe sending only the former
+        * suffices?).  This is also a good place to set the map that will be used
+        * to convert the relation's tuples into the ancestor's format, if needed.
+        */
        if (relentry->publish_as_relid != RelationGetRelid(relation))
        {
                Relation        ancestor = RelationIdGetRelation(relentry->publish_as_relid);
@@ -450,8 +454,21 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 
                /* Map must live as long as the session does. */
                oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-               relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
-                                                                                          CreateTupleDescCopy(outdesc));
+
+               /*
+                * Make copies of the TupleDescs that will live as long as the map
+                * does before putting into the map.
+                */
+               indesc = CreateTupleDescCopy(indesc);
+               outdesc = CreateTupleDescCopy(outdesc);
+               relentry->map = convert_tuples_by_name(indesc, outdesc);
+               if (relentry->map == NULL)
+               {
+                       /* Map not necessary, so free the TupleDescs too. */
+                       FreeTupleDesc(indesc);
+                       FreeTupleDesc(outdesc);
+               }
+
                MemoryContextSwitchTo(oldctx);
                send_relation_and_attrs(ancestor, xid, ctx);
                RelationClose(ancestor);
@@ -1011,6 +1028,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                entry->pubactions.pubinsert = entry->pubactions.pubupdate =
                        entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
                entry->publish_as_relid = InvalidOid;
+               entry->map = NULL;      /* will be set by maybe_send_schema() if needed */
        }
 
        /* Validate the entry */
@@ -1191,12 +1209,24 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 
        /*
         * Reset schema sent status as the relation definition may have changed.
+        * Also free any objects that depended on the earlier definition.
         */
        if (entry != NULL)
        {
                entry->schema_sent = false;
                list_free(entry->streamed_txns);
                entry->streamed_txns = NIL;
+               if (entry->map)
+               {
+                       /*
+                        * Must free the TupleDescs contained in the map explicitly,
+                        * because free_conversion_map() doesn't.
+                        */
+                       FreeTupleDesc(entry->map->indesc);
+                       FreeTupleDesc(entry->map->outdesc);
+                       free_conversion_map(entry->map);
+               }
+               entry->map = NULL;
        }
 }
 
index 6daf8daa3cc97ad0880dddf357364df349866a6e..9de01017be5dc1769289b6ac9cc71037a93a5fb0 100644 (file)
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 54;
+use Test::More tests => 56;
 
 # setup
 
@@ -624,3 +624,29 @@ is($result, qq(), 'truncate of tab3 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1");
 is($result, qq(), 'truncate of tab3_1 replicated');
+
+# check that the map to convert tuples from leaf partition to the root
+# table is correctly rebuilt when a new column is added
+$node_publisher->safe_psql('postgres',
+       "ALTER TABLE tab2 DROP b, ADD COLUMN c text DEFAULT 'pub_tab2', ADD b text");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab2 (a, b) VALUES (1, 'xxx'), (3, 'yyy'), (5, 'zzz')");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab2 (a, b, c) VALUES (6, 'aaa', 'xxx_c')");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, a, b FROM tab2 ORDER BY 1, 2");
+is( $result, qq(pub_tab2|1|xxx
+pub_tab2|3|yyy
+pub_tab2|5|zzz
+xxx_c|6|aaa), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a, b FROM tab2 ORDER BY 1, 2");
+is( $result, qq(pub_tab2|1|xxx
+pub_tab2|3|yyy
+pub_tab2|5|zzz
+xxx_c|6|aaa), 'inserts into tab2 replicated');