Refactor pgoutput_change().
authorAmit Kapila <akapila@postgresql.org>
Thu, 30 Mar 2023 05:40:38 +0000 (11:10 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 30 Mar 2023 05:40:38 +0000 (11:10 +0530)
Instead of mostly-duplicate code for different operation
(insert/update/delete) types, write a common code to compute old/new
tuples, and check the row filter.

Author: Hou Zhijie
Reviewed-by: Peter Smith, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB5716194A47FFA8D91133687D94BF9@OS0PR01MB5716.jpnprd01.prod.outlook.com

src/backend/replication/pgoutput/pgoutput.c

index 3a2d2e357e031fed665ca012dd6f720133b2804d..ebaf555d56928c2779a3a886002068166a60b766 100644 (file)
@@ -1440,6 +1440,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                case REORDER_BUFFER_CHANGE_DELETE:
                        if (!relentry->pubactions.pubdelete)
                                return;
+
+                       /*
+                        * This is only possible if deletes are allowed even when replica
+                        * identity is not defined for a table. Since the DELETE action
+                        * can't be published, we simply return.
+                        */
+                       if (!change->data.tp.oldtuple)
+                       {
+                               elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+                               return;
+                       }
                        break;
                default:
                        Assert(false);
@@ -1448,187 +1459,99 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        /* Avoid leaking memory by using and resetting our own context */
        old = MemoryContextSwitchTo(data->context);
 
-       /* Send the data */
-       switch (action)
+       /* Switch relation if publishing via root. */
+       if (relentry->publish_as_relid != RelationGetRelid(relation))
        {
-               case REORDER_BUFFER_CHANGE_INSERT:
-                       new_slot = relentry->new_slot;
-                       ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-                                                          new_slot, false);
-
-                       /* Switch relation if publishing via root. */
-                       if (relentry->publish_as_relid != RelationGetRelid(relation))
-                       {
-                               Assert(relation->rd_rel->relispartition);
-                               ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-                               targetrel = ancestor;
-                               /* Convert tuple if needed. */
-                               if (relentry->attrmap)
-                               {
-                                       TupleDesc       tupdesc = RelationGetDescr(targetrel);
-
-                                       new_slot = execute_attr_map_slot(relentry->attrmap,
-                                                                                                        new_slot,
-                                                                                                        MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-                               }
-                       }
-
-                       /* Check row filter */
-                       if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-                                                                        &action))
-                               break;
-
-                       /*
-                        * Send BEGIN if we haven't yet.
-                        *
-                        * We send the BEGIN message after ensuring that we will actually
-                        * send the change. This avoids sending a pair of BEGIN/COMMIT
-                        * messages for empty transactions.
-                        */
-                       if (txndata && !txndata->sent_begin_txn)
-                               pgoutput_send_begin(ctx, txn);
-
-                       /*
-                        * Schema should be sent using the original relation because it
-                        * also sends the ancestor's relation.
-                        */
-                       maybe_send_schema(ctx, change, relation, relentry);
+               Assert(relation->rd_rel->relispartition);
+               ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+               targetrel = ancestor;
+       }
 
-                       OutputPluginPrepareWrite(ctx, true);
-                       logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-                                                                       data->binary, relentry->columns);
-                       OutputPluginWrite(ctx, true);
-                       break;
-               case REORDER_BUFFER_CHANGE_UPDATE:
-                       if (change->data.tp.oldtuple)
-                       {
-                               old_slot = relentry->old_slot;
-                               ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-                                                                  old_slot, false);
-                       }
+       if (change->data.tp.oldtuple)
+       {
+               old_slot = relentry->old_slot;
+               ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
 
-                       new_slot = relentry->new_slot;
-                       ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-                                                          new_slot, false);
+               /* Convert tuple if needed. */
+               if (relentry->attrmap)
+               {
+                       TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+                                                                                                         &TTSOpsVirtual);
 
-                       /* Switch relation if publishing via root. */
-                       if (relentry->publish_as_relid != RelationGetRelid(relation))
-                       {
-                               Assert(relation->rd_rel->relispartition);
-                               ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-                               targetrel = ancestor;
-                               /* Convert tuples if needed. */
-                               if (relentry->attrmap)
-                               {
-                                       TupleDesc       tupdesc = RelationGetDescr(targetrel);
+                       old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
+               }
+       }
 
-                                       if (old_slot)
-                                               old_slot = execute_attr_map_slot(relentry->attrmap,
-                                                                                                                old_slot,
-                                                                                                                MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
+       if (change->data.tp.newtuple)
+       {
+               new_slot = relentry->new_slot;
+               ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
 
-                                       new_slot = execute_attr_map_slot(relentry->attrmap,
-                                                                                                        new_slot,
-                                                                                                        MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-                               }
-                       }
+               /* Convert tuple if needed. */
+               if (relentry->attrmap)
+               {
+                       TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+                                                                                                         &TTSOpsVirtual);
 
-                       /* Check row filter */
-                       if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-                                                                        relentry, &action))
-                               break;
+                       new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
+               }
+       }
 
-                       /* Send BEGIN if we haven't yet */
-                       if (txndata && !txndata->sent_begin_txn)
-                               pgoutput_send_begin(ctx, txn);
+       /*
+        * Check row filter.
+        *
+        * Updates could be transformed to inserts or deletes based on the results
+        * of the row filter for old and new tuple.
+        */
+       if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+               goto cleanup;
 
-                       maybe_send_schema(ctx, change, relation, relentry);
+       /*
+        * Send BEGIN if we haven't yet.
+        *
+        * We send the BEGIN message after ensuring that we will actually send the
+        * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
+        * transactions.
+        */
+       if (txndata && !txndata->sent_begin_txn)
+               pgoutput_send_begin(ctx, txn);
 
-                       OutputPluginPrepareWrite(ctx, true);
+       /*
+        * Schema should be sent using the original relation because it also sends
+        * the ancestor's relation.
+        */
+       maybe_send_schema(ctx, change, relation, relentry);
 
-                       /*
-                        * Updates could be transformed to inserts or deletes based on the
-                        * results of the row filter for old and new tuple.
-                        */
-                       switch (action)
-                       {
-                               case REORDER_BUFFER_CHANGE_INSERT:
-                                       logicalrep_write_insert(ctx->out, xid, targetrel,
-                                                                                       new_slot, data->binary,
-                                                                                       relentry->columns);
-                                       break;
-                               case REORDER_BUFFER_CHANGE_UPDATE:
-                                       logicalrep_write_update(ctx->out, xid, targetrel,
-                                                                                       old_slot, new_slot, data->binary,
-                                                                                       relentry->columns);
-                                       break;
-                               case REORDER_BUFFER_CHANGE_DELETE:
-                                       logicalrep_write_delete(ctx->out, xid, targetrel,
-                                                                                       old_slot, data->binary,
-                                                                                       relentry->columns);
-                                       break;
-                               default:
-                                       Assert(false);
-                       }
+       OutputPluginPrepareWrite(ctx, true);
 
-                       OutputPluginWrite(ctx, true);
+       /* Send the data */
+       switch (action)
+       {
+               case REORDER_BUFFER_CHANGE_INSERT:
+                       logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+                                                                       data->binary, relentry->columns);
+                       break;
+               case REORDER_BUFFER_CHANGE_UPDATE:
+                       logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+                                                                       new_slot, data->binary, relentry->columns);
                        break;
                case REORDER_BUFFER_CHANGE_DELETE:
-                       if (change->data.tp.oldtuple)
-                       {
-                               old_slot = relentry->old_slot;
-
-                               ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-                                                                  old_slot, false);
-
-                               /* Switch relation if publishing via root. */
-                               if (relentry->publish_as_relid != RelationGetRelid(relation))
-                               {
-                                       Assert(relation->rd_rel->relispartition);
-                                       ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-                                       targetrel = ancestor;
-                                       /* Convert tuple if needed. */
-                                       if (relentry->attrmap)
-                                       {
-                                               TupleDesc       tupdesc = RelationGetDescr(targetrel);
-
-                                               old_slot = execute_attr_map_slot(relentry->attrmap,
-                                                                                                                old_slot,
-                                                                                                                MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-                                       }
-                               }
-
-                               /* Check row filter */
-                               if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-                                                                                relentry, &action))
-                                       break;
-
-                               /* Send BEGIN if we haven't yet */
-                               if (txndata && !txndata->sent_begin_txn)
-                                       pgoutput_send_begin(ctx, txn);
-
-                               maybe_send_schema(ctx, change, relation, relentry);
-
-                               OutputPluginPrepareWrite(ctx, true);
-                               logicalrep_write_delete(ctx->out, xid, targetrel,
-                                                                               old_slot, data->binary,
-                                                                               relentry->columns);
-                               OutputPluginWrite(ctx, true);
-                       }
-                       else
-                               elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+                       logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+                                                                       data->binary, relentry->columns);
                        break;
                default:
                        Assert(false);
        }
 
+       OutputPluginWrite(ctx, true);
+
+cleanup:
        if (RelationIsValid(ancestor))
        {
                RelationClose(ancestor);
                ancestor = NULL;
        }
 
-       /* Cleanup */
        MemoryContextSwitchTo(old);
        MemoryContextReset(data->context);
 }