int remote_attnum;
} SlotErrCallbackArg;
+typedef struct ApplyExecutionData
+{
+ EState *estate; /* executor state, used to track resources */
+
+ LogicalRepRelMapEntry *targetRel; /* replication target rel */
+ ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
+
+ /* These fields are used when the target relation is partitioned: */
+ ModifyTableState *mtstate; /* dummy ModifyTable state */
+ PartitionTupleRouting *proute; /* partition routing info */
+} ApplyExecutionData;
+
/*
* Stream xid hash entry. Whenever we see a new xid we create this entry in the
* xidhash and along with it create the streaming file and store the fileset handle.
static void apply_handle_commit_internal(StringInfo s,
LogicalRepCommitData *commit_data);
-static void apply_handle_insert_internal(ResultRelInfo *relinfo,
- EState *estate, TupleTableSlot *remoteslot);
-static void apply_handle_update_internal(ResultRelInfo *relinfo,
- EState *estate, TupleTableSlot *remoteslot,
- LogicalRepTupleData *newtup,
- LogicalRepRelMapEntry *relmapentry);
-static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
+static void apply_handle_insert_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot);
+static void apply_handle_update_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
- LogicalRepRelation *remoterel);
+ LogicalRepTupleData *newtup);
+static void apply_handle_delete_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
-static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
- EState *estate,
+static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
- LogicalRepRelMapEntry *relmapentry,
CmdType operation);
/*
/*
* Executor state preparation for evaluation of constraint expressions,
- * indexes and triggers.
+ * indexes and triggers for the specified relation.
*
- * resultRelInfo is a ResultRelInfo for the relation to be passed to the
- * executor routines. The caller must open and close any indexes to be
- * updated independently of the relation registered here.
+ * Note that the caller must open and close any indexes to be updated.
*/
-static EState *
-create_estate_for_relation(LogicalRepRelMapEntry *rel,
- ResultRelInfo **resultRelInfo)
+static ApplyExecutionData *
+create_edata_for_relation(LogicalRepRelMapEntry *rel)
{
+ ApplyExecutionData *edata;
EState *estate;
RangeTblEntry *rte;
+ ResultRelInfo *resultRelInfo;
/*
* Input functions may need an active snapshot, as may AFTER triggers
- * invoked during finish_estate. For safety, ensure an active snapshot
+ * invoked during finish_edata. For safety, ensure an active snapshot
* exists throughout all our usage of the executor.
*/
PushActiveSnapshot(GetTransactionSnapshot());
- estate = CreateExecutorState();
+ edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
+ edata->targetRel = rel;
+
+ edata->estate = estate = CreateExecutorState();
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
rte->rellockmode = AccessShareLock;
ExecInitRangeTable(estate, list_make1(rte));
- *resultRelInfo = makeNode(ResultRelInfo);
+ edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
/*
* Use Relation opened by logicalrep_rel_open() instead of opening it
* again.
*/
- InitResultRelInfo(*resultRelInfo, rel->localrel, 1, NULL, 0);
+ InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
/*
* We put the ResultRelInfo in the es_opened_result_relations list, even
* though we don't populate the es_result_relations array. That's a bit
* bogus, but it's enough to make ExecGetTriggerResultRel() find them.
- * Also, because we did not open the Relation ourselves here, there is no
- * need to worry about closing it.
*
* ExecOpenIndices() is not called here either, each execution path doing
* an apply operation being responsible for that.
*/
estate->es_opened_result_relations =
- lappend(estate->es_opened_result_relations, *resultRelInfo);
+ lappend(estate->es_opened_result_relations, resultRelInfo);
estate->es_output_cid = GetCurrentCommandId(true);
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
- return estate;
+ /* other fields of edata remain NULL for now */
+
+ return edata;
}
/*
* Finish any operations related to the executor state created by
- * create_estate_for_relation().
+ * create_edata_for_relation().
*/
static void
-finish_estate(EState *estate)
+finish_edata(ApplyExecutionData *edata)
{
+ EState *estate = edata->estate;
+
/* Handle any queued AFTER triggers. */
AfterTriggerEndQuery(estate);
- /* Cleanup. */
+ /* Shut down tuple routing, if any was done. */
+ if (edata->proute)
+ ExecCleanupTupleRouting(edata->mtstate, edata->proute);
+
+ /*
+ * Cleanup. It might seem that we should call ExecCloseResultRelations()
+ * here, but we intentionally don't. It would close the rel we added to
+ * es_opened_result_relations above, which is wrong because we took no
+ * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
+ * any other relations opened during execution.
+ */
ExecResetTupleTable(estate->es_tupleTable, false);
FreeExecutorState(estate);
+ pfree(edata);
+
PopActiveSnapshot();
}
static void
apply_handle_insert(StringInfo s)
{
- ResultRelInfo *resultRelInfo;
LogicalRepRelMapEntry *rel;
LogicalRepTupleData newtup;
LogicalRepRelId relid;
+ ApplyExecutionData *edata;
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
}
/* Initialize the executor state. */
- estate = create_estate_for_relation(rel, &resultRelInfo);
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
/* For a partitioned table, insert the tuple into a partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(resultRelInfo, estate,
- remoteslot, NULL, rel, CMD_INSERT);
+ apply_handle_tuple_routing(edata,
+ remoteslot, NULL, CMD_INSERT);
else
- apply_handle_insert_internal(resultRelInfo, estate,
+ apply_handle_insert_internal(edata, edata->targetRelInfo,
remoteslot);
- finish_estate(estate);
+ finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
}
-/* Workhorse for apply_handle_insert() */
+/*
+ * Workhorse for apply_handle_insert()
+ * relinfo is for the relation we're actually inserting into
+ * (could be a child partition of edata->targetRelInfo)
+ */
static void
-apply_handle_insert_internal(ResultRelInfo *relinfo,
- EState *estate, TupleTableSlot *remoteslot)
+apply_handle_insert_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot)
{
+ EState *estate = edata->estate;
+
+ /* We must open indexes here. */
ExecOpenIndices(relinfo, false);
/* Do the insert. */
static void
apply_handle_update(StringInfo s)
{
- ResultRelInfo *resultRelInfo;
LogicalRepRelMapEntry *rel;
LogicalRepRelId relid;
+ ApplyExecutionData *edata;
EState *estate;
LogicalRepTupleData oldtup;
LogicalRepTupleData newtup;
check_relation_updatable(rel);
/* Initialize the executor state. */
- estate = create_estate_for_relation(rel, &resultRelInfo);
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
/* For a partitioned table, apply update to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(resultRelInfo, estate,
- remoteslot, &newtup, rel, CMD_UPDATE);
+ apply_handle_tuple_routing(edata,
+ remoteslot, &newtup, CMD_UPDATE);
else
- apply_handle_update_internal(resultRelInfo, estate,
- remoteslot, &newtup, rel);
+ apply_handle_update_internal(edata, edata->targetRelInfo,
+ remoteslot, &newtup);
- finish_estate(estate);
+ finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
}
-/* Workhorse for apply_handle_update() */
+/*
+ * Workhorse for apply_handle_update()
+ * relinfo is for the relation we're actually updating in
+ * (could be a child partition of edata->targetRelInfo)
+ */
static void
-apply_handle_update_internal(ResultRelInfo *relinfo,
- EState *estate, TupleTableSlot *remoteslot,
- LogicalRepTupleData *newtup,
- LogicalRepRelMapEntry *relmapentry)
+apply_handle_update_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup)
{
+ EState *estate = edata->estate;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
Relation localrel = relinfo->ri_RelationDesc;
EPQState epqstate;
TupleTableSlot *localslot;
static void
apply_handle_delete(StringInfo s)
{
- ResultRelInfo *resultRelInfo;
LogicalRepRelMapEntry *rel;
LogicalRepTupleData oldtup;
LogicalRepRelId relid;
+ ApplyExecutionData *edata;
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
check_relation_updatable(rel);
/* Initialize the executor state. */
- estate = create_estate_for_relation(rel, &resultRelInfo);
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
remoteslot = ExecInitExtraTupleSlot(estate,
RelationGetDescr(rel->localrel),
&TTSOpsVirtual);
/* For a partitioned table, apply delete to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(resultRelInfo, estate,
- remoteslot, NULL, rel, CMD_DELETE);
+ apply_handle_tuple_routing(edata,
+ remoteslot, NULL, CMD_DELETE);
else
- apply_handle_delete_internal(resultRelInfo, estate,
- remoteslot, &rel->remoterel);
+ apply_handle_delete_internal(edata, edata->targetRelInfo,
+ remoteslot);
- finish_estate(estate);
+ finish_edata(edata);
logicalrep_rel_close(rel, NoLock);
CommandCounterIncrement();
}
-/* Workhorse for apply_handle_delete() */
+/*
+ * Workhorse for apply_handle_delete()
+ * relinfo is for the relation we're actually deleting from
+ * (could be a child partition of edata->targetRelInfo)
+ */
static void
-apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
- TupleTableSlot *remoteslot,
- LogicalRepRelation *remoterel)
+apply_handle_delete_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot)
{
+ EState *estate = edata->estate;
Relation localrel = relinfo->ri_RelationDesc;
+ LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
EPQState epqstate;
TupleTableSlot *localslot;
bool found;
* This handles insert, update, delete on a partitioned table.
*/
static void
-apply_handle_tuple_routing(ResultRelInfo *relinfo,
- EState *estate,
+apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
- LogicalRepRelMapEntry *relmapentry,
CmdType operation)
{
+ EState *estate = edata->estate;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+ ResultRelInfo *relinfo = edata->targetRelInfo;
Relation parentrel = relinfo->ri_RelationDesc;
- ModifyTableState *mtstate = NULL;
- PartitionTupleRouting *proute = NULL;
+ ModifyTableState *mtstate;
+ PartitionTupleRouting *proute;
ResultRelInfo *partrelinfo;
Relation partrel;
TupleTableSlot *remoteslot_part;
MemoryContext oldctx;
/* ModifyTableState is needed for ExecFindPartition(). */
- mtstate = makeNode(ModifyTableState);
+ edata->mtstate = mtstate = makeNode(ModifyTableState);
mtstate->ps.plan = NULL;
mtstate->ps.state = estate;
mtstate->operation = operation;
mtstate->resultRelInfo = relinfo;
- proute = ExecSetupPartitionTupleRouting(estate, parentrel);
+
+ /* ... as is PartitionTupleRouting. */
+ edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
/*
* Find the partition to which the "search tuple" belongs.
switch (operation)
{
case CMD_INSERT:
- apply_handle_insert_internal(partrelinfo, estate,
+ apply_handle_insert_internal(edata, partrelinfo,
remoteslot_part);
break;
case CMD_DELETE:
- apply_handle_delete_internal(partrelinfo, estate,
- remoteslot_part,
- &relmapentry->remoterel);
+ apply_handle_delete_internal(edata, partrelinfo,
+ remoteslot_part);
break;
case CMD_UPDATE:
Assert(partrelinfo_new != partrelinfo);
/* DELETE old tuple found in the old partition. */
- apply_handle_delete_internal(partrelinfo, estate,
- localslot,
- &relmapentry->remoterel);
+ apply_handle_delete_internal(edata, partrelinfo,
+ localslot);
/* INSERT new tuple into the new partition. */
slot_getallattrs(remoteslot);
}
MemoryContextSwitchTo(oldctx);
- apply_handle_insert_internal(partrelinfo_new, estate,
+ apply_handle_insert_internal(edata, partrelinfo_new,
remoteslot_part);
}
}
elog(ERROR, "unrecognized CmdType: %d", (int) operation);
break;
}
-
- ExecCleanupTupleRouting(mtstate, proute);
}
/*