Revert "Rewrite some RI code to avoid using SPI"
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 7 Apr 2022 21:42:13 +0000 (23:42 +0200)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 7 Apr 2022 21:42:13 +0000 (23:42 +0200)
This reverts commit 99392cdd78b788295e52b9f4942fa11992fd5ba9.
We'd rather rewrite ri_triggers.c as a whole rather than piecemeal.

Discussion: https://postgr.es/m/E1ncXX2-000mFt-Pe@gemulon.postgresql.org

src/backend/executor/execPartition.c
src/backend/executor/nodeLockRows.c
src/backend/utils/adt/ri_triggers.c
src/include/executor/execPartition.h
src/include/executor/executor.h
src/test/isolation/expected/fk-snapshot.out
src/test/isolation/specs/fk-snapshot.spec

index c22c9ac0966528152728f89dc5dbf137ad941afa..615bd80973539691f6248298fb4a102b92f21ccc 100644 (file)
@@ -176,9 +176,8 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
                                                                  EState *estate,
                                                                  Datum *values,
                                                                  bool *isnull);
-static int     get_partition_for_tuple(PartitionKey key,
-                                                                       PartitionDesc partdesc,
-                                                                       Datum *values, bool *isnull);
+static int     get_partition_for_tuple(PartitionDispatch pd, Datum *values,
+                                                                       bool *isnull);
 static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
                                                                                                  Datum *values,
                                                                                                  bool *isnull,
@@ -319,9 +318,7 @@ ExecFindPartition(ModifyTableState *mtstate,
                 * these values, error out.
                 */
                if (partdesc->nparts == 0 ||
-                       (partidx = get_partition_for_tuple(dispatch->key,
-                                                                                          dispatch->partdesc,
-                                                                                          values, isnull)) < 0)
+                       (partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
                {
                        char       *val_desc;
 
@@ -1344,12 +1341,12 @@ FormPartitionKeyDatum(PartitionDispatch pd,
  * found or -1 if none found.
  */
 static int
-get_partition_for_tuple(PartitionKey key,
-                                               PartitionDesc partdesc,
-                                               Datum *values, bool *isnull)
+get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
 {
        int                     bound_offset;
        int                     part_index = -1;
+       PartitionKey key = pd->key;
+       PartitionDesc partdesc = pd->partdesc;
        PartitionBoundInfo boundinfo = partdesc->boundinfo;
 
        /* Route as appropriate based on partitioning strategy. */
@@ -1441,165 +1438,6 @@ get_partition_for_tuple(PartitionKey key,
        return part_index;
 }
 
-/*
- * ExecGetLeafPartitionForKey
- *             Finds the leaf partition of partitioned table 'root_rel' that would
- *             contain the specified key tuple.
- *
- * A subset of the table's columns (including all of the partition key columns)
- * must be specified:
- * - 'key_natts' indicats the number of columns contained in the key
- * - 'key_attnums' indicates their attribute numbers as defined in 'root_rel'
- * - 'key_vals' and 'key_nulls' specify the key tuple
- *
- * Returns the leaf partition, locked with the given lockmode, or NULL if
- * there isn't one.  Caller is responsibly for closing it.  All intermediate
- * partitions are also locked with the same lockmode.  Caller must have locked
- * the root already.
- *
- * In addition, the OID of the index of a unique constraint on the root table
- * must be given as 'root_idxoid'; *leaf_idxoid will be set to the OID of the
- * corresponding index on the returned leaf partition.  (This can be used by
- * caller to search for a tuple matching the key in the leaf partition.)
- *
- * This works because the unique key defined on the root relation is required
- * to contain the partition key columns of all of the ancestors that lead up to
- * a given leaf partition.
- */
-Relation
-ExecGetLeafPartitionForKey(Relation root_rel, int key_natts,
-                                                  const AttrNumber *key_attnums,
-                                                  Datum *key_vals, char *key_nulls,
-                                                  Oid root_idxoid, int lockmode,
-                                                  Oid *leaf_idxoid)
-{
-       Relation        found_leafpart = NULL;
-       Relation        rel = root_rel;
-       Oid                     constr_idxoid = root_idxoid;
-       PartitionDirectory partdir;
-
-       Assert(root_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
-
-       *leaf_idxoid = InvalidOid;
-
-       partdir = CreatePartitionDirectory(CurrentMemoryContext, true);
-
-       /*
-        * Descend through partitioned parents to find the leaf partition that
-        * would accept a row with the provided key values, starting with the root
-        * parent.
-        */
-       for (;;)
-       {
-               PartitionKey partkey = RelationGetPartitionKey(rel);
-               PartitionDesc partdesc;
-               Datum           partkey_vals[PARTITION_MAX_KEYS];
-               bool            partkey_isnull[PARTITION_MAX_KEYS];
-               AttrNumber *root_partattrs = partkey->partattrs;
-               int                     found_att;
-               int                     partidx;
-               Oid                     partoid;
-
-               CHECK_FOR_INTERRUPTS();
-
-               /*
-                * Collect partition key values from the unique key.
-                *
-                * Because we only have the root table's copy of pk_attnums, must map
-                * any non-root table's partition key attribute numbers to the root
-                * table's.
-                */
-               if (rel != root_rel)
-               {
-                       /*
-                        * map->attnums will contain root table attribute numbers for each
-                        * attribute of the current partitioned relation.
-                        */
-                       AttrMap    *map;
-
-                       map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
-                                                                                          RelationGetDescr(rel));
-                       if (map)
-                       {
-                               root_partattrs = palloc(partkey->partnatts *
-                                                                               sizeof(AttrNumber));
-                               for (int att = 0; att < partkey->partnatts; att++)
-                               {
-                                       AttrNumber      partattno = partkey->partattrs[att];
-
-                                       root_partattrs[att] = map->attnums[partattno - 1];
-                               }
-
-                               free_attrmap(map);
-                       }
-               }
-
-               /*
-                * Map the values/isnulls to match the partition description, as
-                * necessary.
-                *
-                * (Referenced key specification does not allow expressions, so there
-                * would not be expressions in the partition keys either.)
-                */
-               Assert(partkey->partexprs == NIL);
-               found_att = 0;
-               for (int keyatt = 0; keyatt < key_natts; keyatt++)
-               {
-                       for (int att = 0; att < partkey->partnatts; att++)
-                       {
-                               if (root_partattrs[att] == key_attnums[keyatt])
-                               {
-                                       partkey_vals[found_att] = key_vals[keyatt];
-                                       partkey_isnull[found_att] = (key_nulls[keyatt] == 'n');
-                                       found_att++;
-                                       break;
-                               }
-                       }
-               }
-               /* We had better have found values for all partition keys */
-               Assert(found_att == partkey->partnatts);
-
-               if (root_partattrs != partkey->partattrs)
-                       pfree(root_partattrs);
-
-               /* Get the PartitionDesc using the partition directory machinery.  */
-               partdesc = PartitionDirectoryLookup(partdir, rel);
-               if (partdesc->nparts == 0)
-                       break;
-
-               /* Find the partition for the key. */
-               partidx = get_partition_for_tuple(partkey, partdesc,
-                                                                                 partkey_vals, partkey_isnull);
-               Assert(partidx < 0 || partidx < partdesc->nparts);
-
-               /* close the previous parent if any, but keep lock */
-               if (rel != root_rel)
-                       table_close(rel, NoLock);
-
-               /* No partition found. */
-               if (partidx < 0)
-                       break;
-
-               partoid = partdesc->oids[partidx];
-               rel = table_open(partoid, lockmode);
-               constr_idxoid = index_get_partition(rel, constr_idxoid);
-
-               /*
-                * We're done if the partition is a leaf, else find its partition in
-                * the next iteration.
-                */
-               if (partdesc->is_leaf[partidx])
-               {
-                       *leaf_idxoid = constr_idxoid;
-                       found_leafpart = rel;
-                       break;
-               }
-       }
-
-       DestroyPartitionDirectory(partdir);
-       return found_leafpart;
-}
-
 /*
  * ExecBuildSlotPartitionKeyDescription
  *
index bbccafb2cfd0a1a8d9ac16eb3a5309717fe71f18..1a9dab25dd6add51d171ef567d574310f613474c 100644 (file)
@@ -79,7 +79,10 @@ lnext:
                Datum           datum;
                bool            isNull;
                ItemPointerData tid;
+               TM_FailureData tmfd;
                LockTupleMode lockmode;
+               int                     lockflags = 0;
+               TM_Result       test;
                TupleTableSlot *markSlot;
 
                /* clear any leftover test tuple for this rel */
@@ -176,11 +179,74 @@ lnext:
                                break;
                }
 
-               /* skip tuple if it couldn't be locked */
-               if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
-                                                               estate->es_snapshot, estate->es_output_cid,
-                                                               lockmode, erm->waitPolicy, &epq_needed))
-                       goto lnext;
+               lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+               if (!IsolationUsesXactSnapshot())
+                       lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+               test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
+                                                               markSlot, estate->es_output_cid,
+                                                               lockmode, erm->waitPolicy,
+                                                               lockflags,
+                                                               &tmfd);
+
+               switch (test)
+               {
+                       case TM_WouldBlock:
+                               /* couldn't lock tuple in SKIP LOCKED mode */
+                               goto lnext;
+
+                       case TM_SelfModified:
+
+                               /*
+                                * The target tuple was already updated or deleted by the
+                                * current command, or by a later command in the current
+                                * transaction.  We *must* ignore the tuple in the former
+                                * case, so as to avoid the "Halloween problem" of repeated
+                                * update attempts.  In the latter case it might be sensible
+                                * to fetch the updated tuple instead, but doing so would
+                                * require changing heap_update and heap_delete to not
+                                * complain about updating "invisible" tuples, which seems
+                                * pretty scary (table_tuple_lock will not complain, but few
+                                * callers expect TM_Invisible, and we're not one of them). So
+                                * for now, treat the tuple as deleted and do not process.
+                                */
+                               goto lnext;
+
+                       case TM_Ok:
+
+                               /*
+                                * Got the lock successfully, the locked tuple saved in
+                                * markSlot for, if needed, EvalPlanQual testing below.
+                                */
+                               if (tmfd.traversed)
+                                       epq_needed = true;
+                               break;
+
+                       case TM_Updated:
+                               if (IsolationUsesXactSnapshot())
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                                                        errmsg("could not serialize access due to concurrent update")));
+                               elog(ERROR, "unexpected table_tuple_lock status: %u",
+                                        test);
+                               break;
+
+                       case TM_Deleted:
+                               if (IsolationUsesXactSnapshot())
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                                                        errmsg("could not serialize access due to concurrent update")));
+                               /* tuple was deleted so don't return it */
+                               goto lnext;
+
+                       case TM_Invisible:
+                               elog(ERROR, "attempted to lock invisible tuple");
+                               break;
+
+                       default:
+                               elog(ERROR, "unrecognized table_tuple_lock status: %u",
+                                        test);
+               }
 
                /* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
                erm->curCtid = tid;
@@ -215,91 +281,6 @@ lnext:
        return slot;
 }
 
-/*
- * ExecLockTableTuple
- *             Locks tuple with the specified TID in lockmode following given wait
- *             policy
- *
- * Returns true if the tuple was successfully locked.  Locked tuple is loaded
- * into provided slot.
- */
-bool
-ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
-                                  Snapshot snapshot, CommandId cid,
-                                  LockTupleMode lockmode, LockWaitPolicy waitPolicy,
-                                  bool *epq_needed)
-{
-       TM_FailureData tmfd;
-       int                     lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-       TM_Result       test;
-
-       if (!IsolationUsesXactSnapshot())
-               lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
-       test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
-                                                       waitPolicy, lockflags, &tmfd);
-
-       switch (test)
-       {
-               case TM_WouldBlock:
-                       /* couldn't lock tuple in SKIP LOCKED mode */
-                       return false;
-
-               case TM_SelfModified:
-
-                       /*
-                        * The target tuple was already updated or deleted by the current
-                        * command, or by a later command in the current transaction.  We
-                        * *must* ignore the tuple in the former case, so as to avoid the
-                        * "Halloween problem" of repeated update attempts.  In the latter
-                        * case it might be sensible to fetch the updated tuple instead,
-                        * but doing so would require changing heap_update and heap_delete
-                        * to not complain about updating "invisible" tuples, which seems
-                        * pretty scary (table_tuple_lock will not complain, but few
-                        * callers expect TM_Invisible, and we're not one of them). So for
-                        * now, treat the tuple as deleted and do not process.
-                        */
-                       return false;
-
-               case TM_Ok:
-
-                       /*
-                        * Got the lock successfully, the locked tuple saved in slot for
-                        * EvalPlanQual, if asked by the caller.
-                        */
-                       if (tmfd.traversed && epq_needed)
-                               *epq_needed = true;
-                       break;
-
-               case TM_Updated:
-                       if (IsolationUsesXactSnapshot())
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                                                errmsg("could not serialize access due to concurrent update")));
-                       elog(ERROR, "unexpected table_tuple_lock status: %u",
-                                test);
-                       break;
-
-               case TM_Deleted:
-                       if (IsolationUsesXactSnapshot())
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                                                errmsg("could not serialize access due to concurrent update")));
-                       /* tuple was deleted so don't return it */
-                       return false;
-
-               case TM_Invisible:
-                       elog(ERROR, "attempted to lock invisible tuple");
-                       return false;
-
-               default:
-                       elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
-                       return false;
-       }
-
-       return true;
-}
-
 /* ----------------------------------------------------------------
  *             ExecInitLockRows
  *
index 088b4027008d045c8072069bd830d7f7dccb8f1f..01d4c22cfce1850473f91b867e5a155d202b64aa 100644 (file)
@@ -23,7 +23,6 @@
 
 #include "postgres.h"
 
-#include "access/genam.h"
 #include "access/htup_details.h"
 #include "access/sysattr.h"
 #include "access/table.h"
@@ -34,7 +33,6 @@
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "commands/trigger.h"
-#include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "lib/ilist.h"
 #define RI_KEYS_NONE_NULL                              2
 
 /* RI query type codes */
-#define RI_PLAN_CASCADE_ONDELETE               1
-#define RI_PLAN_CASCADE_ONUPDATE               2
+/* these queries are executed against the PK (referenced) table: */
+#define RI_PLAN_CHECK_LOOKUPPK                 1
+#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK 2
+#define RI_PLAN_LAST_ON_PK                             RI_PLAN_CHECK_LOOKUPPK_FROM_PK
+/* these queries are executed against the FK (referencing) table: */
+#define RI_PLAN_CASCADE_ONDELETE               3
+#define RI_PLAN_CASCADE_ONUPDATE               4
 /* For RESTRICT, the same plan can be used for both ON DELETE and ON UPDATE triggers. */
-#define RI_PLAN_RESTRICT                               3
-#define RI_PLAN_SETNULL_ONDELETE               4
-#define RI_PLAN_SETNULL_ONUPDATE               5
-#define RI_PLAN_SETDEFAULT_ONDELETE            6
-#define RI_PLAN_SETDEFAULT_ONUPDATE            7
+#define RI_PLAN_RESTRICT                               5
+#define RI_PLAN_SETNULL_ONDELETE               6
+#define RI_PLAN_SETNULL_ONUPDATE               7
+#define RI_PLAN_SETDEFAULT_ONDELETE            8
+#define RI_PLAN_SETDEFAULT_ONUPDATE            9
 
 #define MAX_QUOTED_NAME_LEN  (NAMEDATALEN*2+3)
 #define MAX_QUOTED_REL_NAME_LEN  (MAX_QUOTED_NAME_LEN*2)
@@ -226,278 +229,9 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                                                           Relation pk_rel, Relation fk_rel,
                                                           TupleTableSlot *violatorslot, TupleDesc tupdesc,
-                                                          bool on_fk, bool partgone) pg_attribute_noreturn();
-static Oid     get_fkey_unique_index(Oid conoid);
+                                                          int queryno, bool partgone) pg_attribute_noreturn();
 
 
-/*
- * Checks whether a tuple containing the unique key as extracted from the
- * tuple provided in 'slot' exists in 'pk_rel'.  The key is extracted using the
- * constraint's index given in 'riinfo', which is also scanned to check the
- * existence of the key.
- *
- * If 'pk_rel' is a partitioned table, the check is performed on its leaf
- * partition that would contain the key.
- *
- * The provided tuple is either the one being inserted into the referencing
- * relation ('fk_rel' is non-NULL), or the one being deleted from the
- * referenced relation, that is, 'pk_rel' ('fk_rel' is NULL).
- */
-static bool
-ri_ReferencedKeyExists(Relation pk_rel, Relation fk_rel,
-                                          TupleTableSlot *slot, const RI_ConstraintInfo *riinfo)
-{
-       Oid                     constr_id = riinfo->constraint_id;
-       Oid                     idxoid;
-       Relation        idxrel;
-       Relation        leaf_pk_rel = NULL;
-       int                     num_pk;
-       int                     i;
-       bool            found = false;
-       const Oid  *eq_oprs;
-       Datum           pk_vals[INDEX_MAX_KEYS];
-       char            pk_nulls[INDEX_MAX_KEYS];
-       ScanKeyData skey[INDEX_MAX_KEYS];
-       Snapshot        snap = InvalidSnapshot;
-       bool            pushed_latest_snapshot = false;
-       IndexScanDesc scan;
-       TupleTableSlot *outslot;
-       Oid                     saved_userid;
-       int                     saved_sec_context;
-       AclResult       aclresult;
-
-       /*
-        * Extract the unique key from the provided slot and choose the equality
-        * operators to use when scanning the index below.
-        */
-       if (fk_rel)
-       {
-               ri_ExtractValues(fk_rel, slot, riinfo, false, pk_vals, pk_nulls);
-               /* Use PK = FK equality operator. */
-               eq_oprs = riinfo->pf_eq_oprs;
-
-               /*
-                * May need to cast each of the individual values of the foreign key
-                * to the corresponding PK column's type if the equality operator
-                * demands it.
-                */
-               for (i = 0; i < riinfo->nkeys; i++)
-               {
-                       if (pk_nulls[i] != 'n')
-                       {
-                               Oid                     eq_opr = eq_oprs[i];
-                               Oid                     typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-                               RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
-
-                               if (OidIsValid(entry->cast_func_finfo.fn_oid))
-                                       pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
-                                                                                          pk_vals[i],
-                                                                                          Int32GetDatum(-1),   /* typmod */
-                                                                                          BoolGetDatum(false));        /* implicit coercion */
-                       }
-               }
-       }
-       else
-       {
-               ri_ExtractValues(pk_rel, slot, riinfo, true, pk_vals, pk_nulls);
-               /* Use PK = PK equality operator. */
-               eq_oprs = riinfo->pp_eq_oprs;
-       }
-
-       /*
-        * Switch to referenced table's owner to perform the below operations as.
-        * This matches what ri_PerformCheck() does.
-        *
-        * Note that as with queries done by ri_PerformCheck(), the way we select
-        * the referenced row below effectively bypasses any RLS policies that may
-        * be present on the referenced table.
-        */
-       GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
-       SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
-                                                  saved_sec_context | SECURITY_LOCAL_USERID_CHANGE);
-
-       /*
-        * Also check that the new user has permissions to look into the schema of
-        * and SELECT from the referenced table.
-        */
-       aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
-                                                                         GetUserId(), ACL_USAGE);
-       if (aclresult != ACLCHECK_OK)
-               aclcheck_error(aclresult, OBJECT_SCHEMA,
-                                          get_namespace_name(RelationGetNamespace(pk_rel)));
-       aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
-                                                                 ACL_SELECT);
-       if (aclresult != ACLCHECK_OK)
-               aclcheck_error(aclresult, OBJECT_TABLE,
-                                          RelationGetRelationName(pk_rel));
-
-       /* Make the changes of the current command visible in all cases. */
-       CommandCounterIncrement();
-
-       /*
-        * In the case of scanning the PK index for ri_Check_Pk_Match(), we'd like
-        * to see all rows that could be interesting, even those that would not be
-        * visible to the transaction snapshot.  To do so, force-push the latest
-        * snapshot.
-        */
-       if (fk_rel == NULL)
-       {
-               snap = GetLatestSnapshot();
-               PushActiveSnapshot(snap);
-               pushed_latest_snapshot = true;
-       }
-       else
-       {
-               snap = GetTransactionSnapshot();
-               PushActiveSnapshot(snap);
-       }
-
-       /*
-        * Open the constraint index to be scanned.
-        *
-        * If the target table is partitioned, we must look up the leaf partition
-        * and its corresponding unique index to search the keys in.
-        */
-       idxoid = get_fkey_unique_index(constr_id);
-       if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-       {
-               Oid                     leaf_idxoid;
-               Snapshot        mysnap = InvalidSnapshot;
-
-               /*
-                * XXX the partition descriptor machinery has a hack that assumes that
-                * the queries originating in this module push the latest snapshot in
-                * the transaction-snapshot mode.  If we haven't pushed one already,
-                * do so now.
-                */
-               if (!pushed_latest_snapshot)
-               {
-                       mysnap = GetLatestSnapshot();
-                       PushActiveSnapshot(mysnap);
-               }
-
-               leaf_pk_rel = ExecGetLeafPartitionForKey(pk_rel, riinfo->nkeys,
-                                                                                                riinfo->pk_attnums,
-                                                                                                pk_vals, pk_nulls,
-                                                                                                idxoid, RowShareLock,
-                                                                                                &leaf_idxoid);
-
-               /*
-                * XXX done fiddling with the partition descriptor machinery so unset
-                * the active snapshot if we must.
-                */
-               if (mysnap != InvalidSnapshot)
-                       PopActiveSnapshot();
-
-               /*
-                * If no suitable leaf partition exists, neither can the key we're
-                * looking for.
-                */
-               if (leaf_pk_rel == NULL)
-               {
-                       SetUserIdAndSecContext(saved_userid, saved_sec_context);
-                       PopActiveSnapshot();
-                       return false;
-               }
-
-               pk_rel = leaf_pk_rel;
-               idxoid = leaf_idxoid;
-       }
-       idxrel = index_open(idxoid, RowShareLock);
-
-       /* Set up ScanKeys for the index scan. */
-       num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
-       for (i = 0; i < num_pk; i++)
-       {
-               int                     pkattno = i + 1;
-               Oid                     operator = eq_oprs[i];
-               Oid                     opfamily = idxrel->rd_opfamily[i];
-               StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
-               RegProcedure regop = get_opcode(operator);
-
-               /* Initialize the scankey. */
-               ScanKeyInit(&skey[i],
-                                       pkattno,
-                                       strat,
-                                       regop,
-                                       pk_vals[i]);
-
-               skey[i].sk_collation = idxrel->rd_indcollation[i];
-
-               /*
-                * Check for null value.  Should not occur, because callers currently
-                * take care of the cases in which they do occur.
-                */
-               if (pk_nulls[i] == 'n')
-                       skey[i].sk_flags |= SK_ISNULL;
-       }
-
-       scan = index_beginscan(pk_rel, idxrel, snap, num_pk, 0);
-       index_rescan(scan, skey, num_pk, NULL, 0);
-
-       /* Look for the tuple, and if found, try to lock it in key share mode. */
-       outslot = table_slot_create(pk_rel, NULL);
-       if (index_getnext_slot(scan, ForwardScanDirection, outslot))
-       {
-               /*
-                * If we fail to lock the tuple for whatever reason, assume it doesn't
-                * exist.
-                */
-               found = ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
-                                                                  snap,
-                                                                  GetCurrentCommandId(false),
-                                                                  LockTupleKeyShare,
-                                                                  LockWaitBlock, NULL);
-       }
-
-       index_endscan(scan);
-       ExecDropSingleTupleTableSlot(outslot);
-
-       /* Don't release lock until commit. */
-       index_close(idxrel, NoLock);
-
-       /* Close leaf partition relation if any. */
-       if (leaf_pk_rel)
-               table_close(leaf_pk_rel, NoLock);
-
-       /* Restore UID and security context */
-       SetUserIdAndSecContext(saved_userid, saved_sec_context);
-
-       PopActiveSnapshot();
-
-       return found;
-}
-
-/*
- * get_fkey_unique_index
- *             Returns the unique index used by a supposedly foreign key constraint
- *
- * XXX This is very similar to get_constraint_index; probably they should be
- * unified.
- */
-static Oid
-get_fkey_unique_index(Oid conoid)
-{
-       Oid                     result = InvalidOid;
-       HeapTuple       tp;
-
-       tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
-       if (HeapTupleIsValid(tp))
-       {
-               Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
-
-               if (contup->contype == CONSTRAINT_FOREIGN)
-                       result = contup->conindid;
-               ReleaseSysCache(tp);
-       }
-
-       if (!OidIsValid(result))
-               elog(ERROR, "unique index not found for foreign key constraint %u",
-                        conoid);
-
-       return result;
-}
-
 /*
  * RI_FKey_check -
  *
@@ -510,6 +244,8 @@ RI_FKey_check(TriggerData *trigdata)
        Relation        fk_rel;
        Relation        pk_rel;
        TupleTableSlot *newslot;
+       RI_QueryKey qkey;
+       SPIPlanPtr      qplan;
 
        riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
                                                                        trigdata->tg_relation, false);
@@ -589,9 +325,9 @@ RI_FKey_check(TriggerData *trigdata)
 
                                        /*
                                         * MATCH PARTIAL - all non-null columns must match. (not
-                                        * implemented, can be done by modifying
-                                        * ri_ReferencedKeyExists() to only include non-null
-                                        * columns.
+                                        * implemented, can be done by modifying the query below
+                                        * to only include non-null columns, or by writing a
+                                        * special version here)
                                         */
                                        break;
 #endif
@@ -606,12 +342,74 @@ RI_FKey_check(TriggerData *trigdata)
                        break;
        }
 
-       if (!ri_ReferencedKeyExists(pk_rel, fk_rel, newslot, riinfo))
-               ri_ReportViolation(riinfo,
-                                                  pk_rel, fk_rel,
-                                                  newslot,
-                                                  NULL,
-                                                  true, false);
+       if (SPI_connect() != SPI_OK_CONNECT)
+               elog(ERROR, "SPI_connect failed");
+
+       /* Fetch or prepare a saved plan for the real check */
+       ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
+
+       if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
+       {
+               StringInfoData querybuf;
+               char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
+               char            attname[MAX_QUOTED_NAME_LEN];
+               char            paramname[16];
+               const char *querysep;
+               Oid                     queryoids[RI_MAX_NUMKEYS];
+               const char *pk_only;
+
+               /* ----------
+                * The query string built is
+                *      SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
+                *                 FOR KEY SHARE OF x
+                * The type id's for the $ parameters are those of the
+                * corresponding FK attributes.
+                * ----------
+                */
+               initStringInfo(&querybuf);
+               pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                       "" : "ONLY ";
+               quoteRelationName(pkrelname, pk_rel);
+               appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+                                                pk_only, pkrelname);
+               querysep = "WHERE";
+               for (int i = 0; i < riinfo->nkeys; i++)
+               {
+                       Oid                     pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+                       Oid                     fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+
+                       quoteOneName(attname,
+                                                RIAttName(pk_rel, riinfo->pk_attnums[i]));
+                       sprintf(paramname, "$%d", i + 1);
+                       ri_GenerateQual(&querybuf, querysep,
+                                                       attname, pk_type,
+                                                       riinfo->pf_eq_oprs[i],
+                                                       paramname, fk_type);
+                       querysep = "AND";
+                       queryoids[i] = fk_type;
+               }
+               appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+
+               /* Prepare and save the plan */
+               qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+                                                        &qkey, fk_rel, pk_rel);
+       }
+
+       /*
+        * Now check that foreign key exists in PK table
+        *
+        * XXX detectNewRows must be true when a partitioned table is on the
+        * referenced side.  The reason is that our snapshot must be fresh in
+        * order for the hack in find_inheritance_children() to work.
+        */
+       ri_PerformCheck(riinfo, &qkey, qplan,
+                                       fk_rel, pk_rel,
+                                       NULL, newslot,
+                                       pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
+                                       SPI_OK_SELECT);
+
+       if (SPI_finish() != SPI_OK_FINISH)
+               elog(ERROR, "SPI_finish failed");
 
        table_close(pk_rel, RowShareLock);
 
@@ -666,10 +464,81 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
                                  TupleTableSlot *oldslot,
                                  const RI_ConstraintInfo *riinfo)
 {
+       SPIPlanPtr      qplan;
+       RI_QueryKey qkey;
+       bool            result;
+
        /* Only called for non-null rows */
        Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
 
-       return ri_ReferencedKeyExists(pk_rel, NULL, oldslot, riinfo);
+       if (SPI_connect() != SPI_OK_CONNECT)
+               elog(ERROR, "SPI_connect failed");
+
+       /*
+        * Fetch or prepare a saved plan for checking PK table with values coming
+        * from a PK row
+        */
+       ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
+
+       if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
+       {
+               StringInfoData querybuf;
+               char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
+               char            attname[MAX_QUOTED_NAME_LEN];
+               char            paramname[16];
+               const char *querysep;
+               const char *pk_only;
+               Oid                     queryoids[RI_MAX_NUMKEYS];
+
+               /* ----------
+                * The query string built is
+                *      SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
+                *                 FOR KEY SHARE OF x
+                * The type id's for the $ parameters are those of the
+                * PK attributes themselves.
+                * ----------
+                */
+               initStringInfo(&querybuf);
+               pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+                       "" : "ONLY ";
+               quoteRelationName(pkrelname, pk_rel);
+               appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+                                                pk_only, pkrelname);
+               querysep = "WHERE";
+               for (int i = 0; i < riinfo->nkeys; i++)
+               {
+                       Oid                     pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+
+                       quoteOneName(attname,
+                                                RIAttName(pk_rel, riinfo->pk_attnums[i]));
+                       sprintf(paramname, "$%d", i + 1);
+                       ri_GenerateQual(&querybuf, querysep,
+                                                       attname, pk_type,
+                                                       riinfo->pp_eq_oprs[i],
+                                                       paramname, pk_type);
+                       querysep = "AND";
+                       queryoids[i] = pk_type;
+               }
+               appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+
+               /* Prepare and save the plan */
+               qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+                                                        &qkey, fk_rel, pk_rel);
+       }
+
+       /*
+        * We have a plan now. Run it.
+        */
+       result = ri_PerformCheck(riinfo, &qkey, qplan,
+                                                        fk_rel, pk_rel,
+                                                        oldslot, NULL,
+                                                        true,  /* treat like update */
+                                                        SPI_OK_SELECT);
+
+       if (SPI_finish() != SPI_OK_FINISH)
+               elog(ERROR, "SPI_finish failed");
+
+       return result;
 }
 
 
@@ -1739,10 +1608,15 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
                                         errtableconstraint(fk_rel,
                                                                                NameStr(fake_riinfo.conname))));
 
+               /*
+                * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
+                * query, which isn't true, but will cause it to use
+                * fake_riinfo.fk_attnums as we need.
+                */
                ri_ReportViolation(&fake_riinfo,
                                                   pk_rel, fk_rel,
                                                   slot, tupdesc,
-                                                  true, false);
+                                                  RI_PLAN_CHECK_LOOKUPPK, false);
 
                ExecDropSingleTupleTableSlot(slot);
        }
@@ -1959,7 +1833,7 @@ RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
                        fake_riinfo.pk_attnums[i] = i + 1;
 
                ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
-                                                  slot, tupdesc, true, true);
+                                                  slot, tupdesc, 0, true);
        }
 
        if (SPI_finish() != SPI_OK_FINISH)
@@ -2096,8 +1970,9 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 {
        /*
         * Inherited constraints with a common ancestor can share ri_query_cache
-        * entries, because each query processes the other table involved in the
-        * FK constraint (i.e., not the table on which the trigger has been
+        * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK.
+        * Except in that case, the query processes the other table involved in
+        * the FK constraint (i.e., not the table on which the trigger has been
         * fired), and so it will be the same for all members of the inheritance
         * tree.  So we may use the root constraint's OID in the hash key, rather
         * than the constraint's own OID.  This avoids creating duplicate SPI
@@ -2108,13 +1983,13 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
         * constraint, because partitions can have different column orders,
         * resulting in different pk_attnums[] or fk_attnums[] array contents.)
         *
-        * (Note also that for a standalone or non-inherited constraint,
-        * constraint_root_id is same as constraint_id.)
-        *
         * We assume struct RI_QueryKey contains no padding bytes, else we'd need
         * to use memset to clear them.
         */
-       key->constr_id = riinfo->constraint_root_id;
+       if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
+               key->constr_id = riinfo->constraint_root_id;
+       else
+               key->constr_id = riinfo->constraint_id;
        key->constr_queryno = constr_queryno;
 }
 
@@ -2385,12 +2260,19 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
                         RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
        SPIPlanPtr      qplan;
-
-       /* There are currently no queries that run on PK table. */
-       Relation        query_rel = fk_rel;
+       Relation        query_rel;
        Oid                     save_userid;
        int                     save_sec_context;
 
+       /*
+        * Use the query type code to determine whether the query is run against
+        * the PK or FK table; we'll do the check as that table's owner
+        */
+       if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
+               query_rel = pk_rel;
+       else
+               query_rel = fk_rel;
+
        /* Switch to proper UID to perform check as */
        GetUserIdAndSecContext(&save_userid, &save_sec_context);
        SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
@@ -2423,9 +2305,9 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
                                TupleTableSlot *oldslot, TupleTableSlot *newslot,
                                bool detectNewRows, int expect_OK)
 {
-       /* There are currently no queries that run on PK table. */
-       Relation        query_rel = fk_rel,
-                               source_rel = pk_rel;
+       Relation        query_rel,
+                               source_rel;
+       bool            source_is_pk;
        Snapshot        test_snapshot;
        Snapshot        crosscheck_snapshot;
        int                     limit;
@@ -2435,17 +2317,46 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
        Datum           vals[RI_MAX_NUMKEYS * 2];
        char            nulls[RI_MAX_NUMKEYS * 2];
 
+       /*
+        * Use the query type code to determine whether the query is run against
+        * the PK or FK table; we'll do the check as that table's owner
+        */
+       if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
+               query_rel = pk_rel;
+       else
+               query_rel = fk_rel;
+
+       /*
+        * The values for the query are taken from the table on which the trigger
+        * is called - it is normally the other one with respect to query_rel. An
+        * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
+        * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK).  We might eventually
+        * need some less klugy way to determine this.
+        */
+       if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
+       {
+               source_rel = fk_rel;
+               source_is_pk = false;
+       }
+       else
+       {
+               source_rel = pk_rel;
+               source_is_pk = true;
+       }
+
        /* Extract the parameters to be passed into the query */
        if (newslot)
        {
-               ri_ExtractValues(source_rel, newslot, riinfo, true, vals, nulls);
+               ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
+                                                vals, nulls);
                if (oldslot)
-                       ri_ExtractValues(source_rel, oldslot, riinfo, true,
+                       ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
                                                         vals + riinfo->nkeys, nulls + riinfo->nkeys);
        }
        else
        {
-               ri_ExtractValues(source_rel, oldslot, riinfo, true, vals, nulls);
+               ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
+                                                vals, nulls);
        }
 
        /*
@@ -2509,12 +2420,14 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
                                 errhint("This is most likely due to a rule having rewritten the query.")));
 
        /* XXX wouldn't it be clearer to do this part at the caller? */
-       if (expect_OK == SPI_OK_SELECT && SPI_processed != 0)
+       if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
+               expect_OK == SPI_OK_SELECT &&
+               (SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
                ri_ReportViolation(riinfo,
                                                   pk_rel, fk_rel,
                                                   newslot ? newslot : oldslot,
                                                   NULL,
-                                                  false, false);
+                                                  qkey->constr_queryno, false);
 
        return SPI_processed != 0;
 }
@@ -2545,9 +2458,9 @@ ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 /*
  * Produce an error report
  *
- * If the failed constraint was on insert/update to the FK table (on_fk is
- * true), we want the key names and values extracted from there, and the
- * error message to look like 'key blah is not present in PK'.
+ * If the failed constraint was on insert/update to the FK table,
+ * we want the key names and values extracted from there, and the error
+ * message to look like 'key blah is not present in PK'.
  * Otherwise, the attr names and values come from the PK table and the
  * message looks like 'key blah is still referenced from FK'.
  */
@@ -2555,20 +2468,22 @@ static void
 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                                   Relation pk_rel, Relation fk_rel,
                                   TupleTableSlot *violatorslot, TupleDesc tupdesc,
-                                  bool on_fk, bool partgone)
+                                  int queryno, bool partgone)
 {
        StringInfoData key_names;
        StringInfoData key_values;
+       bool            onfk;
        const int16 *attnums;
        Oid                     rel_oid;
        AclResult       aclresult;
        bool            has_perm = true;
 
        /*
-        * If tupdesc wasn't passed by caller, assume the violator tuple came from
-        * there.
+        * Determine which relation to complain about.  If tupdesc wasn't passed
+        * by caller, assume the violator tuple came from there.
         */
-       if (on_fk)
+       onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
+       if (onfk)
        {
                attnums = riinfo->fk_attnums;
                rel_oid = fk_rel->rd_id;
@@ -2670,7 +2585,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                                                   key_names.data, key_values.data,
                                                   RelationGetRelationName(fk_rel)),
                                 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
-       else if (on_fk)
+       else if (onfk)
                ereport(ERROR,
                                (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
                                 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
@@ -2977,10 +2892,7 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
  * ri_HashCompareOp -
  *
  * See if we know how to compare two values, and create a new hash entry
- * if not.  The entry contains the FmgrInfo of the equality operator function
- * and that of the cast function, if one is needed to convert the right
- * operand (whose type OID has been passed) before passing it to the equality
- * function.
+ * if not.
  */
 static RI_CompareHashEntry *
 ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -3036,16 +2948,8 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
                 * moment since that will never be generated for implicit coercions.
                 */
                op_input_types(eq_opr, &lefttype, &righttype);
-
-               /*
-                * Don't need to cast if the values that will be passed to the
-                * operator will be of expected operand type(s).  The operator can be
-                * cross-type (such as when called by ri_ReferencedKeyExists()), in
-                * which case, we only need the cast if the right operand value
-                * doesn't match the type expected by the operator.
-                */
-               if ((lefttype == righttype && typeid == lefttype) ||
-                       (lefttype != righttype && typeid == righttype))
+               Assert(lefttype == righttype);
+               if (typeid == lefttype)
                        castfunc = InvalidOid;  /* simplest case */
                else
                {
index cbe1d996e6b5200832a68b11400767620c4cf873..708435e95285194fbb7b9f407e2de11485d9ca56 100644 (file)
@@ -31,12 +31,6 @@ extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
                                                                                EState *estate);
 extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
                                                                        PartitionTupleRouting *proute);
-extern Relation ExecGetLeafPartitionForKey(Relation root_rel,
-                                                                                  int key_natts,
-                                                                                  const AttrNumber *key_attnums,
-                                                                                  Datum *key_vals, char *key_nulls,
-                                                                                  Oid root_idxoid, int lockmode,
-                                                                                  Oid *leaf_idxoid);
 
 
 /*
index 216d28679a6e35b1220240c071f5a3ee8e924b9e..873772f188374b0de4eeddcb3850e4346abc2574 100644 (file)
@@ -651,14 +651,6 @@ extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
                                                                         const char *relname);
 
-/*
- * prototypes from functions in nodeLockRows.c
- */
-extern bool ExecLockTableTuple(Relation relation, ItemPointer tid,
-                                                          TupleTableSlot *slot, Snapshot snapshot,
-                                                          CommandId cid, LockTupleMode lockmode,
-                                                          LockWaitPolicy waitPolicy, bool *epq_needed);
-
 /*
  * prototypes from functions in nodeModifyTable.c
  */
index 22752cc7429014e9a0406dc7dd08004642af0cea..5faf80d6ce0e4cd292ba3788dbbd5ef4fd689447 100644 (file)
@@ -47,12 +47,12 @@ a
 
 step s2ifn2: INSERT INTO fk_noparted VALUES (2);
 step s2c: COMMIT;
-ERROR:  insert or update on table "fk_noparted" violates foreign key constraint "fk_noparted_a_fkey"
 step s2sfn: SELECT * FROM fk_noparted;
 a
 -
 1
-(1 row)
+2
+(2 rows)
 
 
 starting permutation: s1brc s2brc s2ip2 s1sp s2c s1sp s1ifp2 s2brc s2sfp s1c s1sfp s2ifn2 s2c s2sfn
index 64d27f29c3c859f56042770e0cf1d44e5d12ceb2..378507fbc3822be4821eff7a2062526750c59f81 100644 (file)
@@ -46,7 +46,10 @@ step s2sfn   { SELECT * FROM fk_noparted; }
 # inserting into referencing tables in transaction-snapshot mode
 # PK table is non-partitioned
 permutation s1brr s2brc s2ip2 s1sp s2c s1sp s1ifp2 s1c s1sfp
-# PK table is partitioned
+# PK table is partitioned: buggy, because s2's serialization transaction can
+# see the uncommitted row thanks to the latest snapshot taken for
+# partition lookup to work correctly also ends up getting used by the PK index
+# scan
 permutation s2ip2 s2brr s1brc s1ifp2 s2sfp s1c s2sfp s2ifn2 s2c s2sfn
 
 # inserting into referencing tables in up-to-date snapshot mode