Some refactoring of logical/worker.c
authorPeter Eisentraut <peter@eisentraut.org>
Tue, 24 Mar 2020 13:00:58 +0000 (14:00 +0100)
committerPeter Eisentraut <peter@eisentraut.org>
Tue, 24 Mar 2020 14:00:54 +0000 (15:00 +0100)
This moves the main operations of apply_handle_{insert|update|delete},
that of inserting, updating, deleting a tuple into/from a given
relation, into corresponding
apply_handle_{insert|update|delete}_internal functions.  This allows
performing those operations on relations that are not directly the
targets of replication, which is something a later patch will use for
targeting partitioned tables.

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com

src/backend/replication/logical/worker.c

index ad4a732fd204440627e7610cd54a6bb00abf3204..e5f0aaa8f72594d3d05c1b5a00a8963fbb97727c 100644 (file)
@@ -113,6 +113,16 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void maybe_reread_subscription(void);
 
+static void apply_handle_insert_internal(ResultRelInfo *relinfo,
+                                                                                EState *estate, TupleTableSlot *remoteslot);
+static void apply_handle_update_internal(ResultRelInfo *relinfo,
+                                                                                EState *estate, TupleTableSlot *remoteslot,
+                                                                                LogicalRepTupleData *newtup,
+                                                                                LogicalRepRelMapEntry *relmapentry);
+static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
+                                                                                TupleTableSlot *remoteslot,
+                                                                                LogicalRepRelation *remoterel);
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -582,6 +592,7 @@ GetRelationIdentityOrPK(Relation rel)
 /*
  * Handle INSERT message.
  */
+
 static void
 apply_handle_insert(StringInfo s)
 {
@@ -621,13 +632,10 @@ apply_handle_insert(StringInfo s)
        slot_fill_defaults(rel, estate, remoteslot);
        MemoryContextSwitchTo(oldctx);
 
-       ExecOpenIndices(estate->es_result_relation_info, false);
+       Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
+       apply_handle_insert_internal(estate->es_result_relation_info, estate,
+                                                                remoteslot);
 
-       /* Do the insert. */
-       ExecSimpleRelationInsert(estate, remoteslot);
-
-       /* Cleanup. */
-       ExecCloseIndices(estate->es_result_relation_info);
        PopActiveSnapshot();
 
        /* Handle queued AFTER triggers. */
@@ -641,6 +649,20 @@ apply_handle_insert(StringInfo s)
        CommandCounterIncrement();
 }
 
+/* Workhorse for apply_handle_insert() */
+static void
+apply_handle_insert_internal(ResultRelInfo *relinfo,
+                                                        EState *estate, TupleTableSlot *remoteslot)
+{
+       ExecOpenIndices(relinfo, false);
+
+       /* Do the insert. */
+       ExecSimpleRelationInsert(estate, remoteslot);
+
+       /* Cleanup. */
+       ExecCloseIndices(relinfo);
+}
+
 /*
  * Check if the logical replication relation is updatable and throw
  * appropriate error if it isn't.
@@ -684,16 +706,12 @@ apply_handle_update(StringInfo s)
 {
        LogicalRepRelMapEntry *rel;
        LogicalRepRelId relid;
-       Oid                     idxoid;
        EState     *estate;
-       EPQState        epqstate;
        LogicalRepTupleData oldtup;
        LogicalRepTupleData newtup;
        bool            has_oldtup;
-       TupleTableSlot *localslot;
        TupleTableSlot *remoteslot;
        RangeTblEntry *target_rte;
-       bool            found;
        MemoryContext oldctx;
 
        ensure_transaction();
@@ -719,9 +737,6 @@ apply_handle_update(StringInfo s)
        remoteslot = ExecInitExtraTupleSlot(estate,
                                                                                RelationGetDescr(rel->localrel),
                                                                                &TTSOpsVirtual);
-       localslot = table_slot_create(rel->localrel,
-                                                                 &estate->es_tupleTable);
-       EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 
        /*
         * Populate updatedCols so that per-column triggers can fire.  This could
@@ -741,7 +756,6 @@ apply_handle_update(StringInfo s)
        fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel));
 
        PushActiveSnapshot(GetTransactionSnapshot());
-       ExecOpenIndices(estate->es_result_relation_info, false);
 
        /* Build the search tuple. */
        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -749,20 +763,57 @@ apply_handle_update(StringInfo s)
                                                has_oldtup ? oldtup.values : newtup.values);
        MemoryContextSwitchTo(oldctx);
 
+       Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
+       apply_handle_update_internal(estate->es_result_relation_info, estate,
+                                                                remoteslot, &newtup, rel);
+
+       PopActiveSnapshot();
+
+       /* Handle queued AFTER triggers. */
+       AfterTriggerEndQuery(estate);
+
+       ExecResetTupleTable(estate->es_tupleTable, false);
+       FreeExecutorState(estate);
+
+       logicalrep_rel_close(rel, NoLock);
+
+       CommandCounterIncrement();
+}
+
+/* Workhorse for apply_handle_update() */
+static void
+apply_handle_update_internal(ResultRelInfo *relinfo,
+                                                        EState *estate, TupleTableSlot *remoteslot,
+                                                        LogicalRepTupleData *newtup,
+                                                        LogicalRepRelMapEntry *relmapentry)
+{
+       Relation        localrel = relinfo->ri_RelationDesc;
+       LogicalRepRelation *remoterel = &relmapentry->remoterel;
+       Oid                     idxoid;
+       EPQState        epqstate;
+       TupleTableSlot *localslot;
+       bool            found;
+       MemoryContext oldctx;
+
+       localslot = table_slot_create(localrel, &estate->es_tupleTable);
+       EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+
+       ExecOpenIndices(relinfo, false);
+
        /*
         * Try to find tuple using either replica identity index, primary key or
         * if needed, sequential scan.
         */
-       idxoid = GetRelationIdentityOrPK(rel->localrel);
+       idxoid = GetRelationIdentityOrPK(localrel);
        Assert(OidIsValid(idxoid) ||
-                  (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
+                  (remoterel->replident == REPLICA_IDENTITY_FULL));
 
        if (OidIsValid(idxoid))
-               found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
+               found = RelationFindReplTupleByIndex(localrel, idxoid,
                                                                                         LockTupleExclusive,
                                                                                         remoteslot, localslot);
        else
-               found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
+               found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
                                                                                 remoteslot, localslot);
 
        ExecClearTuple(remoteslot);
@@ -776,8 +827,8 @@ apply_handle_update(StringInfo s)
        {
                /* Process and store remote tuple in the slot */
                oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-               slot_modify_cstrings(remoteslot, localslot, rel,
-                                                        newtup.values, newtup.changed);
+               slot_modify_cstrings(remoteslot, localslot, relmapentry,
+                                                        newtup->values, newtup->changed);
                MemoryContextSwitchTo(oldctx);
 
                EvalPlanQualSetSlot(&epqstate, remoteslot);
@@ -795,23 +846,12 @@ apply_handle_update(StringInfo s)
                elog(DEBUG1,
                         "logical replication did not find row for update "
                         "in replication target relation \"%s\"",
-                        RelationGetRelationName(rel->localrel));
+                        RelationGetRelationName(localrel));
        }
 
        /* Cleanup. */
-       ExecCloseIndices(estate->es_result_relation_info);
-       PopActiveSnapshot();
-
-       /* Handle queued AFTER triggers. */
-       AfterTriggerEndQuery(estate);
-
+       ExecCloseIndices(relinfo);
        EvalPlanQualEnd(&epqstate);
-       ExecResetTupleTable(estate->es_tupleTable, false);
-       FreeExecutorState(estate);
-
-       logicalrep_rel_close(rel, NoLock);
-
-       CommandCounterIncrement();
 }
 
 /*
@@ -825,12 +865,8 @@ apply_handle_delete(StringInfo s)
        LogicalRepRelMapEntry *rel;
        LogicalRepTupleData oldtup;
        LogicalRepRelId relid;
-       Oid                     idxoid;
        EState     *estate;
-       EPQState        epqstate;
        TupleTableSlot *remoteslot;
-       TupleTableSlot *localslot;
-       bool            found;
        MemoryContext oldctx;
 
        ensure_transaction();
@@ -855,33 +891,64 @@ apply_handle_delete(StringInfo s)
        remoteslot = ExecInitExtraTupleSlot(estate,
                                                                                RelationGetDescr(rel->localrel),
                                                                                &TTSOpsVirtual);
-       localslot = table_slot_create(rel->localrel,
-                                                                 &estate->es_tupleTable);
-       EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 
        PushActiveSnapshot(GetTransactionSnapshot());
-       ExecOpenIndices(estate->es_result_relation_info, false);
 
-       /* Find the tuple using the replica identity index. */
+       /* Build the search tuple. */
        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
        slot_store_cstrings(remoteslot, rel, oldtup.values);
        MemoryContextSwitchTo(oldctx);
 
+       Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
+       apply_handle_delete_internal(estate->es_result_relation_info, estate,
+                                                                remoteslot, &rel->remoterel);
+
+       PopActiveSnapshot();
+
+       /* Handle queued AFTER triggers. */
+       AfterTriggerEndQuery(estate);
+
+       ExecResetTupleTable(estate->es_tupleTable, false);
+       FreeExecutorState(estate);
+
+       logicalrep_rel_close(rel, NoLock);
+
+       CommandCounterIncrement();
+}
+
+/* Workhorse for apply_handle_delete() */
+static void
+apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
+                                                        TupleTableSlot *remoteslot,
+                                                        LogicalRepRelation *remoterel)
+{
+       Relation        localrel = relinfo->ri_RelationDesc;
+       Oid                     idxoid;
+       EPQState        epqstate;
+       TupleTableSlot *localslot;
+       bool            found;
+
+       localslot = table_slot_create(localrel, &estate->es_tupleTable);
+       EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+
+       ExecOpenIndices(relinfo, false);
+
        /*
         * Try to find tuple using either replica identity index, primary key or
         * if needed, sequential scan.
         */
-       idxoid = GetRelationIdentityOrPK(rel->localrel);
+       idxoid = GetRelationIdentityOrPK(localrel);
        Assert(OidIsValid(idxoid) ||
-                  (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
+                  (remoterel->replident == REPLICA_IDENTITY_FULL));
 
        if (OidIsValid(idxoid))
-               found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
+               found = RelationFindReplTupleByIndex(localrel, idxoid,
                                                                                         LockTupleExclusive,
                                                                                         remoteslot, localslot);
        else
-               found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
+               found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
                                                                                 remoteslot, localslot);
+
        /* If found delete it. */
        if (found)
        {
@@ -896,23 +963,12 @@ apply_handle_delete(StringInfo s)
                elog(DEBUG1,
                         "logical replication could not find row for delete "
                         "in replication target relation \"%s\"",
-                        RelationGetRelationName(rel->localrel));
+                        RelationGetRelationName(localrel));
        }
 
        /* Cleanup. */
-       ExecCloseIndices(estate->es_result_relation_info);
-       PopActiveSnapshot();
-
-       /* Handle queued AFTER triggers. */
-       AfterTriggerEndQuery(estate);
-
+       ExecCloseIndices(relinfo);
        EvalPlanQualEnd(&epqstate);
-       ExecResetTupleTable(estate->es_tupleTable, false);
-       FreeExecutorState(estate);
-
-       logicalrep_rel_close(rel, NoLock);
-
-       CommandCounterIncrement();
 }
 
 /*