static void maybe_reread_subscription(void);
+static void apply_handle_insert_internal(ResultRelInfo *relinfo,
+ EState *estate, TupleTableSlot *remoteslot);
+static void apply_handle_update_internal(ResultRelInfo *relinfo,
+ EState *estate, TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup,
+ LogicalRepRelMapEntry *relmapentry);
+static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
+ TupleTableSlot *remoteslot,
+ LogicalRepRelation *remoterel);
+
/*
* Should this worker apply changes for given relation.
*
/*
* Handle INSERT message.
*/
+
static void
apply_handle_insert(StringInfo s)
{
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
- ExecOpenIndices(estate->es_result_relation_info, false);
+ Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
+ apply_handle_insert_internal(estate->es_result_relation_info, estate,
+ remoteslot);
- /* Do the insert. */
- ExecSimpleRelationInsert(estate, remoteslot);
-
- /* Cleanup. */
- ExecCloseIndices(estate->es_result_relation_info);
PopActiveSnapshot();
/* Handle queued AFTER triggers. */
CommandCounterIncrement();
}
+/* Workhorse for apply_handle_insert() */
+static void
+apply_handle_insert_internal(ResultRelInfo *relinfo,
+ EState *estate, TupleTableSlot *remoteslot)
+{
+ ExecOpenIndices(relinfo, false);
+
+ /* Do the insert. */
+ ExecSimpleRelationInsert(estate, remoteslot);
+
+ /* Cleanup. */
+ ExecCloseIndices(relinfo);
+}
+
/*
* Check if the logical replication relation is updatable and throw
* appropriate error if it isn't.
{
LogicalRepRelMapEntry *rel;
LogicalRepRelId relid;
- Oid idxoid;
EState *estate;
- EPQState epqstate;
LogicalRepTupleData oldtup;
LogicalRepTupleData newtup;
bool has_oldtup;
- TupleTableSlot *localslot;
TupleTableSlot *remoteslot;
RangeTblEntry *target_rte;
- bool found;
MemoryContext oldctx;
ensure_transaction();
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
- localslot = table_slot_create(rel->localrel,
- &estate->es_tupleTable);
- EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
/*
* Populate updatedCols so that per-column triggers can fire. This could
fill_extraUpdatedCols(target_rte, RelationGetDescr(rel->localrel));
PushActiveSnapshot(GetTransactionSnapshot());
- ExecOpenIndices(estate->es_result_relation_info, false);
/* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
has_oldtup ? oldtup.values : newtup.values);
MemoryContextSwitchTo(oldctx);
+ Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
+ apply_handle_update_internal(estate->es_result_relation_info, estate,
+ remoteslot, &newtup, rel);
+
+ PopActiveSnapshot();
+
+ /* Handle queued AFTER triggers. */
+ AfterTriggerEndQuery(estate);
+
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ CommandCounterIncrement();
+}
+
+/* Workhorse for apply_handle_update() */
+static void
+apply_handle_update_internal(ResultRelInfo *relinfo,
+ EState *estate, TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup,
+ LogicalRepRelMapEntry *relmapentry)
+{
+ Relation localrel = relinfo->ri_RelationDesc;
+ LogicalRepRelation *remoterel = &relmapentry->remoterel;
+ Oid idxoid;
+ EPQState epqstate;
+ TupleTableSlot *localslot;
+ bool found;
+ MemoryContext oldctx;
+
+ localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+
+ ExecOpenIndices(relinfo, false);
+
/*
* Try to find tuple using either replica identity index, primary key or
* if needed, sequential scan.
*/
- idxoid = GetRelationIdentityOrPK(rel->localrel);
+ idxoid = GetRelationIdentityOrPK(localrel);
Assert(OidIsValid(idxoid) ||
- (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
+ (remoterel->replident == REPLICA_IDENTITY_FULL));
if (OidIsValid(idxoid))
- found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
+ found = RelationFindReplTupleByIndex(localrel, idxoid,
LockTupleExclusive,
remoteslot, localslot);
else
- found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
+ found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
remoteslot, localslot);
ExecClearTuple(remoteslot);
{
/* Process and store remote tuple in the slot */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- slot_modify_cstrings(remoteslot, localslot, rel,
- newtup.values, newtup.changed);
+ slot_modify_cstrings(remoteslot, localslot, relmapentry,
+ newtup->values, newtup->changed);
MemoryContextSwitchTo(oldctx);
EvalPlanQualSetSlot(&epqstate, remoteslot);
elog(DEBUG1,
"logical replication did not find row for update "
"in replication target relation \"%s\"",
- RelationGetRelationName(rel->localrel));
+ RelationGetRelationName(localrel));
}
/* Cleanup. */
- ExecCloseIndices(estate->es_result_relation_info);
- PopActiveSnapshot();
-
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
+ ExecCloseIndices(relinfo);
EvalPlanQualEnd(&epqstate);
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
-
- logicalrep_rel_close(rel, NoLock);
-
- CommandCounterIncrement();
}
/*
LogicalRepRelMapEntry *rel;
LogicalRepTupleData oldtup;
LogicalRepRelId relid;
- Oid idxoid;
EState *estate;
- EPQState epqstate;
TupleTableSlot *remoteslot;
- TupleTableSlot *localslot;
- bool found;
MemoryContext oldctx;
ensure_transaction();
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
- localslot = table_slot_create(rel->localrel,
- &estate->es_tupleTable);
- EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
PushActiveSnapshot(GetTransactionSnapshot());
- ExecOpenIndices(estate->es_result_relation_info, false);
- /* Find the tuple using the replica identity index. */
+ /* Build the search tuple. */
oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
slot_store_cstrings(remoteslot, rel, oldtup.values);
MemoryContextSwitchTo(oldctx);
+ Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
+ apply_handle_delete_internal(estate->es_result_relation_info, estate,
+ remoteslot, &rel->remoterel);
+
+ PopActiveSnapshot();
+
+ /* Handle queued AFTER triggers. */
+ AfterTriggerEndQuery(estate);
+
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ CommandCounterIncrement();
+}
+
+/* Workhorse for apply_handle_delete() */
+static void
+apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
+ TupleTableSlot *remoteslot,
+ LogicalRepRelation *remoterel)
+{
+ Relation localrel = relinfo->ri_RelationDesc;
+ Oid idxoid;
+ EPQState epqstate;
+ TupleTableSlot *localslot;
+ bool found;
+
+ localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+
+ ExecOpenIndices(relinfo, false);
+
/*
* Try to find tuple using either replica identity index, primary key or
* if needed, sequential scan.
*/
- idxoid = GetRelationIdentityOrPK(rel->localrel);
+ idxoid = GetRelationIdentityOrPK(localrel);
Assert(OidIsValid(idxoid) ||
- (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
+ (remoterel->replident == REPLICA_IDENTITY_FULL));
if (OidIsValid(idxoid))
- found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
+ found = RelationFindReplTupleByIndex(localrel, idxoid,
LockTupleExclusive,
remoteslot, localslot);
else
- found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
+ found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
remoteslot, localslot);
+
/* If found delete it. */
if (found)
{
elog(DEBUG1,
"logical replication could not find row for delete "
"in replication target relation \"%s\"",
- RelationGetRelationName(rel->localrel));
+ RelationGetRelationName(localrel));
}
/* Cleanup. */
- ExecCloseIndices(estate->es_result_relation_info);
- PopActiveSnapshot();
-
- /* Handle queued AFTER triggers. */
- AfterTriggerEndQuery(estate);
-
+ ExecCloseIndices(relinfo);
EvalPlanQualEnd(&epqstate);
- ExecResetTupleTable(estate->es_tupleTable, false);
- FreeExecutorState(estate);
-
- logicalrep_rel_close(rel, NoLock);
-
- CommandCounterIncrement();
}
/*