Revert "Rewrite some RI code to avoid using SPI"
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 7 Apr 2022 21:42:13 +0000 (23:42 +0200)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 7 Apr 2022 21:42:13 +0000 (23:42 +0200)
This reverts commit 99392cdd78b788295e52b9f4942fa11992fd5ba9.
We'd rather rewrite ri_triggers.c as a whole rather than piecemeal.

Discussion: https://postgr.es/m/E1ncXX2-000mFt-Pe@gemulon.postgresql.org

src/backend/executor/execPartition.c
src/backend/executor/nodeLockRows.c
src/backend/utils/adt/ri_triggers.c
src/include/executor/execPartition.h
src/include/executor/executor.h
src/test/isolation/expected/fk-snapshot.out
src/test/isolation/specs/fk-snapshot.spec

index c22c9ac0966528152728f89dc5dbf137ad941afa..615bd80973539691f6248298fb4a102b92f21ccc 100644 (file)
@@ -176,9 +176,8 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
                                  EState *estate,
                                  Datum *values,
                                  bool *isnull);
-static int get_partition_for_tuple(PartitionKey key,
-                                   PartitionDesc partdesc,
-                                   Datum *values, bool *isnull);
+static int get_partition_for_tuple(PartitionDispatch pd, Datum *values,
+                                   bool *isnull);
 static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
                                                  Datum *values,
                                                  bool *isnull,
@@ -319,9 +318,7 @@ ExecFindPartition(ModifyTableState *mtstate,
         * these values, error out.
         */
        if (partdesc->nparts == 0 ||
-           (partidx = get_partition_for_tuple(dispatch->key,
-                                              dispatch->partdesc,
-                                              values, isnull)) < 0)
+           (partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
        {
            char       *val_desc;
 
@@ -1344,12 +1341,12 @@ FormPartitionKeyDatum(PartitionDispatch pd,
  * found or -1 if none found.
  */
 static int
-get_partition_for_tuple(PartitionKey key,
-                       PartitionDesc partdesc,
-                       Datum *values, bool *isnull)
+get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
 {
    int         bound_offset;
    int         part_index = -1;
+   PartitionKey key = pd->key;
+   PartitionDesc partdesc = pd->partdesc;
    PartitionBoundInfo boundinfo = partdesc->boundinfo;
 
    /* Route as appropriate based on partitioning strategy. */
@@ -1441,165 +1438,6 @@ get_partition_for_tuple(PartitionKey key,
    return part_index;
 }
 
-/*
- * ExecGetLeafPartitionForKey
- *     Finds the leaf partition of partitioned table 'root_rel' that would
- *     contain the specified key tuple.
- *
- * A subset of the table's columns (including all of the partition key columns)
- * must be specified:
- * - 'key_natts' indicats the number of columns contained in the key
- * - 'key_attnums' indicates their attribute numbers as defined in 'root_rel'
- * - 'key_vals' and 'key_nulls' specify the key tuple
- *
- * Returns the leaf partition, locked with the given lockmode, or NULL if
- * there isn't one.  Caller is responsibly for closing it.  All intermediate
- * partitions are also locked with the same lockmode.  Caller must have locked
- * the root already.
- *
- * In addition, the OID of the index of a unique constraint on the root table
- * must be given as 'root_idxoid'; *leaf_idxoid will be set to the OID of the
- * corresponding index on the returned leaf partition.  (This can be used by
- * caller to search for a tuple matching the key in the leaf partition.)
- *
- * This works because the unique key defined on the root relation is required
- * to contain the partition key columns of all of the ancestors that lead up to
- * a given leaf partition.
- */
-Relation
-ExecGetLeafPartitionForKey(Relation root_rel, int key_natts,
-                          const AttrNumber *key_attnums,
-                          Datum *key_vals, char *key_nulls,
-                          Oid root_idxoid, int lockmode,
-                          Oid *leaf_idxoid)
-{
-   Relation    found_leafpart = NULL;
-   Relation    rel = root_rel;
-   Oid         constr_idxoid = root_idxoid;
-   PartitionDirectory partdir;
-
-   Assert(root_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
-
-   *leaf_idxoid = InvalidOid;
-
-   partdir = CreatePartitionDirectory(CurrentMemoryContext, true);
-
-   /*
-    * Descend through partitioned parents to find the leaf partition that
-    * would accept a row with the provided key values, starting with the root
-    * parent.
-    */
-   for (;;)
-   {
-       PartitionKey partkey = RelationGetPartitionKey(rel);
-       PartitionDesc partdesc;
-       Datum       partkey_vals[PARTITION_MAX_KEYS];
-       bool        partkey_isnull[PARTITION_MAX_KEYS];
-       AttrNumber *root_partattrs = partkey->partattrs;
-       int         found_att;
-       int         partidx;
-       Oid         partoid;
-
-       CHECK_FOR_INTERRUPTS();
-
-       /*
-        * Collect partition key values from the unique key.
-        *
-        * Because we only have the root table's copy of pk_attnums, must map
-        * any non-root table's partition key attribute numbers to the root
-        * table's.
-        */
-       if (rel != root_rel)
-       {
-           /*
-            * map->attnums will contain root table attribute numbers for each
-            * attribute of the current partitioned relation.
-            */
-           AttrMap    *map;
-
-           map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
-                                              RelationGetDescr(rel));
-           if (map)
-           {
-               root_partattrs = palloc(partkey->partnatts *
-                                       sizeof(AttrNumber));
-               for (int att = 0; att < partkey->partnatts; att++)
-               {
-                   AttrNumber  partattno = partkey->partattrs[att];
-
-                   root_partattrs[att] = map->attnums[partattno - 1];
-               }
-
-               free_attrmap(map);
-           }
-       }
-
-       /*
-        * Map the values/isnulls to match the partition description, as
-        * necessary.
-        *
-        * (Referenced key specification does not allow expressions, so there
-        * would not be expressions in the partition keys either.)
-        */
-       Assert(partkey->partexprs == NIL);
-       found_att = 0;
-       for (int keyatt = 0; keyatt < key_natts; keyatt++)
-       {
-           for (int att = 0; att < partkey->partnatts; att++)
-           {
-               if (root_partattrs[att] == key_attnums[keyatt])
-               {
-                   partkey_vals[found_att] = key_vals[keyatt];
-                   partkey_isnull[found_att] = (key_nulls[keyatt] == 'n');
-                   found_att++;
-                   break;
-               }
-           }
-       }
-       /* We had better have found values for all partition keys */
-       Assert(found_att == partkey->partnatts);
-
-       if (root_partattrs != partkey->partattrs)
-           pfree(root_partattrs);
-
-       /* Get the PartitionDesc using the partition directory machinery.  */
-       partdesc = PartitionDirectoryLookup(partdir, rel);
-       if (partdesc->nparts == 0)
-           break;
-
-       /* Find the partition for the key. */
-       partidx = get_partition_for_tuple(partkey, partdesc,
-                                         partkey_vals, partkey_isnull);
-       Assert(partidx < 0 || partidx < partdesc->nparts);
-
-       /* close the previous parent if any, but keep lock */
-       if (rel != root_rel)
-           table_close(rel, NoLock);
-
-       /* No partition found. */
-       if (partidx < 0)
-           break;
-
-       partoid = partdesc->oids[partidx];
-       rel = table_open(partoid, lockmode);
-       constr_idxoid = index_get_partition(rel, constr_idxoid);
-
-       /*
-        * We're done if the partition is a leaf, else find its partition in
-        * the next iteration.
-        */
-       if (partdesc->is_leaf[partidx])
-       {
-           *leaf_idxoid = constr_idxoid;
-           found_leafpart = rel;
-           break;
-       }
-   }
-
-   DestroyPartitionDirectory(partdir);
-   return found_leafpart;
-}
-
 /*
  * ExecBuildSlotPartitionKeyDescription
  *
index bbccafb2cfd0a1a8d9ac16eb3a5309717fe71f18..1a9dab25dd6add51d171ef567d574310f613474c 100644 (file)
@@ -79,7 +79,10 @@ lnext:
        Datum       datum;
        bool        isNull;
        ItemPointerData tid;
+       TM_FailureData tmfd;
        LockTupleMode lockmode;
+       int         lockflags = 0;
+       TM_Result   test;
        TupleTableSlot *markSlot;
 
        /* clear any leftover test tuple for this rel */
@@ -176,11 +179,74 @@ lnext:
                break;
        }
 
-       /* skip tuple if it couldn't be locked */
-       if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
-                               estate->es_snapshot, estate->es_output_cid,
-                               lockmode, erm->waitPolicy, &epq_needed))
-           goto lnext;
+       lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+       if (!IsolationUsesXactSnapshot())
+           lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+       test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
+                               markSlot, estate->es_output_cid,
+                               lockmode, erm->waitPolicy,
+                               lockflags,
+                               &tmfd);
+
+       switch (test)
+       {
+           case TM_WouldBlock:
+               /* couldn't lock tuple in SKIP LOCKED mode */
+               goto lnext;
+
+           case TM_SelfModified:
+
+               /*
+                * The target tuple was already updated or deleted by the
+                * current command, or by a later command in the current
+                * transaction.  We *must* ignore the tuple in the former
+                * case, so as to avoid the "Halloween problem" of repeated
+                * update attempts.  In the latter case it might be sensible
+                * to fetch the updated tuple instead, but doing so would
+                * require changing heap_update and heap_delete to not
+                * complain about updating "invisible" tuples, which seems
+                * pretty scary (table_tuple_lock will not complain, but few
+                * callers expect TM_Invisible, and we're not one of them). So
+                * for now, treat the tuple as deleted and do not process.
+                */
+               goto lnext;
+
+           case TM_Ok:
+
+               /*
+                * Got the lock successfully, the locked tuple saved in
+                * markSlot for, if needed, EvalPlanQual testing below.
+                */
+               if (tmfd.traversed)
+                   epq_needed = true;
+               break;
+
+           case TM_Updated:
+               if (IsolationUsesXactSnapshot())
+                   ereport(ERROR,
+                           (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                            errmsg("could not serialize access due to concurrent update")));
+               elog(ERROR, "unexpected table_tuple_lock status: %u",
+                    test);
+               break;
+
+           case TM_Deleted:
+               if (IsolationUsesXactSnapshot())
+                   ereport(ERROR,
+                           (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                            errmsg("could not serialize access due to concurrent update")));
+               /* tuple was deleted so don't return it */
+               goto lnext;
+
+           case TM_Invisible:
+               elog(ERROR, "attempted to lock invisible tuple");
+               break;
+
+           default:
+               elog(ERROR, "unrecognized table_tuple_lock status: %u",
+                    test);
+       }
 
        /* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
        erm->curCtid = tid;
@@ -215,91 +281,6 @@ lnext:
    return slot;
 }
 
-/*
- * ExecLockTableTuple
- *         Locks tuple with the specified TID in lockmode following given wait
- *         policy
- *
- * Returns true if the tuple was successfully locked.  Locked tuple is loaded
- * into provided slot.
- */
-bool
-ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
-                  Snapshot snapshot, CommandId cid,
-                  LockTupleMode lockmode, LockWaitPolicy waitPolicy,
-                  bool *epq_needed)
-{
-   TM_FailureData tmfd;
-   int         lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-   TM_Result   test;
-
-   if (!IsolationUsesXactSnapshot())
-       lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
-   test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
-                           waitPolicy, lockflags, &tmfd);
-
-   switch (test)
-   {
-       case TM_WouldBlock:
-           /* couldn't lock tuple in SKIP LOCKED mode */
-           return false;
-
-       case TM_SelfModified:
-
-           /*
-            * The target tuple was already updated or deleted by the current
-            * command, or by a later command in the current transaction.  We
-            * *must* ignore the tuple in the former case, so as to avoid the
-            * "Halloween problem" of repeated update attempts.  In the latter
-            * case it might be sensible to fetch the updated tuple instead,
-            * but doing so would require changing heap_update and heap_delete
-            * to not complain about updating "invisible" tuples, which seems
-            * pretty scary (table_tuple_lock will not complain, but few
-            * callers expect TM_Invisible, and we're not one of them). So for
-            * now, treat the tuple as deleted and do not process.
-            */
-           return false;
-
-       case TM_Ok:
-
-           /*
-            * Got the lock successfully, the locked tuple saved in slot for
-            * EvalPlanQual, if asked by the caller.
-            */
-           if (tmfd.traversed && epq_needed)
-               *epq_needed = true;
-           break;
-
-       case TM_Updated:
-           if (IsolationUsesXactSnapshot())
-               ereport(ERROR,
-                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                        errmsg("could not serialize access due to concurrent update")));
-           elog(ERROR, "unexpected table_tuple_lock status: %u",
-                test);
-           break;
-
-       case TM_Deleted:
-           if (IsolationUsesXactSnapshot())
-               ereport(ERROR,
-                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                        errmsg("could not serialize access due to concurrent update")));
-           /* tuple was deleted so don't return it */
-           return false;
-
-       case TM_Invisible:
-           elog(ERROR, "attempted to lock invisible tuple");
-           return false;
-
-       default:
-           elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
-           return false;
-   }
-
-   return true;
-}
-
 /* ----------------------------------------------------------------
  *     ExecInitLockRows
  *
index 088b4027008d045c8072069bd830d7f7dccb8f1f..01d4c22cfce1850473f91b867e5a155d202b64aa 100644 (file)
@@ -23,7 +23,6 @@
 
 #include "postgres.h"
 
-#include "access/genam.h"
 #include "access/htup_details.h"
 #include "access/sysattr.h"
 #include "access/table.h"
@@ -34,7 +33,6 @@
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "commands/trigger.h"
-#include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "lib/ilist.h"
 #define RI_KEYS_NONE_NULL              2
 
 /* RI query type codes */
-#define RI_PLAN_CASCADE_ONDELETE       1
-#define RI_PLAN_CASCADE_ONUPDATE       2
+/* these queries are executed against the PK (referenced) table: */
+#define RI_PLAN_CHECK_LOOKUPPK         1
+#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK 2
+#define RI_PLAN_LAST_ON_PK             RI_PLAN_CHECK_LOOKUPPK_FROM_PK
+/* these queries are executed against the FK (referencing) table: */
+#define RI_PLAN_CASCADE_ONDELETE       3
+#define RI_PLAN_CASCADE_ONUPDATE       4
 /* For RESTRICT, the same plan can be used for both ON DELETE and ON UPDATE triggers. */
-#define RI_PLAN_RESTRICT               3
-#define RI_PLAN_SETNULL_ONDELETE       4
-#define RI_PLAN_SETNULL_ONUPDATE       5
-#define RI_PLAN_SETDEFAULT_ONDELETE        6
-#define RI_PLAN_SETDEFAULT_ONUPDATE        7
+#define RI_PLAN_RESTRICT               5
+#define RI_PLAN_SETNULL_ONDELETE       6
+#define RI_PLAN_SETNULL_ONUPDATE       7
+#define RI_PLAN_SETDEFAULT_ONDELETE        8
+#define RI_PLAN_SETDEFAULT_ONUPDATE        9
 
 #define MAX_QUOTED_NAME_LEN  (NAMEDATALEN*2+3)
 #define MAX_QUOTED_REL_NAME_LEN  (MAX_QUOTED_NAME_LEN*2)
@@ -226,278 +229,9 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                               Relation pk_rel, Relation fk_rel,
                               TupleTableSlot *violatorslot, TupleDesc tupdesc,
-                              bool on_fk, bool partgone) pg_attribute_noreturn();
-static Oid get_fkey_unique_index(Oid conoid);
+                              int queryno, bool partgone) pg_attribute_noreturn();
 
 
-/*
- * Checks whether a tuple containing the unique key as extracted from the
- * tuple provided in 'slot' exists in 'pk_rel'.  The key is extracted using the
- * constraint's index given in 'riinfo', which is also scanned to check the
- * existence of the key.
- *
- * If 'pk_rel' is a partitioned table, the check is performed on its leaf
- * partition that would contain the key.
- *
- * The provided tuple is either the one being inserted into the referencing
- * relation ('fk_rel' is non-NULL), or the one being deleted from the
- * referenced relation, that is, 'pk_rel' ('fk_rel' is NULL).
- */
-static bool
-ri_ReferencedKeyExists(Relation pk_rel, Relation fk_rel,
-                      TupleTableSlot *slot, const RI_ConstraintInfo *riinfo)
-{
-   Oid         constr_id = riinfo->constraint_id;
-   Oid         idxoid;
-   Relation    idxrel;
-   Relation    leaf_pk_rel = NULL;
-   int         num_pk;
-   int         i;
-   bool        found = false;
-   const Oid  *eq_oprs;
-   Datum       pk_vals[INDEX_MAX_KEYS];
-   char        pk_nulls[INDEX_MAX_KEYS];
-   ScanKeyData skey[INDEX_MAX_KEYS];
-   Snapshot    snap = InvalidSnapshot;
-   bool        pushed_latest_snapshot = false;
-   IndexScanDesc scan;
-   TupleTableSlot *outslot;
-   Oid         saved_userid;
-   int         saved_sec_context;
-   AclResult   aclresult;
-
-   /*
-    * Extract the unique key from the provided slot and choose the equality
-    * operators to use when scanning the index below.
-    */
-   if (fk_rel)
-   {
-       ri_ExtractValues(fk_rel, slot, riinfo, false, pk_vals, pk_nulls);
-       /* Use PK = FK equality operator. */
-       eq_oprs = riinfo->pf_eq_oprs;
-
-       /*
-        * May need to cast each of the individual values of the foreign key
-        * to the corresponding PK column's type if the equality operator
-        * demands it.
-        */
-       for (i = 0; i < riinfo->nkeys; i++)
-       {
-           if (pk_nulls[i] != 'n')
-           {
-               Oid         eq_opr = eq_oprs[i];
-               Oid         typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-               RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
-
-               if (OidIsValid(entry->cast_func_finfo.fn_oid))
-                   pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
-                                              pk_vals[i],
-                                              Int32GetDatum(-1),   /* typmod */
-                                              BoolGetDatum(false));    /* implicit coercion */
-           }
-       }
-   }
-   else
-   {
-       ri_ExtractValues(pk_rel, slot, riinfo, true, pk_vals, pk_nulls);
-       /* Use PK = PK equality operator. */
-       eq_oprs = riinfo->pp_eq_oprs;
-   }
-
-   /*
-    * Switch to referenced table's owner to perform the below operations as.
-    * This matches what ri_PerformCheck() does.
-    *
-    * Note that as with queries done by ri_PerformCheck(), the way we select
-    * the referenced row below effectively bypasses any RLS policies that may
-    * be present on the referenced table.
-    */
-   GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
-   SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
-                          saved_sec_context | SECURITY_LOCAL_USERID_CHANGE);
-
-   /*
-    * Also check that the new user has permissions to look into the schema of
-    * and SELECT from the referenced table.
-    */
-   aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
-                                     GetUserId(), ACL_USAGE);
-   if (aclresult != ACLCHECK_OK)
-       aclcheck_error(aclresult, OBJECT_SCHEMA,
-                      get_namespace_name(RelationGetNamespace(pk_rel)));
-   aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
-                                 ACL_SELECT);
-   if (aclresult != ACLCHECK_OK)
-       aclcheck_error(aclresult, OBJECT_TABLE,
-                      RelationGetRelationName(pk_rel));
-
-   /* Make the changes of the current command visible in all cases. */
-   CommandCounterIncrement();
-
-   /*
-    * In the case of scanning the PK index for ri_Check_Pk_Match(), we'd like
-    * to see all rows that could be interesting, even those that would not be
-    * visible to the transaction snapshot.  To do so, force-push the latest
-    * snapshot.
-    */
-   if (fk_rel == NULL)
-   {
-       snap = GetLatestSnapshot();
-       PushActiveSnapshot(snap);
-       pushed_latest_snapshot = true;
-   }
-   else
-   {
-       snap = GetTransactionSnapshot();
-       PushActiveSnapshot(snap);
-   }
-
-   /*
-    * Open the constraint index to be scanned.
-    *
-    * If the target table is partitioned, we must look up the leaf partition
-    * and its corresponding unique index to search the keys in.
-    */
-   idxoid = get_fkey_unique_index(constr_id);
-   if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-   {
-       Oid         leaf_idxoid;
-       Snapshot    mysnap = InvalidSnapshot;
-
-       /*
-        * XXX the partition descriptor machinery has a hack that assumes that
-        * the queries originating in this module push the latest snapshot in
-        * the transaction-snapshot mode.  If we haven't pushed one already,
-        * do so now.
-        */
-       if (!pushed_latest_snapshot)
-       {
-           mysnap = GetLatestSnapshot();
-           PushActiveSnapshot(mysnap);
-       }
-
-       leaf_pk_rel = ExecGetLeafPartitionForKey(pk_rel, riinfo->nkeys,
-                                                riinfo->pk_attnums,
-                                                pk_vals, pk_nulls,
-                                                idxoid, RowShareLock,
-                                                &leaf_idxoid);
-
-       /*
-        * XXX done fiddling with the partition descriptor machinery so unset
-        * the active snapshot if we must.
-        */
-       if (mysnap != InvalidSnapshot)
-           PopActiveSnapshot();
-
-       /*
-        * If no suitable leaf partition exists, neither can the key we're
-        * looking for.
-        */
-       if (leaf_pk_rel == NULL)
-       {
-           SetUserIdAndSecContext(saved_userid, saved_sec_context);
-           PopActiveSnapshot();
-           return false;
-       }
-
-       pk_rel = leaf_pk_rel;
-       idxoid = leaf_idxoid;
-   }
-   idxrel = index_open(idxoid, RowShareLock);
-
-   /* Set up ScanKeys for the index scan. */
-   num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
-   for (i = 0; i < num_pk; i++)
-   {
-       int         pkattno = i + 1;
-       Oid         operator = eq_oprs[i];
-       Oid         opfamily = idxrel->rd_opfamily[i];
-       StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
-       RegProcedure regop = get_opcode(operator);
-
-       /* Initialize the scankey. */
-       ScanKeyInit(&skey[i],
-                   pkattno,
-                   strat,
-                   regop,
-                   pk_vals[i]);
-
-       skey[i].sk_collation = idxrel->rd_indcollation[i];
-
-       /*
-        * Check for null value.  Should not occur, because callers currently
-        * take care of the cases in which they do occur.
-        */
-       if (pk_nulls[i] == 'n')
-           skey[i].sk_flags |= SK_ISNULL;
-   }
-
-   scan = index_beginscan(pk_rel, idxrel, snap, num_pk, 0);
-   index_rescan(scan, skey, num_pk, NULL, 0);
-
-   /* Look for the tuple, and if found, try to lock it in key share mode. */
-   outslot = table_slot_create(pk_rel, NULL);
-   if (index_getnext_slot(scan, ForwardScanDirection, outslot))
-   {
-       /*
-        * If we fail to lock the tuple for whatever reason, assume it doesn't
-        * exist.
-        */
-       found = ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
-                                  snap,
-                                  GetCurrentCommandId(false),
-                                  LockTupleKeyShare,
-                                  LockWaitBlock, NULL);
-   }
-
-   index_endscan(scan);
-   ExecDropSingleTupleTableSlot(outslot);
-
-   /* Don't release lock until commit. */
-   index_close(idxrel, NoLock);
-
-   /* Close leaf partition relation if any. */
-   if (leaf_pk_rel)
-       table_close(leaf_pk_rel, NoLock);
-
-   /* Restore UID and security context */
-   SetUserIdAndSecContext(saved_userid, saved_sec_context);
-
-   PopActiveSnapshot();
-
-   return found;
-}
-
-/*
- * get_fkey_unique_index
- *         Returns the unique index used by a supposedly foreign key constraint
- *
- * XXX This is very similar to get_constraint_index; probably they should be
- * unified.
- */
-static Oid
-get_fkey_unique_index(Oid conoid)
-{
-   Oid         result = InvalidOid;
-   HeapTuple   tp;
-
-   tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
-   if (HeapTupleIsValid(tp))
-   {
-       Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
-
-       if (contup->contype == CONSTRAINT_FOREIGN)
-           result = contup->conindid;
-       ReleaseSysCache(tp);
-   }
-
-   if (!OidIsValid(result))
-       elog(ERROR, "unique index not found for foreign key constraint %u",
-            conoid);
-
-   return result;
-}
-
 /*
  * RI_FKey_check -
  *
@@ -510,6 +244,8 @@ RI_FKey_check(TriggerData *trigdata)
    Relation    fk_rel;
    Relation    pk_rel;
    TupleTableSlot *newslot;
+   RI_QueryKey qkey;
+   SPIPlanPtr  qplan;
 
    riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
                                    trigdata->tg_relation, false);
@@ -589,9 +325,9 @@ RI_FKey_check(TriggerData *trigdata)
 
                    /*
                     * MATCH PARTIAL - all non-null columns must match. (not
-                    * implemented, can be done by modifying
-                    * ri_ReferencedKeyExists() to only include non-null
-                    * columns.
+                    * implemented, can be done by modifying the query below
+                    * to only include non-null columns, or by writing a
+                    * special version here)
                     */
                    break;
 #endif
@@ -606,12 +342,74 @@ RI_FKey_check(TriggerData *trigdata)
            break;
    }
 
-   if (!ri_ReferencedKeyExists(pk_rel, fk_rel, newslot, riinfo))
-       ri_ReportViolation(riinfo,
-                          pk_rel, fk_rel,
-                          newslot,
-                          NULL,
-                          true, false);
+   if (SPI_connect() != SPI_OK_CONNECT)
+       elog(ERROR, "SPI_connect failed");
+
+   /* Fetch or prepare a saved plan for the real check */
+   ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
+
+   if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
+   {
+       StringInfoData querybuf;
+       char        pkrelname[MAX_QUOTED_REL_NAME_LEN];
+       char        attname[MAX_QUOTED_NAME_LEN];
+       char        paramname[16];
+       const char *querysep;
+       Oid         queryoids[RI_MAX_NUMKEYS];
+       const char *pk_only;
+
+       /* ----------
+        * The query string built is
+        *  SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
+        *         FOR KEY SHARE OF x
+        * The type id's for the $ parameters are those of the
+        * corresponding FK attributes.
+        * ----------
+        */
+       initStringInfo(&querybuf);
+       pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+           "" : "ONLY ";
+       quoteRelationName(pkrelname, pk_rel);
+       appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+                        pk_only, pkrelname);
+       querysep = "WHERE";
+       for (int i = 0; i < riinfo->nkeys; i++)
+       {
+           Oid         pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+           Oid         fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+
+           quoteOneName(attname,
+                        RIAttName(pk_rel, riinfo->pk_attnums[i]));
+           sprintf(paramname, "$%d", i + 1);
+           ri_GenerateQual(&querybuf, querysep,
+                           attname, pk_type,
+                           riinfo->pf_eq_oprs[i],
+                           paramname, fk_type);
+           querysep = "AND";
+           queryoids[i] = fk_type;
+       }
+       appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+
+       /* Prepare and save the plan */
+       qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+                            &qkey, fk_rel, pk_rel);
+   }
+
+   /*
+    * Now check that foreign key exists in PK table
+    *
+    * XXX detectNewRows must be true when a partitioned table is on the
+    * referenced side.  The reason is that our snapshot must be fresh in
+    * order for the hack in find_inheritance_children() to work.
+    */
+   ri_PerformCheck(riinfo, &qkey, qplan,
+                   fk_rel, pk_rel,
+                   NULL, newslot,
+                   pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
+                   SPI_OK_SELECT);
+
+   if (SPI_finish() != SPI_OK_FINISH)
+       elog(ERROR, "SPI_finish failed");
 
    table_close(pk_rel, RowShareLock);
 
@@ -666,10 +464,81 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
                  TupleTableSlot *oldslot,
                  const RI_ConstraintInfo *riinfo)
 {
+   SPIPlanPtr  qplan;
+   RI_QueryKey qkey;
+   bool        result;
+
    /* Only called for non-null rows */
    Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
 
-   return ri_ReferencedKeyExists(pk_rel, NULL, oldslot, riinfo);
+   if (SPI_connect() != SPI_OK_CONNECT)
+       elog(ERROR, "SPI_connect failed");
+
+   /*
+    * Fetch or prepare a saved plan for checking PK table with values coming
+    * from a PK row
+    */
+   ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
+
+   if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
+   {
+       StringInfoData querybuf;
+       char        pkrelname[MAX_QUOTED_REL_NAME_LEN];
+       char        attname[MAX_QUOTED_NAME_LEN];
+       char        paramname[16];
+       const char *querysep;
+       const char *pk_only;
+       Oid         queryoids[RI_MAX_NUMKEYS];
+
+       /* ----------
+        * The query string built is
+        *  SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
+        *         FOR KEY SHARE OF x
+        * The type id's for the $ parameters are those of the
+        * PK attributes themselves.
+        * ----------
+        */
+       initStringInfo(&querybuf);
+       pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+           "" : "ONLY ";
+       quoteRelationName(pkrelname, pk_rel);
+       appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+                        pk_only, pkrelname);
+       querysep = "WHERE";
+       for (int i = 0; i < riinfo->nkeys; i++)
+       {
+           Oid         pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+
+           quoteOneName(attname,
+                        RIAttName(pk_rel, riinfo->pk_attnums[i]));
+           sprintf(paramname, "$%d", i + 1);
+           ri_GenerateQual(&querybuf, querysep,
+                           attname, pk_type,
+                           riinfo->pp_eq_oprs[i],
+                           paramname, pk_type);
+           querysep = "AND";
+           queryoids[i] = pk_type;
+       }
+       appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+
+       /* Prepare and save the plan */
+       qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+                            &qkey, fk_rel, pk_rel);
+   }
+
+   /*
+    * We have a plan now. Run it.
+    */
+   result = ri_PerformCheck(riinfo, &qkey, qplan,
+                            fk_rel, pk_rel,
+                            oldslot, NULL,
+                            true,  /* treat like update */
+                            SPI_OK_SELECT);
+
+   if (SPI_finish() != SPI_OK_FINISH)
+       elog(ERROR, "SPI_finish failed");
+
+   return result;
 }
 
 
@@ -1739,10 +1608,15 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
                     errtableconstraint(fk_rel,
                                        NameStr(fake_riinfo.conname))));
 
+       /*
+        * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
+        * query, which isn't true, but will cause it to use
+        * fake_riinfo.fk_attnums as we need.
+        */
        ri_ReportViolation(&fake_riinfo,
                           pk_rel, fk_rel,
                           slot, tupdesc,
-                          true, false);
+                          RI_PLAN_CHECK_LOOKUPPK, false);
 
        ExecDropSingleTupleTableSlot(slot);
    }
@@ -1959,7 +1833,7 @@ RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
            fake_riinfo.pk_attnums[i] = i + 1;
 
        ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
-                          slot, tupdesc, true, true);
+                          slot, tupdesc, 0, true);
    }
 
    if (SPI_finish() != SPI_OK_FINISH)
@@ -2096,8 +1970,9 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 {
    /*
     * Inherited constraints with a common ancestor can share ri_query_cache
-    * entries, because each query processes the other table involved in the
-    * FK constraint (i.e., not the table on which the trigger has been
+    * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK.
+    * Except in that case, the query processes the other table involved in
+    * the FK constraint (i.e., not the table on which the trigger has been
     * fired), and so it will be the same for all members of the inheritance
     * tree.  So we may use the root constraint's OID in the hash key, rather
     * than the constraint's own OID.  This avoids creating duplicate SPI
@@ -2108,13 +1983,13 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
     * constraint, because partitions can have different column orders,
     * resulting in different pk_attnums[] or fk_attnums[] array contents.)
     *
-    * (Note also that for a standalone or non-inherited constraint,
-    * constraint_root_id is same as constraint_id.)
-    *
     * We assume struct RI_QueryKey contains no padding bytes, else we'd need
     * to use memset to clear them.
     */
-   key->constr_id = riinfo->constraint_root_id;
+   if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
+       key->constr_id = riinfo->constraint_root_id;
+   else
+       key->constr_id = riinfo->constraint_id;
    key->constr_queryno = constr_queryno;
 }
 
@@ -2385,12 +2260,19 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
             RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
    SPIPlanPtr  qplan;
-
-   /* There are currently no queries that run on PK table. */
-   Relation    query_rel = fk_rel;
+   Relation    query_rel;
    Oid         save_userid;
    int         save_sec_context;
 
+   /*
+    * Use the query type code to determine whether the query is run against
+    * the PK or FK table; we'll do the check as that table's owner
+    */
+   if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
+       query_rel = pk_rel;
+   else
+       query_rel = fk_rel;
+
    /* Switch to proper UID to perform check as */
    GetUserIdAndSecContext(&save_userid, &save_sec_context);
    SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
@@ -2423,9 +2305,9 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
                TupleTableSlot *oldslot, TupleTableSlot *newslot,
                bool detectNewRows, int expect_OK)
 {
-   /* There are currently no queries that run on PK table. */
-   Relation    query_rel = fk_rel,
-               source_rel = pk_rel;
+   Relation    query_rel,
+               source_rel;
+   bool        source_is_pk;
    Snapshot    test_snapshot;
    Snapshot    crosscheck_snapshot;
    int         limit;
@@ -2435,17 +2317,46 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
    Datum       vals[RI_MAX_NUMKEYS * 2];
    char        nulls[RI_MAX_NUMKEYS * 2];
 
+   /*
+    * Use the query type code to determine whether the query is run against
+    * the PK or FK table; we'll do the check as that table's owner
+    */
+   if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
+       query_rel = pk_rel;
+   else
+       query_rel = fk_rel;
+
+   /*
+    * The values for the query are taken from the table on which the trigger
+    * is called - it is normally the other one with respect to query_rel. An
+    * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
+    * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK).  We might eventually
+    * need some less klugy way to determine this.
+    */
+   if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
+   {
+       source_rel = fk_rel;
+       source_is_pk = false;
+   }
+   else
+   {
+       source_rel = pk_rel;
+       source_is_pk = true;
+   }
+
    /* Extract the parameters to be passed into the query */
    if (newslot)
    {
-       ri_ExtractValues(source_rel, newslot, riinfo, true, vals, nulls);
+       ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
+                        vals, nulls);
        if (oldslot)
-           ri_ExtractValues(source_rel, oldslot, riinfo, true,
+           ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
                             vals + riinfo->nkeys, nulls + riinfo->nkeys);
    }
    else
    {
-       ri_ExtractValues(source_rel, oldslot, riinfo, true, vals, nulls);
+       ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
+                        vals, nulls);
    }
 
    /*
@@ -2509,12 +2420,14 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
                 errhint("This is most likely due to a rule having rewritten the query.")));
 
    /* XXX wouldn't it be clearer to do this part at the caller? */
-   if (expect_OK == SPI_OK_SELECT && SPI_processed != 0)
+   if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
+       expect_OK == SPI_OK_SELECT &&
+       (SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
        ri_ReportViolation(riinfo,
                           pk_rel, fk_rel,
                           newslot ? newslot : oldslot,
                           NULL,
-                          false, false);
+                          qkey->constr_queryno, false);
 
    return SPI_processed != 0;
 }
@@ -2545,9 +2458,9 @@ ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 /*
  * Produce an error report
  *
- * If the failed constraint was on insert/update to the FK table (on_fk is
- * true), we want the key names and values extracted from there, and the
- * error message to look like 'key blah is not present in PK'.
+ * If the failed constraint was on insert/update to the FK table,
+ * we want the key names and values extracted from there, and the error
+ * message to look like 'key blah is not present in PK'.
  * Otherwise, the attr names and values come from the PK table and the
  * message looks like 'key blah is still referenced from FK'.
  */
@@ -2555,20 +2468,22 @@ static void
 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                   Relation pk_rel, Relation fk_rel,
                   TupleTableSlot *violatorslot, TupleDesc tupdesc,
-                  bool on_fk, bool partgone)
+                  int queryno, bool partgone)
 {
    StringInfoData key_names;
    StringInfoData key_values;
+   bool        onfk;
    const int16 *attnums;
    Oid         rel_oid;
    AclResult   aclresult;
    bool        has_perm = true;
 
    /*
-    * If tupdesc wasn't passed by caller, assume the violator tuple came from
-    * there.
+    * Determine which relation to complain about.  If tupdesc wasn't passed
+    * by caller, assume the violator tuple came from there.
     */
-   if (on_fk)
+   onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
+   if (onfk)
    {
        attnums = riinfo->fk_attnums;
        rel_oid = fk_rel->rd_id;
@@ -2670,7 +2585,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                           key_names.data, key_values.data,
                           RelationGetRelationName(fk_rel)),
                 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
-   else if (on_fk)
+   else if (onfk)
        ereport(ERROR,
                (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
                 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
@@ -2977,10 +2892,7 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
  * ri_HashCompareOp -
  *
  * See if we know how to compare two values, and create a new hash entry
- * if not.  The entry contains the FmgrInfo of the equality operator function
- * and that of the cast function, if one is needed to convert the right
- * operand (whose type OID has been passed) before passing it to the equality
- * function.
+ * if not.
  */
 static RI_CompareHashEntry *
 ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -3036,16 +2948,8 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
         * moment since that will never be generated for implicit coercions.
         */
        op_input_types(eq_opr, &lefttype, &righttype);
-
-       /*
-        * Don't need to cast if the values that will be passed to the
-        * operator will be of expected operand type(s).  The operator can be
-        * cross-type (such as when called by ri_ReferencedKeyExists()), in
-        * which case, we only need the cast if the right operand value
-        * doesn't match the type expected by the operator.
-        */
-       if ((lefttype == righttype && typeid == lefttype) ||
-           (lefttype != righttype && typeid == righttype))
+       Assert(lefttype == righttype);
+       if (typeid == lefttype)
            castfunc = InvalidOid;  /* simplest case */
        else
        {
index cbe1d996e6b5200832a68b11400767620c4cf873..708435e95285194fbb7b9f407e2de11485d9ca56 100644 (file)
@@ -31,12 +31,6 @@ extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
                                        EState *estate);
 extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
                                    PartitionTupleRouting *proute);
-extern Relation ExecGetLeafPartitionForKey(Relation root_rel,
-                                          int key_natts,
-                                          const AttrNumber *key_attnums,
-                                          Datum *key_vals, char *key_nulls,
-                                          Oid root_idxoid, int lockmode,
-                                          Oid *leaf_idxoid);
 
 
 /*
index 216d28679a6e35b1220240c071f5a3ee8e924b9e..873772f188374b0de4eeddcb3850e4346abc2574 100644 (file)
@@ -651,14 +651,6 @@ extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
                                     const char *relname);
 
-/*
- * prototypes from functions in nodeLockRows.c
- */
-extern bool ExecLockTableTuple(Relation relation, ItemPointer tid,
-                              TupleTableSlot *slot, Snapshot snapshot,
-                              CommandId cid, LockTupleMode lockmode,
-                              LockWaitPolicy waitPolicy, bool *epq_needed);
-
 /*
  * prototypes from functions in nodeModifyTable.c
  */
index 22752cc7429014e9a0406dc7dd08004642af0cea..5faf80d6ce0e4cd292ba3788dbbd5ef4fd689447 100644 (file)
@@ -47,12 +47,12 @@ a
 
 step s2ifn2: INSERT INTO fk_noparted VALUES (2);
 step s2c: COMMIT;
-ERROR:  insert or update on table "fk_noparted" violates foreign key constraint "fk_noparted_a_fkey"
 step s2sfn: SELECT * FROM fk_noparted;
 a
 -
 1
-(1 row)
+2
+(2 rows)
 
 
 starting permutation: s1brc s2brc s2ip2 s1sp s2c s1sp s1ifp2 s2brc s2sfp s1c s1sfp s2ifn2 s2c s2sfn
index 64d27f29c3c859f56042770e0cf1d44e5d12ceb2..378507fbc3822be4821eff7a2062526750c59f81 100644 (file)
@@ -46,7 +46,10 @@ step s2sfn   { SELECT * FROM fk_noparted; }
 # inserting into referencing tables in transaction-snapshot mode
 # PK table is non-partitioned
 permutation s1brr s2brc s2ip2 s1sp s2c s1sp s1ifp2 s1c s1sfp
-# PK table is partitioned
+# PK table is partitioned: buggy, because s2's serialization transaction can
+# see the uncommitted row thanks to the latest snapshot taken for
+# partition lookup to work correctly also ends up getting used by the PK index
+# scan
 permutation s2ip2 s2brr s1brc s1ifp2 s2sfp s1c s2sfp s2ifn2 s2c s2sfn
 
 # inserting into referencing tables in up-to-date snapshot mode