Rewrite some RI code to avoid using SPI
author    Alvaro Herrera <alvherre@alvh.no-ip.org>
          Thu, 7 Apr 2022 19:04:36 +0000 (21:04 +0200)
committer Alvaro Herrera <alvherre@alvh.no-ip.org>
          Thu, 7 Apr 2022 19:10:03 +0000 (21:10 +0200)
Modify the subroutines called by RI trigger functions that want to check
if a given referenced value exists in the referenced relation to simply
scan the foreign key constraint's unique index, instead of using SPI to
execute
  SELECT 1 FROM referenced_relation WHERE ref_key = $1
This saves a lot of work, especially when inserting into or updating a
referencing relation.
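
For illustration, here is a minimal sketch of what the per-row check now
boils down to, modeled on the new ri_ReferencedKeyExists() below; the
helper name and the simplifications (no partitioned-table handling, no
owner switch, no FK-to-PK casts, no tuple locking) are ours, not the
committed code:

  /* backend-internal sketch; needs access/genam.h, access/tableam.h,
   * executor/tuptable.h, utils/lsyscache.h */
  static bool
  referenced_key_exists_sketch(Relation pk_rel, Relation idxrel,
                               const Oid *eq_oprs, const Datum *vals,
                               int nkeys, Snapshot snap)
  {
      ScanKeyData skey[INDEX_MAX_KEYS];
      IndexScanDesc scan;
      TupleTableSlot *slot;
      bool        found;

      /* One scankey per key column, using the PK = FK equality operator. */
      for (int i = 0; i < nkeys; i++)
      {
          ScanKeyInit(&skey[i], i + 1,
                      get_op_opfamily_strategy(eq_oprs[i],
                                               idxrel->rd_opfamily[i]),
                      get_opcode(eq_oprs[i]),
                      vals[i]);
          skey[i].sk_collation = idxrel->rd_indcollation[i];
      }

      /* Scan the unique index directly; no SQL, no planner, no executor. */
      scan = index_beginscan(pk_rel, idxrel, snap, nkeys, 0);
      index_rescan(scan, skey, nkeys, NULL, 0);
      slot = table_slot_create(pk_rel, NULL);
      found = index_getnext_slot(scan, ForwardScanDirection, slot);
      ExecDropSingleTupleTableSlot(slot);
      index_endscan(scan);
      return found;
  }

The real function additionally locks the found tuple in key-share mode
(via the new ExecLockTableTuple(), see below), mirroring the FOR KEY
SHARE locking that the old SPI query provided.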

This rewrite allows us to fix a PK row visibility bug caused by a
partition descriptor hack that requires ActiveSnapshot to be set in
order to come up with the correct set of partitions for the RI query
running under REPEATABLE READ isolation.  We now set that snapshot
independently of the snapshot to be used by the PK index scan, so the
two no longer interfere.  The buggy output in
src/test/isolation/expected/fk-snapshot.out of the relevant test case
added by commit 00cb86e75d6d has been corrected.
(The bug still exists in branch 14, but this fix is too invasive to
backpatch.)
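
Schematically, the two snapshots are now managed like this in
ri_ReferencedKeyExists() (a simplified sketch; variable names are ours):

  /* Snapshot the PK index scan will use (transaction snapshot for FK checks) */
  Snapshot    scan_snap = GetTransactionSnapshot();

  PushActiveSnapshot(scan_snap);

  /*
   * The partition descriptor hack wants the latest snapshot to be active
   * during partition lookup, so push one independently of scan_snap.
   */
  PushActiveSnapshot(GetLatestSnapshot());
  /* ... ExecGetLeafPartitionForKey() runs here ... */
  PopActiveSnapshot();

  /* ... index_beginscan(..., scan_snap, ...) is unaffected ... */
  PopActiveSnapshot();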

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: Corey Huinker <corey.huinker@gmail.com>
Reviewed-by: Li Japin <japinli@hotmail.com>
Reviewed-by: Tom Lane <tgl@sss.pgh.pa.us>
Reviewed-by: Zhihong Yu <zyu@yugabyte.com>
Discussion: https://postgr.es/m/CA+HiwqGkfJfYdeq5vHPh6eqPKjSbfpDDY+j-kXYFePQedtSLeg@mail.gmail.com

src/backend/executor/execPartition.c
src/backend/executor/nodeLockRows.c
src/backend/utils/adt/ri_triggers.c
src/include/executor/execPartition.h
src/include/executor/executor.h
src/test/isolation/expected/fk-snapshot.out
src/test/isolation/specs/fk-snapshot.spec

diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 615bd80973539691f6248298fb4a102b92f21ccc..c22c9ac0966528152728f89dc5dbf137ad941afa 100644
@@ -176,8 +176,9 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
                                                                  EState *estate,
                                                                  Datum *values,
                                                                  bool *isnull);
-static int     get_partition_for_tuple(PartitionDispatch pd, Datum *values,
-                                                                       bool *isnull);
+static int     get_partition_for_tuple(PartitionKey key,
+                                                                       PartitionDesc partdesc,
+                                                                       Datum *values, bool *isnull);
 static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
                                                                                                  Datum *values,
                                                                                                  bool *isnull,
@@ -318,7 +319,9 @@ ExecFindPartition(ModifyTableState *mtstate,
                 * these values, error out.
                 */
                if (partdesc->nparts == 0 ||
-                       (partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
+                       (partidx = get_partition_for_tuple(dispatch->key,
+                                                                                          dispatch->partdesc,
+                                                                                          values, isnull)) < 0)
                {
                        char       *val_desc;
 
@@ -1341,12 +1344,12 @@ FormPartitionKeyDatum(PartitionDispatch pd,
  * found or -1 if none found.
  */
 static int
-get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
+get_partition_for_tuple(PartitionKey key,
+                                               PartitionDesc partdesc,
+                                               Datum *values, bool *isnull)
 {
        int                     bound_offset;
        int                     part_index = -1;
-       PartitionKey key = pd->key;
-       PartitionDesc partdesc = pd->partdesc;
        PartitionBoundInfo boundinfo = partdesc->boundinfo;
 
        /* Route as appropriate based on partitioning strategy. */
@@ -1438,6 +1441,165 @@ get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
        return part_index;
 }
 
+/*
+ * ExecGetLeafPartitionForKey
+ *             Finds the leaf partition of partitioned table 'root_rel' that would
+ *             contain the specified key tuple.
+ *
+ * A subset of the table's columns (including all of the partition key columns)
+ * must be specified:
+ * - 'key_natts' indicates the number of columns contained in the key
+ * - 'key_attnums' indicates their attribute numbers as defined in 'root_rel'
+ * - 'key_vals' and 'key_nulls' specify the key tuple
+ *
+ * Returns the leaf partition, locked with the given lockmode, or NULL if
+ * there isn't one.  Caller is responsible for closing it.  All intermediate
+ * partitions are also locked with the same lockmode.  Caller must have locked
+ * the root already.
+ *
+ * In addition, the OID of the index of a unique constraint on the root table
+ * must be given as 'root_idxoid'; *leaf_idxoid will be set to the OID of the
+ * corresponding index on the returned leaf partition.  (This can be used by
+ * caller to search for a tuple matching the key in the leaf partition.)
+ *
+ * This works because the unique key defined on the root relation is required
+ * to contain the partition key columns of all of the ancestors that lead up to
+ * a given leaf partition.
+ */
+Relation
+ExecGetLeafPartitionForKey(Relation root_rel, int key_natts,
+                                                  const AttrNumber *key_attnums,
+                                                  Datum *key_vals, char *key_nulls,
+                                                  Oid root_idxoid, int lockmode,
+                                                  Oid *leaf_idxoid)
+{
+       Relation        found_leafpart = NULL;
+       Relation        rel = root_rel;
+       Oid                     constr_idxoid = root_idxoid;
+       PartitionDirectory partdir;
+
+       Assert(root_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+
+       *leaf_idxoid = InvalidOid;
+
+       partdir = CreatePartitionDirectory(CurrentMemoryContext, true);
+
+       /*
+        * Descend through partitioned parents to find the leaf partition that
+        * would accept a row with the provided key values, starting with the root
+        * parent.
+        */
+       for (;;)
+       {
+               PartitionKey partkey = RelationGetPartitionKey(rel);
+               PartitionDesc partdesc;
+               Datum           partkey_vals[PARTITION_MAX_KEYS];
+               bool            partkey_isnull[PARTITION_MAX_KEYS];
+               AttrNumber *root_partattrs = partkey->partattrs;
+               int                     found_att;
+               int                     partidx;
+               Oid                     partoid;
+
+               CHECK_FOR_INTERRUPTS();
+
+               /*
+                * Collect partition key values from the unique key.
+                *
+                * Because we only have the root table's copy of pk_attnums, we
+                * must map any non-root table's partition key attribute numbers
+                * to the root table's.
+                */
+               if (rel != root_rel)
+               {
+                       /*
+                        * map->attnums will contain root table attribute numbers for each
+                        * attribute of the current partitioned relation.
+                        */
+                       AttrMap    *map;
+
+                       map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
+                                                                                          RelationGetDescr(rel));
+                       if (map)
+                       {
+                               root_partattrs = palloc(partkey->partnatts *
+                                                                               sizeof(AttrNumber));
+                               for (int att = 0; att < partkey->partnatts; att++)
+                               {
+                                       AttrNumber      partattno = partkey->partattrs[att];
+
+                                       root_partattrs[att] = map->attnums[partattno - 1];
+                               }
+
+                               free_attrmap(map);
+                       }
+               }
+
+               /*
+                * Map the values/isnulls to match the partition description, as
+                * necessary.
+                *
+                * (Referenced key specification does not allow expressions, so there
+                * would not be expressions in the partition keys either.)
+                */
+               Assert(partkey->partexprs == NIL);
+               found_att = 0;
+               for (int keyatt = 0; keyatt < key_natts; keyatt++)
+               {
+                       for (int att = 0; att < partkey->partnatts; att++)
+                       {
+                               if (root_partattrs[att] == key_attnums[keyatt])
+                               {
+                                       partkey_vals[found_att] = key_vals[keyatt];
+                                       partkey_isnull[found_att] = (key_nulls[keyatt] == 'n');
+                                       found_att++;
+                                       break;
+                               }
+                       }
+               }
+               /* We had better have found values for all partition keys */
+               Assert(found_att == partkey->partnatts);
+
+               if (root_partattrs != partkey->partattrs)
+                       pfree(root_partattrs);
+
+               /* Get the PartitionDesc using the partition directory machinery.  */
+               partdesc = PartitionDirectoryLookup(partdir, rel);
+               if (partdesc->nparts == 0)
+                       break;
+
+               /* Find the partition for the key. */
+               partidx = get_partition_for_tuple(partkey, partdesc,
+                                                                                 partkey_vals, partkey_isnull);
+               Assert(partidx < 0 || partidx < partdesc->nparts);
+
+               /* close the previous parent if any, but keep lock */
+               if (rel != root_rel)
+                       table_close(rel, NoLock);
+
+               /* No partition found. */
+               if (partidx < 0)
+                       break;
+
+               partoid = partdesc->oids[partidx];
+               rel = table_open(partoid, lockmode);
+               constr_idxoid = index_get_partition(rel, constr_idxoid);
+
+               /*
+                * We're done if the partition is a leaf, else find its partition in
+                * the next iteration.
+                */
+               if (partdesc->is_leaf[partidx])
+               {
+                       *leaf_idxoid = constr_idxoid;
+                       found_leafpart = rel;
+                       break;
+               }
+       }
+
+       DestroyPartitionDirectory(partdir);
+       return found_leafpart;
+}
+
 /*
  * ExecBuildSlotPartitionKeyDescription
  *
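
For context, a hedged sketch of how a caller is expected to use the new
function -- ri_ReferencedKeyExists() in ri_triggers.c below does
essentially this; the surrounding variables are illustrative:

  Oid         leaf_idxoid;
  Relation    leaf_rel;

  /*
   * Locks the leaf partition (and intermediate parents) with RowShareLock
   * and reports the leaf-level twin of the root unique index.
   */
  leaf_rel = ExecGetLeafPartitionForKey(pk_rel, nkeys, pk_attnums,
                                        key_vals, key_nulls,
                                        root_idxoid, RowShareLock,
                                        &leaf_idxoid);
  if (leaf_rel == NULL)
      return false;               /* no partition accepts this key */

  /* ... scan leaf_idxoid on leaf_rel for the key ... */
  table_close(leaf_rel, NoLock);  /* hold the lock until commit */
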
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 1a9dab25dd6add51d171ef567d574310f613474c..bbccafb2cfd0a1a8d9ac16eb3a5309717fe71f18 100644
@@ -79,10 +79,7 @@ lnext:
                Datum           datum;
                bool            isNull;
                ItemPointerData tid;
-               TM_FailureData tmfd;
                LockTupleMode lockmode;
-               int                     lockflags = 0;
-               TM_Result       test;
                TupleTableSlot *markSlot;
 
                /* clear any leftover test tuple for this rel */
@@ -179,74 +176,11 @@ lnext:
                                break;
                }
 
-               lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
-               if (!IsolationUsesXactSnapshot())
-                       lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
-               test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
-                                                               markSlot, estate->es_output_cid,
-                                                               lockmode, erm->waitPolicy,
-                                                               lockflags,
-                                                               &tmfd);
-
-               switch (test)
-               {
-                       case TM_WouldBlock:
-                               /* couldn't lock tuple in SKIP LOCKED mode */
-                               goto lnext;
-
-                       case TM_SelfModified:
-
-                               /*
-                                * The target tuple was already updated or deleted by the
-                                * current command, or by a later command in the current
-                                * transaction.  We *must* ignore the tuple in the former
-                                * case, so as to avoid the "Halloween problem" of repeated
-                                * update attempts.  In the latter case it might be sensible
-                                * to fetch the updated tuple instead, but doing so would
-                                * require changing heap_update and heap_delete to not
-                                * complain about updating "invisible" tuples, which seems
-                                * pretty scary (table_tuple_lock will not complain, but few
-                                * callers expect TM_Invisible, and we're not one of them). So
-                                * for now, treat the tuple as deleted and do not process.
-                                */
-                               goto lnext;
-
-                       case TM_Ok:
-
-                               /*
-                                * Got the lock successfully, the locked tuple saved in
-                                * markSlot for, if needed, EvalPlanQual testing below.
-                                */
-                               if (tmfd.traversed)
-                                       epq_needed = true;
-                               break;
-
-                       case TM_Updated:
-                               if (IsolationUsesXactSnapshot())
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                                                        errmsg("could not serialize access due to concurrent update")));
-                               elog(ERROR, "unexpected table_tuple_lock status: %u",
-                                        test);
-                               break;
-
-                       case TM_Deleted:
-                               if (IsolationUsesXactSnapshot())
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                                                        errmsg("could not serialize access due to concurrent update")));
-                               /* tuple was deleted so don't return it */
-                               goto lnext;
-
-                       case TM_Invisible:
-                               elog(ERROR, "attempted to lock invisible tuple");
-                               break;
-
-                       default:
-                               elog(ERROR, "unrecognized table_tuple_lock status: %u",
-                                        test);
-               }
+               /* skip tuple if it couldn't be locked */
+               if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
+                                                               estate->es_snapshot, estate->es_output_cid,
+                                                               lockmode, erm->waitPolicy, &epq_needed))
+                       goto lnext;
 
                /* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
                erm->curCtid = tid;
@@ -281,6 +215,91 @@ lnext:
        return slot;
 }
 
+/*
+ * ExecLockTableTuple
+ *             Locks tuple with the specified TID in lockmode following given wait
+ *             policy
+ *
+ * Returns true if the tuple was successfully locked.  Locked tuple is loaded
+ * into provided slot.
+ */
+bool
+ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
+                                  Snapshot snapshot, CommandId cid,
+                                  LockTupleMode lockmode, LockWaitPolicy waitPolicy,
+                                  bool *epq_needed)
+{
+       TM_FailureData tmfd;
+       int                     lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+       TM_Result       test;
+
+       if (!IsolationUsesXactSnapshot())
+               lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+       test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
+                                                       waitPolicy, lockflags, &tmfd);
+
+       switch (test)
+       {
+               case TM_WouldBlock:
+                       /* couldn't lock tuple in SKIP LOCKED mode */
+                       return false;
+
+               case TM_SelfModified:
+
+                       /*
+                        * The target tuple was already updated or deleted by the current
+                        * command, or by a later command in the current transaction.  We
+                        * *must* ignore the tuple in the former case, so as to avoid the
+                        * "Halloween problem" of repeated update attempts.  In the latter
+                        * case it might be sensible to fetch the updated tuple instead,
+                        * but doing so would require changing heap_update and heap_delete
+                        * to not complain about updating "invisible" tuples, which seems
+                        * pretty scary (table_tuple_lock will not complain, but few
+                        * callers expect TM_Invisible, and we're not one of them). So for
+                        * now, treat the tuple as deleted and do not process.
+                        */
+                       return false;
+
+               case TM_Ok:
+
+                       /*
+                        * Got the lock successfully; the locked tuple is saved in slot.
+                        * If the caller asked for it, flag whether EvalPlanQual
+                        * rechecking is needed.
+                        */
+                       if (tmfd.traversed && epq_needed)
+                               *epq_needed = true;
+                       break;
+
+               case TM_Updated:
+                       if (IsolationUsesXactSnapshot())
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                                                errmsg("could not serialize access due to concurrent update")));
+                       elog(ERROR, "unexpected table_tuple_lock status: %u",
+                                test);
+                       break;
+
+               case TM_Deleted:
+                       if (IsolationUsesXactSnapshot())
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                                                errmsg("could not serialize access due to concurrent update")));
+                       /* tuple was deleted so don't return it */
+                       return false;
+
+               case TM_Invisible:
+                       elog(ERROR, "attempted to lock invisible tuple");
+                       return false;
+
+               default:
+                       elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
+                       return false;
+       }
+
+       return true;
+}
+
 /* ----------------------------------------------------------------
  *             ExecInitLockRows
  *
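
A short usage sketch of the newly exported helper, mirroring the call in
ri_ReferencedKeyExists() below (the surrounding variables are
illustrative):

  /* Try to key-share-lock the tuple we just fetched into 'slot'. */
  found = ExecLockTableTuple(pk_rel, &slot->tts_tid, slot,
                             snapshot,
                             GetCurrentCommandId(false),
                             LockTupleKeyShare,
                             LockWaitBlock,
                             NULL);       /* caller doesn't do EPQ */

Passing NULL for epq_needed is allowed: the TM_Ok branch only reports
the EPQ-recheck status when the caller supplies a flag.
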
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 01d4c22cfce1850473f91b867e5a155d202b64aa..088b4027008d045c8072069bd830d7f7dccb8f1f 100644
@@ -23,6 +23,7 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/htup_details.h"
 #include "access/sysattr.h"
 #include "access/table.h"
@@ -33,6 +34,7 @@
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "commands/trigger.h"
+#include "executor/execPartition.h"
 #include "executor/executor.h"
 #include "executor/spi.h"
 #include "lib/ilist.h"
 #define RI_KEYS_NONE_NULL                              2
 
 /* RI query type codes */
-/* these queries are executed against the PK (referenced) table: */
-#define RI_PLAN_CHECK_LOOKUPPK                 1
-#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK 2
-#define RI_PLAN_LAST_ON_PK                             RI_PLAN_CHECK_LOOKUPPK_FROM_PK
-/* these queries are executed against the FK (referencing) table: */
-#define RI_PLAN_CASCADE_ONDELETE               3
-#define RI_PLAN_CASCADE_ONUPDATE               4
+#define RI_PLAN_CASCADE_ONDELETE               1
+#define RI_PLAN_CASCADE_ONUPDATE               2
 /* For RESTRICT, the same plan can be used for both ON DELETE and ON UPDATE triggers. */
-#define RI_PLAN_RESTRICT                               5
-#define RI_PLAN_SETNULL_ONDELETE               6
-#define RI_PLAN_SETNULL_ONUPDATE               7
-#define RI_PLAN_SETDEFAULT_ONDELETE            8
-#define RI_PLAN_SETDEFAULT_ONUPDATE            9
+#define RI_PLAN_RESTRICT                               3
+#define RI_PLAN_SETNULL_ONDELETE               4
+#define RI_PLAN_SETNULL_ONUPDATE               5
+#define RI_PLAN_SETDEFAULT_ONDELETE            6
+#define RI_PLAN_SETDEFAULT_ONUPDATE            7
 
 #define MAX_QUOTED_NAME_LEN  (NAMEDATALEN*2+3)
 #define MAX_QUOTED_REL_NAME_LEN  (MAX_QUOTED_NAME_LEN*2)
@@ -229,9 +226,278 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                                                           Relation pk_rel, Relation fk_rel,
                                                           TupleTableSlot *violatorslot, TupleDesc tupdesc,
-                                                          int queryno, bool partgone) pg_attribute_noreturn();
+                                                          bool on_fk, bool partgone) pg_attribute_noreturn();
+static Oid     get_fkey_unique_index(Oid conoid);
 
 
+/*
+ * Checks whether a tuple containing the unique key, as extracted from the
+ * tuple provided in 'slot', exists in 'pk_rel'.  The key is extracted using
+ * the constraint's index given in 'riinfo', which is also the index that is
+ * scanned to check for the key's existence.
+ *
+ * If 'pk_rel' is a partitioned table, the check is performed on its leaf
+ * partition that would contain the key.
+ *
+ * The provided tuple is either the one being inserted into the referencing
+ * relation ('fk_rel' is non-NULL), or the one being deleted from the
+ * referenced relation, that is, 'pk_rel' ('fk_rel' is NULL).
+ */
+static bool
+ri_ReferencedKeyExists(Relation pk_rel, Relation fk_rel,
+                                          TupleTableSlot *slot, const RI_ConstraintInfo *riinfo)
+{
+       Oid                     constr_id = riinfo->constraint_id;
+       Oid                     idxoid;
+       Relation        idxrel;
+       Relation        leaf_pk_rel = NULL;
+       int                     num_pk;
+       int                     i;
+       bool            found = false;
+       const Oid  *eq_oprs;
+       Datum           pk_vals[INDEX_MAX_KEYS];
+       char            pk_nulls[INDEX_MAX_KEYS];
+       ScanKeyData skey[INDEX_MAX_KEYS];
+       Snapshot        snap = InvalidSnapshot;
+       bool            pushed_latest_snapshot = false;
+       IndexScanDesc scan;
+       TupleTableSlot *outslot;
+       Oid                     saved_userid;
+       int                     saved_sec_context;
+       AclResult       aclresult;
+
+       /*
+        * Extract the unique key from the provided slot and choose the equality
+        * operators to use when scanning the index below.
+        */
+       if (fk_rel)
+       {
+               ri_ExtractValues(fk_rel, slot, riinfo, false, pk_vals, pk_nulls);
+               /* Use PK = FK equality operator. */
+               eq_oprs = riinfo->pf_eq_oprs;
+
+               /*
+                * May need to cast each of the individual values of the foreign key
+                * to the corresponding PK column's type if the equality operator
+                * demands it.
+                */
+               for (i = 0; i < riinfo->nkeys; i++)
+               {
+                       if (pk_nulls[i] != 'n')
+                       {
+                               Oid                     eq_opr = eq_oprs[i];
+                               Oid                     typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+                               RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
+
+                               if (OidIsValid(entry->cast_func_finfo.fn_oid))
+                                       pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
+                                                                                          pk_vals[i],
+                                                                                          Int32GetDatum(-1),   /* typmod */
+                                                                                          BoolGetDatum(false));        /* implicit coercion */
+                       }
+               }
+       }
+       else
+       {
+               ri_ExtractValues(pk_rel, slot, riinfo, true, pk_vals, pk_nulls);
+               /* Use PK = PK equality operator. */
+               eq_oprs = riinfo->pp_eq_oprs;
+       }
+
+       /*
+        * Switch to the referenced table's owner to perform the following
+        * operations as.  This matches what ri_PerformCheck() does.
+        *
+        * Note that as with queries done by ri_PerformCheck(), the way we select
+        * the referenced row below effectively bypasses any RLS policies that may
+        * be present on the referenced table.
+        */
+       GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
+       SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
+                                                  saved_sec_context | SECURITY_LOCAL_USERID_CHANGE);
+
+       /*
+        * Also check that the new user has permission to access the schema of
+        * the referenced table and to SELECT from that table.
+        */
+       aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
+                                                                         GetUserId(), ACL_USAGE);
+       if (aclresult != ACLCHECK_OK)
+               aclcheck_error(aclresult, OBJECT_SCHEMA,
+                                          get_namespace_name(RelationGetNamespace(pk_rel)));
+       aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
+                                                                 ACL_SELECT);
+       if (aclresult != ACLCHECK_OK)
+               aclcheck_error(aclresult, OBJECT_TABLE,
+                                          RelationGetRelationName(pk_rel));
+
+       /* Make the changes of the current command visible in all cases. */
+       CommandCounterIncrement();
+
+       /*
+        * In the case of scanning the PK index for ri_Check_Pk_Match(), we'd like
+        * to see all rows that could be interesting, even those that would not be
+        * visible to the transaction snapshot.  To do so, force-push the latest
+        * snapshot.
+        */
+       if (fk_rel == NULL)
+       {
+               snap = GetLatestSnapshot();
+               PushActiveSnapshot(snap);
+               pushed_latest_snapshot = true;
+       }
+       else
+       {
+               snap = GetTransactionSnapshot();
+               PushActiveSnapshot(snap);
+       }
+
+       /*
+        * Open the constraint index to be scanned.
+        *
+        * If the target table is partitioned, we must look up the leaf partition
+        * and its corresponding unique index to search the keys in.
+        */
+       idxoid = get_fkey_unique_index(constr_id);
+       if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       {
+               Oid                     leaf_idxoid;
+               Snapshot        mysnap = InvalidSnapshot;
+
+               /*
+                * XXX the partition descriptor machinery has a hack that assumes that
+                * the queries originating in this module push the latest snapshot in
+                * the transaction-snapshot mode.  If we haven't pushed one already,
+                * do so now.
+                */
+               if (!pushed_latest_snapshot)
+               {
+                       mysnap = GetLatestSnapshot();
+                       PushActiveSnapshot(mysnap);
+               }
+
+               leaf_pk_rel = ExecGetLeafPartitionForKey(pk_rel, riinfo->nkeys,
+                                                                                                riinfo->pk_attnums,
+                                                                                                pk_vals, pk_nulls,
+                                                                                                idxoid, RowShareLock,
+                                                                                                &leaf_idxoid);
+
+               /*
+                * XXX done fiddling with the partition descriptor machinery so unset
+                * the active snapshot if we must.
+                */
+               if (mysnap != InvalidSnapshot)
+                       PopActiveSnapshot();
+
+               /*
+                * If no suitable leaf partition exists, the key we're looking
+                * for cannot exist either.
+                */
+               if (leaf_pk_rel == NULL)
+               {
+                       SetUserIdAndSecContext(saved_userid, saved_sec_context);
+                       PopActiveSnapshot();
+                       return false;
+               }
+
+               pk_rel = leaf_pk_rel;
+               idxoid = leaf_idxoid;
+       }
+       idxrel = index_open(idxoid, RowShareLock);
+
+       /* Set up ScanKeys for the index scan. */
+       num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
+       for (i = 0; i < num_pk; i++)
+       {
+               int                     pkattno = i + 1;
+               Oid                     operator = eq_oprs[i];
+               Oid                     opfamily = idxrel->rd_opfamily[i];
+               StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
+               RegProcedure regop = get_opcode(operator);
+
+               /* Initialize the scankey. */
+               ScanKeyInit(&skey[i],
+                                       pkattno,
+                                       strat,
+                                       regop,
+                                       pk_vals[i]);
+
+               skey[i].sk_collation = idxrel->rd_indcollation[i];
+
+               /*
+                * Check for null value.  Nulls should not occur here, because
+                * callers currently handle the cases in which they can appear.
+                */
+               if (pk_nulls[i] == 'n')
+                       skey[i].sk_flags |= SK_ISNULL;
+       }
+
+       scan = index_beginscan(pk_rel, idxrel, snap, num_pk, 0);
+       index_rescan(scan, skey, num_pk, NULL, 0);
+
+       /* Look for the tuple, and if found, try to lock it in key share mode. */
+       outslot = table_slot_create(pk_rel, NULL);
+       if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+       {
+               /*
+                * If we fail to lock the tuple for whatever reason, assume it doesn't
+                * exist.
+                */
+               found = ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
+                                                                  snap,
+                                                                  GetCurrentCommandId(false),
+                                                                  LockTupleKeyShare,
+                                                                  LockWaitBlock, NULL);
+       }
+
+       index_endscan(scan);
+       ExecDropSingleTupleTableSlot(outslot);
+
+       /* Don't release lock until commit. */
+       index_close(idxrel, NoLock);
+
+       /* Close leaf partition relation if any. */
+       if (leaf_pk_rel)
+               table_close(leaf_pk_rel, NoLock);
+
+       /* Restore UID and security context */
+       SetUserIdAndSecContext(saved_userid, saved_sec_context);
+
+       PopActiveSnapshot();
+
+       return found;
+}
+
+/*
+ * get_fkey_unique_index
+ *             Returns the unique index backing the given foreign key constraint
+ *
+ * XXX This is very similar to get_constraint_index; probably they should be
+ * unified.
+ */
+static Oid
+get_fkey_unique_index(Oid conoid)
+{
+       Oid                     result = InvalidOid;
+       HeapTuple       tp;
+
+       tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
+       if (HeapTupleIsValid(tp))
+       {
+               Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
+
+               if (contup->contype == CONSTRAINT_FOREIGN)
+                       result = contup->conindid;
+               ReleaseSysCache(tp);
+       }
+
+       if (!OidIsValid(result))
+               elog(ERROR, "unique index not found for foreign key constraint %u",
+                        conoid);
+
+       return result;
+}
+
 /*
  * RI_FKey_check -
  *
@@ -244,8 +510,6 @@ RI_FKey_check(TriggerData *trigdata)
        Relation        fk_rel;
        Relation        pk_rel;
        TupleTableSlot *newslot;
-       RI_QueryKey qkey;
-       SPIPlanPtr      qplan;
 
        riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
                                                                        trigdata->tg_relation, false);
@@ -325,9 +589,9 @@ RI_FKey_check(TriggerData *trigdata)
 
                                        /*
                                         * MATCH PARTIAL - all non-null columns must match. (not
-                                        * implemented, can be done by modifying the query below
-                                        * to only include non-null columns, or by writing a
-                                        * special version here)
+                                        * implemented, can be done by modifying
+                                        * ri_ReferencedKeyExists() to only include non-null
+                                        * columns.)
                                         */
                                        break;
 #endif
@@ -342,74 +606,12 @@ RI_FKey_check(TriggerData *trigdata)
                        break;
        }
 
-       if (SPI_connect() != SPI_OK_CONNECT)
-               elog(ERROR, "SPI_connect failed");
-
-       /* Fetch or prepare a saved plan for the real check */
-       ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
-
-       if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
-       {
-               StringInfoData querybuf;
-               char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
-               char            attname[MAX_QUOTED_NAME_LEN];
-               char            paramname[16];
-               const char *querysep;
-               Oid                     queryoids[RI_MAX_NUMKEYS];
-               const char *pk_only;
-
-               /* ----------
-                * The query string built is
-                *      SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-                *                 FOR KEY SHARE OF x
-                * The type id's for the $ parameters are those of the
-                * corresponding FK attributes.
-                * ----------
-                */
-               initStringInfo(&querybuf);
-               pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-                       "" : "ONLY ";
-               quoteRelationName(pkrelname, pk_rel);
-               appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-                                                pk_only, pkrelname);
-               querysep = "WHERE";
-               for (int i = 0; i < riinfo->nkeys; i++)
-               {
-                       Oid                     pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-                       Oid                     fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
-
-                       quoteOneName(attname,
-                                                RIAttName(pk_rel, riinfo->pk_attnums[i]));
-                       sprintf(paramname, "$%d", i + 1);
-                       ri_GenerateQual(&querybuf, querysep,
-                                                       attname, pk_type,
-                                                       riinfo->pf_eq_oprs[i],
-                                                       paramname, fk_type);
-                       querysep = "AND";
-                       queryoids[i] = fk_type;
-               }
-               appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
-               /* Prepare and save the plan */
-               qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
-                                                        &qkey, fk_rel, pk_rel);
-       }
-
-       /*
-        * Now check that foreign key exists in PK table
-        *
-        * XXX detectNewRows must be true when a partitioned table is on the
-        * referenced side.  The reason is that our snapshot must be fresh in
-        * order for the hack in find_inheritance_children() to work.
-        */
-       ri_PerformCheck(riinfo, &qkey, qplan,
-                                       fk_rel, pk_rel,
-                                       NULL, newslot,
-                                       pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
-                                       SPI_OK_SELECT);
-
-       if (SPI_finish() != SPI_OK_FINISH)
-               elog(ERROR, "SPI_finish failed");
+       if (!ri_ReferencedKeyExists(pk_rel, fk_rel, newslot, riinfo))
+               ri_ReportViolation(riinfo,
+                                                  pk_rel, fk_rel,
+                                                  newslot,
+                                                  NULL,
+                                                  true, false);
 
        table_close(pk_rel, RowShareLock);
 
@@ -464,81 +666,10 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
                                  TupleTableSlot *oldslot,
                                  const RI_ConstraintInfo *riinfo)
 {
-       SPIPlanPtr      qplan;
-       RI_QueryKey qkey;
-       bool            result;
-
        /* Only called for non-null rows */
        Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
 
-       if (SPI_connect() != SPI_OK_CONNECT)
-               elog(ERROR, "SPI_connect failed");
-
-       /*
-        * Fetch or prepare a saved plan for checking PK table with values coming
-        * from a PK row
-        */
-       ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
-
-       if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
-       {
-               StringInfoData querybuf;
-               char            pkrelname[MAX_QUOTED_REL_NAME_LEN];
-               char            attname[MAX_QUOTED_NAME_LEN];
-               char            paramname[16];
-               const char *querysep;
-               const char *pk_only;
-               Oid                     queryoids[RI_MAX_NUMKEYS];
-
-               /* ----------
-                * The query string built is
-                *      SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
-                *                 FOR KEY SHARE OF x
-                * The type id's for the $ parameters are those of the
-                * PK attributes themselves.
-                * ----------
-                */
-               initStringInfo(&querybuf);
-               pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
-                       "" : "ONLY ";
-               quoteRelationName(pkrelname, pk_rel);
-               appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
-                                                pk_only, pkrelname);
-               querysep = "WHERE";
-               for (int i = 0; i < riinfo->nkeys; i++)
-               {
-                       Oid                     pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
-
-                       quoteOneName(attname,
-                                                RIAttName(pk_rel, riinfo->pk_attnums[i]));
-                       sprintf(paramname, "$%d", i + 1);
-                       ri_GenerateQual(&querybuf, querysep,
-                                                       attname, pk_type,
-                                                       riinfo->pp_eq_oprs[i],
-                                                       paramname, pk_type);
-                       querysep = "AND";
-                       queryoids[i] = pk_type;
-               }
-               appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
-
-               /* Prepare and save the plan */
-               qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
-                                                        &qkey, fk_rel, pk_rel);
-       }
-
-       /*
-        * We have a plan now. Run it.
-        */
-       result = ri_PerformCheck(riinfo, &qkey, qplan,
-                                                        fk_rel, pk_rel,
-                                                        oldslot, NULL,
-                                                        true,  /* treat like update */
-                                                        SPI_OK_SELECT);
-
-       if (SPI_finish() != SPI_OK_FINISH)
-               elog(ERROR, "SPI_finish failed");
-
-       return result;
+       return ri_ReferencedKeyExists(pk_rel, NULL, oldslot, riinfo);
 }
 
 
@@ -1608,15 +1739,10 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
                                         errtableconstraint(fk_rel,
                                                                                NameStr(fake_riinfo.conname))));
 
-               /*
-                * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
-                * query, which isn't true, but will cause it to use
-                * fake_riinfo.fk_attnums as we need.
-                */
                ri_ReportViolation(&fake_riinfo,
                                                   pk_rel, fk_rel,
                                                   slot, tupdesc,
-                                                  RI_PLAN_CHECK_LOOKUPPK, false);
+                                                  true, false);
 
                ExecDropSingleTupleTableSlot(slot);
        }
@@ -1833,7 +1959,7 @@ RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
                        fake_riinfo.pk_attnums[i] = i + 1;
 
                ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
-                                                  slot, tupdesc, 0, true);
+                                                  slot, tupdesc, true, true);
        }
 
        if (SPI_finish() != SPI_OK_FINISH)
@@ -1970,9 +2096,8 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
 {
        /*
         * Inherited constraints with a common ancestor can share ri_query_cache
-        * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK.
-        * Except in that case, the query processes the other table involved in
-        * the FK constraint (i.e., not the table on which the trigger has been
+        * entries, because each query processes the other table involved in the
+        * FK constraint (i.e., not the table on which the trigger has been
         * fired), and so it will be the same for all members of the inheritance
         * tree.  So we may use the root constraint's OID in the hash key, rather
         * than the constraint's own OID.  This avoids creating duplicate SPI
@@ -1983,13 +2108,13 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
         * constraint, because partitions can have different column orders,
         * resulting in different pk_attnums[] or fk_attnums[] array contents.)
         *
+        * (Note also that for a standalone or non-inherited constraint,
+        * constraint_root_id is the same as constraint_id.)
+        *
         * We assume struct RI_QueryKey contains no padding bytes, else we'd need
         * to use memset to clear them.
         */
-       if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
-               key->constr_id = riinfo->constraint_root_id;
-       else
-               key->constr_id = riinfo->constraint_id;
+       key->constr_id = riinfo->constraint_root_id;
        key->constr_queryno = constr_queryno;
 }
 
@@ -2260,19 +2385,12 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
                         RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
 {
        SPIPlanPtr      qplan;
-       Relation        query_rel;
+
+       /* There are currently no queries that run on the PK table. */
+       Relation        query_rel = fk_rel;
        Oid                     save_userid;
        int                     save_sec_context;
 
-       /*
-        * Use the query type code to determine whether the query is run against
-        * the PK or FK table; we'll do the check as that table's owner
-        */
-       if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
-               query_rel = pk_rel;
-       else
-               query_rel = fk_rel;
-
        /* Switch to proper UID to perform check as */
        GetUserIdAndSecContext(&save_userid, &save_sec_context);
        SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
@@ -2305,9 +2423,9 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
                                TupleTableSlot *oldslot, TupleTableSlot *newslot,
                                bool detectNewRows, int expect_OK)
 {
-       Relation        query_rel,
-                               source_rel;
-       bool            source_is_pk;
+       /* There are currently no queries that run on the PK table. */
+       Relation        query_rel = fk_rel,
+                               source_rel = pk_rel;
        Snapshot        test_snapshot;
        Snapshot        crosscheck_snapshot;
        int                     limit;
@@ -2317,46 +2435,17 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
        Datum           vals[RI_MAX_NUMKEYS * 2];
        char            nulls[RI_MAX_NUMKEYS * 2];
 
-       /*
-        * Use the query type code to determine whether the query is run against
-        * the PK or FK table; we'll do the check as that table's owner
-        */
-       if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
-               query_rel = pk_rel;
-       else
-               query_rel = fk_rel;
-
-       /*
-        * The values for the query are taken from the table on which the trigger
-        * is called - it is normally the other one with respect to query_rel. An
-        * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
-        * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK).  We might eventually
-        * need some less klugy way to determine this.
-        */
-       if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
-       {
-               source_rel = fk_rel;
-               source_is_pk = false;
-       }
-       else
-       {
-               source_rel = pk_rel;
-               source_is_pk = true;
-       }
-
        /* Extract the parameters to be passed into the query */
        if (newslot)
        {
-               ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
-                                                vals, nulls);
+               ri_ExtractValues(source_rel, newslot, riinfo, true, vals, nulls);
                if (oldslot)
-                       ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
+                       ri_ExtractValues(source_rel, oldslot, riinfo, true,
                                                         vals + riinfo->nkeys, nulls + riinfo->nkeys);
        }
        else
        {
-               ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
-                                                vals, nulls);
+               ri_ExtractValues(source_rel, oldslot, riinfo, true, vals, nulls);
        }
 
        /*
@@ -2420,14 +2509,12 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
                                 errhint("This is most likely due to a rule having rewritten the query.")));
 
        /* XXX wouldn't it be clearer to do this part at the caller? */
-       if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
-               expect_OK == SPI_OK_SELECT &&
-               (SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
+       if (expect_OK == SPI_OK_SELECT && SPI_processed != 0)
                ri_ReportViolation(riinfo,
                                                   pk_rel, fk_rel,
                                                   newslot ? newslot : oldslot,
                                                   NULL,
-                                                  qkey->constr_queryno, false);
+                                                  false, false);
 
        return SPI_processed != 0;
 }
@@ -2458,9 +2545,9 @@ ri_ExtractValues(Relation rel, TupleTableSlot *slot,
 /*
  * Produce an error report
  *
- * If the failed constraint was on insert/update to the FK table,
- * we want the key names and values extracted from there, and the error
- * message to look like 'key blah is not present in PK'.
+ * If the failed constraint was on insert/update to the FK table (on_fk is
+ * true), we want the key names and values extracted from there, and the
+ * error message to look like 'key blah is not present in PK'.
  * Otherwise, the attr names and values come from the PK table and the
  * message looks like 'key blah is still referenced from FK'.
  */
@@ -2468,22 +2555,20 @@ static void
 ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                                   Relation pk_rel, Relation fk_rel,
                                   TupleTableSlot *violatorslot, TupleDesc tupdesc,
-                                  int queryno, bool partgone)
+                                  bool on_fk, bool partgone)
 {
        StringInfoData key_names;
        StringInfoData key_values;
-       bool            onfk;
        const int16 *attnums;
        Oid                     rel_oid;
        AclResult       aclresult;
        bool            has_perm = true;
 
        /*
-        * Determine which relation to complain about.  If tupdesc wasn't passed
-        * by caller, assume the violator tuple came from there.
+        * If tupdesc wasn't passed by caller, assume the violator tuple came from
+        * there.
         */
-       onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
-       if (onfk)
+       if (on_fk)
        {
                attnums = riinfo->fk_attnums;
                rel_oid = fk_rel->rd_id;
@@ -2585,7 +2670,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
                                                   key_names.data, key_values.data,
                                                   RelationGetRelationName(fk_rel)),
                                 errtableconstraint(fk_rel, NameStr(riinfo->conname))));
-       else if (onfk)
+       else if (on_fk)
                ereport(ERROR,
                                (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
                                 errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
@@ -2892,7 +2977,10 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
  * ri_HashCompareOp -
  *
  * See if we know how to compare two values, and create a new hash entry
- * if not.
+ * if not.  The entry contains the FmgrInfo of the equality operator function
+ * and that of the cast function, if one is needed to convert the right
+ * operand (whose type OID has been passed) before passing it to the equality
+ * function.
  */
 static RI_CompareHashEntry *
 ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -2948,8 +3036,16 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
                 * moment since that will never be generated for implicit coercions.
                 */
                op_input_types(eq_opr, &lefttype, &righttype);
-               Assert(lefttype == righttype);
-               if (typeid == lefttype)
+
+               /*
+                * No cast is needed if the values that will be passed to the
+                * operator are already of the expected operand type(s).  The
+                * operator can be cross-type (such as when called by
+                * ri_ReferencedKeyExists()), in which case we only need the cast
+                * if the right operand value doesn't match the type expected by
+                * the operator.
+                */
+               if ((lefttype == righttype && typeid == lefttype) ||
+                       (lefttype != righttype && typeid == righttype))
                        castfunc = InvalidOid;  /* simplest case */
                else
                {
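
A worked example of the new cast rule, assuming an int8 PK column
referenced by an int4 FK column, where the btree integer_ops family
supplies the cross-type operator int8 = int4:

  /*
   * Cross-type operator: int8 = int4, right-hand (FK) value is int4.
   *     lefttype = int8, righttype = int4, typeid = int4
   *     lefttype != righttype && typeid == righttype  =>  no cast needed
   *
   * Same-type operator: int8 = int8, right-hand (FK) value is int4.
   *     lefttype = righttype = int8, typeid = int4
   *     neither condition holds  =>  look up the int4 -> int8 cast
   */
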
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 708435e95285194fbb7b9f407e2de11485d9ca56..cbe1d996e6b5200832a68b11400767620c4cf873 100644
@@ -31,6 +31,12 @@ extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
                                                                                EState *estate);
 extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
                                                                        PartitionTupleRouting *proute);
+extern Relation ExecGetLeafPartitionForKey(Relation root_rel,
+                                                                                  int key_natts,
+                                                                                  const AttrNumber *key_attnums,
+                                                                                  Datum *key_vals, char *key_nulls,
+                                                                                  Oid root_idxoid, int lockmode,
+                                                                                  Oid *leaf_idxoid);
 
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 873772f188374b0de4eeddcb3850e4346abc2574..216d28679a6e35b1220240c071f5a3ee8e924b9e 100644
@@ -651,6 +651,14 @@ extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
                                                                         const char *relname);
 
+/*
+ * prototypes from functions in nodeLockRows.c
+ */
+extern bool ExecLockTableTuple(Relation relation, ItemPointer tid,
+                                                          TupleTableSlot *slot, Snapshot snapshot,
+                                                          CommandId cid, LockTupleMode lockmode,
+                                                          LockWaitPolicy waitPolicy, bool *epq_needed);
+
 /*
  * prototypes from functions in nodeModifyTable.c
  */
diff --git a/src/test/isolation/expected/fk-snapshot.out b/src/test/isolation/expected/fk-snapshot.out
index 5faf80d6ce0e4cd292ba3788dbbd5ef4fd689447..22752cc7429014e9a0406dc7dd08004642af0cea 100644
@@ -47,12 +47,12 @@ a
 
 step s2ifn2: INSERT INTO fk_noparted VALUES (2);
 step s2c: COMMIT;
+ERROR:  insert or update on table "fk_noparted" violates foreign key constraint "fk_noparted_a_fkey"
 step s2sfn: SELECT * FROM fk_noparted;
 a
 -
 1
-2
-(2 rows)
+(1 row)
 
 
 starting permutation: s1brc s2brc s2ip2 s1sp s2c s1sp s1ifp2 s2brc s2sfp s1c s1sfp s2ifn2 s2c s2sfn
diff --git a/src/test/isolation/specs/fk-snapshot.spec b/src/test/isolation/specs/fk-snapshot.spec
index 378507fbc3822be4821eff7a2062526750c59f81..64d27f29c3c859f56042770e0cf1d44e5d12ceb2 100644
@@ -46,10 +46,7 @@ step s2sfn   { SELECT * FROM fk_noparted; }
 # inserting into referencing tables in transaction-snapshot mode
 # PK table is non-partitioned
 permutation s1brr s2brc s2ip2 s1sp s2c s1sp s1ifp2 s1c s1sfp
-# PK table is partitioned: buggy, because s2's serialization transaction can
-# see the uncommitted row thanks to the latest snapshot taken for
-# partition lookup to work correctly also ends up getting used by the PK index
-# scan
+# PK table is partitioned
 permutation s2ip2 s2brr s1brc s1ifp2 s2sfp s1c s2sfp s2ifn2 s2c s2sfn
 
 # inserting into referencing tables in up-to-date snapshot mode