Refactor code to look up local replication tuple
authorPeter Eisentraut <peter@eisentraut.org>
Wed, 1 Apr 2020 13:31:47 +0000 (15:31 +0200)
committerPeter Eisentraut <peter@eisentraut.org>
Wed, 1 Apr 2020 13:34:41 +0000 (15:34 +0200)
This unifies some duplicate code.

Author: Amit Langote <amitlangote09@gmail.com>
Discussion: https://www.postgresql.org/message-id/CA+HiwqFjYE5anArxvkjr37AQMd52L-LZtz9Ld2QrLQ3YfcYhTw@mail.gmail.com

src/backend/replication/logical/worker.c

index fa3811783f6e9ecd7719a8de8c3813b3bb243315..673ebd211d1832c00a72423f9666acc850a651b7 100644 (file)
@@ -122,6 +122,10 @@ static void apply_handle_update_internal(ResultRelInfo *relinfo,
 static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
                                                                                 TupleTableSlot *remoteslot,
                                                                                 LogicalRepRelation *remoterel);
+static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
+                                                                       LogicalRepRelation *remoterel,
+                                                                       TupleTableSlot *remoteslot,
+                                                                       TupleTableSlot **localslot);
 
 /*
  * Should this worker apply changes for given relation.
@@ -788,33 +792,17 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
                                                         LogicalRepRelMapEntry *relmapentry)
 {
        Relation        localrel = relinfo->ri_RelationDesc;
-       Oid                     idxoid;
        EPQState        epqstate;
        TupleTableSlot *localslot;
        bool            found;
        MemoryContext oldctx;
 
-       localslot = table_slot_create(localrel, &estate->es_tupleTable);
        EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
-
        ExecOpenIndices(relinfo, false);
 
-       /*
-        * Try to find tuple using either replica identity index, primary key or
-        * if needed, sequential scan.
-        */
-       idxoid = GetRelationIdentityOrPK(localrel);
-       Assert(OidIsValid(idxoid) ||
-                  (relmapentry->remoterel.replident == REPLICA_IDENTITY_FULL));
-
-       if (OidIsValid(idxoid))
-               found = RelationFindReplTupleByIndex(localrel, idxoid,
-                                                                                        LockTupleExclusive,
-                                                                                        remoteslot, localslot);
-       else
-               found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
-                                                                                remoteslot, localslot);
-
+       found = FindReplTupleInLocalRel(estate, localrel,
+                                                                       &relmapentry->remoterel,
+                                                                       remoteslot, &localslot);
        ExecClearTuple(remoteslot);
 
        /*
@@ -922,31 +910,15 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
                                                         LogicalRepRelation *remoterel)
 {
        Relation        localrel = relinfo->ri_RelationDesc;
-       Oid                     idxoid;
        EPQState        epqstate;
        TupleTableSlot *localslot;
        bool            found;
 
-       localslot = table_slot_create(localrel, &estate->es_tupleTable);
        EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
-
        ExecOpenIndices(relinfo, false);
 
-       /*
-        * Try to find tuple using either replica identity index, primary key or
-        * if needed, sequential scan.
-        */
-       idxoid = GetRelationIdentityOrPK(localrel);
-       Assert(OidIsValid(idxoid) ||
-                  (remoterel->replident == REPLICA_IDENTITY_FULL));
-
-       if (OidIsValid(idxoid))
-               found = RelationFindReplTupleByIndex(localrel, idxoid,
-                                                                                        LockTupleExclusive,
-                                                                                        remoteslot, localslot);
-       else
-               found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
-                                                                                remoteslot, localslot);
+       found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+                                                                       remoteslot, &localslot);
 
        /* If found delete it. */
        if (found)
@@ -970,6 +942,39 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
        EvalPlanQualEnd(&epqstate);
 }
 
+/*
+ * Try to find a tuple received from the publication side (in 'remoteslot') in
+ * the corresponding local relation using either replica identity index,
+ * primary key or if needed, sequential scan.
+ *
+ * Local tuple, if found, is returned in '*localslot'.
+ */
+static bool
+FindReplTupleInLocalRel(EState *estate, Relation localrel,
+                                               LogicalRepRelation *remoterel,
+                                               TupleTableSlot *remoteslot,
+                                               TupleTableSlot **localslot)
+{
+       Oid                     idxoid;
+       bool            found;
+
+       *localslot = table_slot_create(localrel, &estate->es_tupleTable);
+
+       idxoid = GetRelationIdentityOrPK(localrel);
+       Assert(OidIsValid(idxoid) ||
+                  (remoterel->replident == REPLICA_IDENTITY_FULL));
+
+       if (OidIsValid(idxoid))
+               found = RelationFindReplTupleByIndex(localrel, idxoid,
+                                                                                        LockTupleExclusive,
+                                                                                        remoteslot, *localslot);
+       else
+               found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+                                                                                remoteslot, *localslot);
+
+       return found;
+}
+
 /*
  * Handle TRUNCATE message.
  *