For inplace update durability, make heap_update() callers wait.
authorNoah Misch <noah@leadboat.com>
Tue, 24 Sep 2024 22:25:18 +0000 (15:25 -0700)
committerNoah Misch <noah@leadboat.com>
Tue, 24 Sep 2024 22:25:18 +0000 (15:25 -0700)
The previous commit fixed some ways of losing an inplace update.  It
remained possible to lose one when a backend working toward a
heap_update() copied a tuple into memory just before inplace update of
that tuple.  In catalogs eligible for inplace update, use LOCKTAG_TUPLE
to govern admission to the steps of copying an old tuple, modifying it,
and issuing heap_update().  This includes MERGE commands.  To avoid
changing most of the pg_class DDL, don't require LOCKTAG_TUPLE when
holding a relation lock sufficient to exclude inplace updaters.
Back-patch to v12 (all supported versions).  In v13 and v12, "UPDATE
pg_class" or "UPDATE pg_database" can still lose an inplace update.  The
v14+ UPDATE fix needs commit 86dc90056dfdbd9d1b891718d2e5614e3e432f35,
and it wasn't worth reimplementing that fix without such infrastructure.

Reviewed by Nitin Motiani and (in earlier versions) Heikki Linnakangas.

Discussion: https://postgr.es/m/20231027214946.79.nmisch@google.com

20 files changed:
src/backend/access/heap/README.tuplock
src/backend/access/heap/heapam.c
src/backend/access/index/genam.c
src/backend/catalog/aclchk.c
src/backend/catalog/catalog.c
src/backend/commands/dbcommands.c
src/backend/commands/event_trigger.c
src/backend/commands/indexcmds.c
src/backend/commands/tablecmds.c
src/backend/executor/execMain.c
src/backend/executor/execReplication.c
src/backend/executor/nodeModifyTable.c
src/backend/utils/cache/relcache.c
src/backend/utils/cache/syscache.c
src/include/nodes/execnodes.h
src/include/storage/lockdefs.h
src/include/utils/syscache.h
src/test/isolation/expected/intra-grant-inplace.out
src/test/isolation/specs/eval-plan-qual.spec
src/test/isolation/specs/intra-grant-inplace.spec

index ddb2defd28bb2c79e620d3231d23218d92c0a468..818cd7f980601926ae98293dbcfbb6dd99a14172 100644 (file)
@@ -154,6 +154,48 @@ The following infomask bits are applicable:
 We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
 is set.
 
+Locking to write inplace-updated tables
+---------------------------------------
+
+If IsInplaceUpdateRelation() returns true for a table, the table is a system
+catalog that receives systable_inplace_update_begin() calls.  Preparing a
+heap_update() of these tables follows additional locking rules, to ensure we
+don't lose the effects of an inplace update.  In particular, consider a moment
+when a backend has fetched the old tuple to modify, not yet having called
+heap_update().  Another backend's inplace update starting then can't conclude
+until the heap_update() places its new tuple in a buffer.  We enforce that
+using locktags as follows.  While DDL code is the main audience, the executor
+follows these rules to make e.g. "MERGE INTO pg_class" safer.  Locking rules
+are per-catalog:
+
+  pg_class systable_inplace_update_begin() callers: before the call, acquire a
+  lock on the relation in mode ShareUpdateExclusiveLock or stricter.  If the
+  update targets a row of RELKIND_INDEX (but not RELKIND_PARTITIONED_INDEX),
+  that lock must be on the table.  Locking the index rel is not necessary.
+  (This allows VACUUM to overwrite per-index pg_class while holding a lock on
+  the table alone.) systable_inplace_update_begin() acquires and releases
+  LOCKTAG_TUPLE in InplaceUpdateTupleLock, an alias for ExclusiveLock, on each
+  tuple it overwrites.
+
+  pg_class heap_update() callers: before copying the tuple to modify, take a
+  lock on the tuple, a ShareUpdateExclusiveLock on the relation, or a
+  ShareRowExclusiveLock or stricter on the relation.
+
+  SearchSysCacheLocked1() is one convenient way to acquire the tuple lock.
+  Most heap_update() callers already hold a suitable lock on the relation for
+  other reasons and can skip the tuple lock.  If you do acquire the tuple
+  lock, release it immediately after the update.
+
+
+  pg_database: before copying the tuple to modify, all updaters of pg_database
+  rows acquire LOCKTAG_TUPLE.  (Few updaters acquire LOCKTAG_OBJECT on the
+  database OID, so it wasn't worth extending that as a second option.)
+
+Ideally, DDL might want to perform permissions checks before LockTuple(), as
+we do with RangeVarGetRelidExtended() callbacks.  We typically don't bother.
+LOCKTAG_TUPLE acquirers release it after each row, so the potential
+inconvenience is lower.
+
 Reading inplace-updated columns
 -------------------------------
 
index ecc6eaa27c7185e40f8f6e03d7ad94030a0c8e53..da5e656a08de40ccbfdb68d783995a273b73b5fa 100644 (file)
@@ -40,6 +40,8 @@
 #include "access/valid.h"
 #include "access/visibilitymap.h"
 #include "access/xloginsert.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
@@ -57,6 +59,12 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                                                                  Buffer newbuf, HeapTuple oldtup,
                                                                  HeapTuple newtup, HeapTuple old_key_tuple,
                                                                  bool all_visible_cleared, bool new_all_visible_cleared);
+#ifdef USE_ASSERT_CHECKING
+static void check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                                                ItemPointer otid,
+                                                                                                HeapTuple newtup);
+static void check_inplace_rel_lock(HeapTuple oldtup);
+#endif
 static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
                                                                                   Bitmapset *interesting_cols,
                                                                                   Bitmapset *external_cols,
@@ -103,6 +111,8 @@ static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool ke
  * heavyweight lock mode and MultiXactStatus values to use for any particular
  * tuple lock strength.
  *
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
+ *
  * Don't look at lockstatus/updstatus directly!  Use get_mxact_status_for_lock
  * instead.
  */
@@ -3189,6 +3199,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
                                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
                                 errmsg("cannot update tuples during a parallel operation")));
 
+#ifdef USE_ASSERT_CHECKING
+       check_lock_if_inplace_updateable_rel(relation, otid, newtup);
+#endif
+
        /*
         * Fetch the list of attributes to be checked for various operations.
         *
@@ -4053,6 +4067,128 @@ l2:
        return TM_Ok;
 }
 
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Confirm adequate lock held during heap_update(), per rules from
+ * README.tuplock section "Locking to write inplace-updated tables".
+ */
+static void
+check_lock_if_inplace_updateable_rel(Relation relation,
+                                                                        ItemPointer otid,
+                                                                        HeapTuple newtup)
+{
+       /* LOCKTAG_TUPLE acceptable for any catalog */
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+               case DatabaseRelationId:
+                       {
+                               LOCKTAG         tuptag;
+
+                               SET_LOCKTAG_TUPLE(tuptag,
+                                                                 relation->rd_lockInfo.lockRelId.dbId,
+                                                                 relation->rd_lockInfo.lockRelId.relId,
+                                                                 ItemPointerGetBlockNumber(otid),
+                                                                 ItemPointerGetOffsetNumber(otid));
+                               if (LockHeldByMe(&tuptag, InplaceUpdateTupleLock, false))
+                                       return;
+                       }
+                       break;
+               default:
+                       Assert(!IsInplaceUpdateRelation(relation));
+                       return;
+       }
+
+       switch (RelationGetRelid(relation))
+       {
+               case RelationRelationId:
+                       {
+                               /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
+                               Form_pg_class classForm = (Form_pg_class) GETSTRUCT(newtup);
+                               Oid                     relid = classForm->oid;
+                               Oid                     dbid;
+                               LOCKTAG         tag;
+
+                               if (IsSharedRelation(relid))
+                                       dbid = InvalidOid;
+                               else
+                                       dbid = MyDatabaseId;
+
+                               if (classForm->relkind == RELKIND_INDEX)
+                               {
+                                       Relation        irel = index_open(relid, AccessShareLock);
+
+                                       SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+                                       index_close(irel, AccessShareLock);
+                               }
+                               else
+                                       SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+                               if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, false) &&
+                                       !LockHeldByMe(&tag, ShareRowExclusiveLock, true))
+                                       elog(WARNING,
+                                                "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                                                NameStr(classForm->relname),
+                                                relid,
+                                                classForm->relkind,
+                                                ItemPointerGetBlockNumber(otid),
+                                                ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+               case DatabaseRelationId:
+                       {
+                               /* LOCKTAG_TUPLE required */
+                               Form_pg_database dbForm = (Form_pg_database) GETSTRUCT(newtup);
+
+                               elog(WARNING,
+                                        "missing lock on database \"%s\" (OID %u) @ TID (%u,%u)",
+                                        NameStr(dbForm->datname),
+                                        dbForm->oid,
+                                        ItemPointerGetBlockNumber(otid),
+                                        ItemPointerGetOffsetNumber(otid));
+                       }
+                       break;
+       }
+}
+
+/*
+ * Confirm adequate relation lock held, per rules from README.tuplock section
+ * "Locking to write inplace-updated tables".
+ */
+static void
+check_inplace_rel_lock(HeapTuple oldtup)
+{
+       Form_pg_class classForm = (Form_pg_class) GETSTRUCT(oldtup);
+       Oid                     relid = classForm->oid;
+       Oid                     dbid;
+       LOCKTAG         tag;
+
+       if (IsSharedRelation(relid))
+               dbid = InvalidOid;
+       else
+               dbid = MyDatabaseId;
+
+       if (classForm->relkind == RELKIND_INDEX)
+       {
+               Relation        irel = index_open(relid, AccessShareLock);
+
+               SET_LOCKTAG_RELATION(tag, dbid, irel->rd_index->indrelid);
+               index_close(irel, AccessShareLock);
+       }
+       else
+               SET_LOCKTAG_RELATION(tag, dbid, relid);
+
+       if (!LockHeldByMe(&tag, ShareUpdateExclusiveLock, true))
+               elog(WARNING,
+                        "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)",
+                        NameStr(classForm->relname),
+                        relid,
+                        classForm->relkind,
+                        ItemPointerGetBlockNumber(&oldtup->t_self),
+                        ItemPointerGetOffsetNumber(&oldtup->t_self));
+}
+#endif
+
 /*
  * Check if the specified attribute's values are the same.  Subroutine for
  * HeapDetermineColumnsInfo.
@@ -6070,15 +6206,21 @@ heap_inplace_lock(Relation relation,
        TM_Result       result;
        bool            ret;
 
+#ifdef USE_ASSERT_CHECKING
+       if (RelationGetRelid(relation) == RelationRelationId)
+               check_inplace_rel_lock(oldtup_ptr);
+#endif
+
        Assert(BufferIsValid(buffer));
 
+       LockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
        /*----------
         * Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
         *
         * - wait unconditionally
-        * - no tuple locks
+        * - already locked tuple above, since inplace needs that unconditionally
         * - don't recheck header after wait: simpler to defer to next iteration
         * - don't try to continue even if the updater aborts: likewise
         * - no crosscheck
@@ -6162,7 +6304,10 @@ heap_inplace_lock(Relation relation,
         * don't bother optimizing that.
         */
        if (!ret)
+       {
+               UnlockTuple(relation, &oldtup.t_self, InplaceUpdateTupleLock);
                InvalidateCatalogSnapshot();
+       }
        return ret;
 }
 
@@ -6171,6 +6316,8 @@ heap_inplace_lock(Relation relation,
  *
  * The tuple cannot change size, and therefore its header fields and null
  * bitmap (if any) don't change either.
+ *
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
  */
 void
 heap_inplace_update_and_unlock(Relation relation,
@@ -6254,6 +6401,7 @@ heap_inplace_unlock(Relation relation,
                                        HeapTuple oldtup, Buffer buffer)
 {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       UnlockTuple(relation, &oldtup->t_self, InplaceUpdateTupleLock);
 }
 
 #define                FRM_NOOP                                0x0001
index f3f6d4754e9279588ce0da88f236f432c5763cf0..52fde5cc4d4ce6dc4edd0e3b1b2a4cba06b66534 100644 (file)
@@ -765,7 +765,9 @@ systable_endscan_ordered(SysScanDesc sysscan)
  *
  * Overwriting violates both MVCC and transactional safety, so the uses of
  * this function in Postgres are extremely limited.  Nonetheless we find some
- * places to use it.  Standard flow:
+ * places to use it.  See README.tuplock section "Locking to write
+ * inplace-updated tables" and later sections for expectations of readers and
+ * writers of a table that gets inplace updates.  Standard flow:
  *
  * ... [any slow preparation not requiring oldtup] ...
  * systable_inplace_update_begin([...], &tup, &inplace_state);
index d2abc48fd8d1f586fa92d6c1e76e0355c78a1a19..819045203dd1b0cd752ed9078280e7b8df591248 100644 (file)
@@ -75,6 +75,7 @@
 #include "nodes/makefuncs.h"
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
+#include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/aclchk_internal.h"
 #include "utils/builtins.h"
@@ -1848,7 +1849,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                HeapTuple       tuple;
                ListCell   *cell_colprivs;
 
-               tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+               tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relOid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for relation %u", relOid);
                pg_class_tuple = (Form_pg_class) GETSTRUCT(tuple);
@@ -2060,6 +2061,7 @@ ExecGrant_Relation(InternalGrant *istmt)
                                                                                 values, nulls, replaces);
 
                        CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+                       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                        /* Update initial privileges for extensions */
                        recordExtensionInitPriv(relOid, RelationRelationId, 0, new_acl);
@@ -2072,6 +2074,8 @@ ExecGrant_Relation(InternalGrant *istmt)
 
                        pfree(new_acl);
                }
+               else
+                       UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                /*
                 * Handle column-level privileges, if any were specified or implied.
@@ -2185,7 +2189,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, AclMode default_privs,
                Oid                *oldmembers;
                Oid                *newmembers;
 
-               tuple = SearchSysCache1(cacheid, ObjectIdGetDatum(objectid));
+               tuple = SearchSysCacheLocked1(cacheid, ObjectIdGetDatum(objectid));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for %s %u", get_object_class_descr(classid), objectid);
 
@@ -2261,6 +2265,7 @@ ExecGrant_common(InternalGrant *istmt, Oid classid, AclMode default_privs,
                                                                         nulls, replaces);
 
                CatalogTupleUpdate(relation, &newtuple->t_self, newtuple);
+               UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);
 
                /* Update initial privileges for extensions */
                recordExtensionInitPriv(objectid, classid, 0, new_acl);
index d6b07a786550897107ca638fff889203d7428923..cfe8c5104b6535ce882b709cdd93a3735c743729 100644 (file)
@@ -138,6 +138,15 @@ IsCatalogRelationOid(Oid relid)
 /*
  * IsInplaceUpdateRelation
  *             True iff core code performs inplace updates on the relation.
+ *
+ *             This is used for assertions and for making the executor follow the
+ *             locking protocol described at README.tuplock section "Locking to write
+ *             inplace-updated tables".  Extensions may inplace-update other heap
+ *             tables, but concurrent SQL UPDATE on the same table may overwrite
+ *             those modifications.
+ *
+ *             The executor can assume these are not partitions or partitioned and
+ *             have no triggers.
  */
 bool
 IsInplaceUpdateRelation(Relation relation)
index 40bfd09333764b0e4b77ca5259d2d868c0446b2f..aa91a396967c8a80c807023bf6d097aa68fe5d2f 100644 (file)
@@ -1877,6 +1877,7 @@ RenameDatabase(const char *oldname, const char *newname)
 {
        Oid                     db_id;
        HeapTuple       newtup;
+       ItemPointerData otid;
        Relation        rel;
        int                     notherbackends;
        int                     npreparedxacts;
@@ -1948,11 +1949,13 @@ RenameDatabase(const char *oldname, const char *newname)
                                 errdetail_busy_db(notherbackends, npreparedxacts)));
 
        /* rename */
-       newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+       newtup = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
        if (!HeapTupleIsValid(newtup))
                elog(ERROR, "cache lookup failed for database %u", db_id);
+       otid = newtup->t_self;
        namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
-       CatalogTupleUpdate(rel, &newtup->t_self, newtup);
+       CatalogTupleUpdate(rel, &otid, newtup);
+       UnlockTuple(rel, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2201,6 +2204,7 @@ movedb(const char *dbname, const char *tblspcname)
                        ereport(ERROR,
                                        (errcode(ERRCODE_UNDEFINED_DATABASE),
                                         errmsg("database \"%s\" does not exist", dbname)));
+               LockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
                new_record_repl[Anum_pg_database_dattablespace - 1] = true;
@@ -2209,6 +2213,7 @@ movedb(const char *dbname, const char *tblspcname)
                                                                         new_record,
                                                                         new_record_nulls, new_record_repl);
                CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
+               UnlockTuple(pgdbrel, &oldtuple->t_self, InplaceUpdateTupleLock);
 
                InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2439,6 +2444,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", stmt->dbname)));
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datform = (Form_pg_database) GETSTRUCT(tuple);
        dboid = datform->oid;
@@ -2488,6 +2494,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
        newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
                                                                 new_record_nulls, new_record_repl);
        CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, dboid, 0);
 
@@ -2537,6 +2544,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
        if (!object_ownercheck(DatabaseRelationId, db_id, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                                           stmt->dbname);
+       LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
        oldversion = isnull ? NULL : TextDatumGetCString(datum);
@@ -2565,6 +2573,7 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
                bool            nulls[Natts_pg_database] = {0};
                bool            replaces[Natts_pg_database] = {0};
                Datum           values[Natts_pg_database] = {0};
+               HeapTuple       newtuple;
 
                ereport(NOTICE,
                                (errmsg("changing version from %s to %s",
@@ -2573,14 +2582,15 @@ AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
                values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
                replaces[Anum_pg_database_datcollversion - 1] = true;
 
-               tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
-                                                                 values, nulls, replaces);
-               CatalogTupleUpdate(rel, &tuple->t_self, tuple);
-               heap_freetuple(tuple);
+               newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+                                                                        values, nulls, replaces);
+               CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
+               heap_freetuple(newtuple);
        }
        else
                ereport(NOTICE,
                                (errmsg("version has not changed")));
+       UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
 
@@ -2692,6 +2702,8 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
                                        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                         errmsg("permission denied to change owner of database")));
 
+               LockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
+
                repl_repl[Anum_pg_database_datdba - 1] = true;
                repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
 
@@ -2713,6 +2725,7 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 
                newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
                CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
+               UnlockTuple(rel, &tuple->t_self, InplaceUpdateTupleLock);
 
                heap_freetuple(newtuple);
 
index 55baf10c8566eed15c1a0e8fef10554675083998..05a6de68ba3b036c7d8559c4516a042a8209a35b 100644 (file)
@@ -388,6 +388,7 @@ SetDatabaseHasLoginEventTriggers(void)
        /* Set dathasloginevt flag in pg_database */
        Form_pg_database db;
        Relation        pg_db = table_open(DatabaseRelationId, RowExclusiveLock);
+       ItemPointerData otid;
        HeapTuple       tuple;
 
        /*
@@ -399,16 +400,18 @@ SetDatabaseHasLoginEventTriggers(void)
         */
        LockSharedObject(DatabaseRelationId, MyDatabaseId, 0, AccessExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
+       tuple = SearchSysCacheLockedCopy1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for database %u", MyDatabaseId);
+       otid = tuple->t_self;
        db = (Form_pg_database) GETSTRUCT(tuple);
        if (!db->dathasloginevt)
        {
                db->dathasloginevt = true;
-               CatalogTupleUpdate(pg_db, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_db, &otid, tuple);
                CommandCounterIncrement();
        }
+       UnlockTuple(pg_db, &otid, InplaceUpdateTupleLock);
        table_close(pg_db, RowExclusiveLock);
        heap_freetuple(tuple);
 }
index f99c2d2deec91387bd06eeb4bb85ca6c048c981b..ec5f253b28fe32b38d13292ad47aa035c0eda6b0 100644 (file)
@@ -4557,14 +4557,17 @@ update_relispartition(Oid relationId, bool newval)
 {
        HeapTuple       tup;
        Relation        classRel;
+       ItemPointerData otid;
 
        classRel = table_open(RelationRelationId, RowExclusiveLock);
-       tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
+       tup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relationId));
        if (!HeapTupleIsValid(tup))
                elog(ERROR, "cache lookup failed for relation %u", relationId);
+       otid = tup->t_self;
        Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
        ((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
-       CatalogTupleUpdate(classRel, &tup->t_self, tup);
+       CatalogTupleUpdate(classRel, &otid, tup);
+       UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tup);
        table_close(classRel, RowExclusiveLock);
 }
index d27e6cf3451a49bd907a42976310fbf1439140d8..87232e013760b9c0063a0bb9523f7f10bb12a739 100644 (file)
@@ -3593,6 +3593,7 @@ SetRelationTableSpace(Relation rel,
 {
        Relation        pg_class;
        HeapTuple       tuple;
+       ItemPointerData otid;
        Form_pg_class rd_rel;
        Oid                     reloid = RelationGetRelid(rel);
 
@@ -3601,9 +3602,10 @@ SetRelationTableSpace(Relation rel,
        /* Get a modifiable copy of the relation's pg_class row. */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(reloid));
+       tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(reloid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", reloid);
+       otid = tuple->t_self;
        rd_rel = (Form_pg_class) GETSTRUCT(tuple);
 
        /* Update the pg_class row. */
@@ -3611,7 +3613,8 @@ SetRelationTableSpace(Relation rel,
                InvalidOid : newTableSpaceId;
        if (RelFileNumberIsValid(newRelFilenumber))
                rd_rel->relfilenode = newRelFilenumber;
-       CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+       CatalogTupleUpdate(pg_class, &otid, tuple);
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
 
        /*
         * Record dependency on tablespace.  This is only required for relations
@@ -4105,6 +4108,7 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
 {
        Relation        targetrelation;
        Relation        relrelation;    /* for RELATION relation */
+       ItemPointerData otid;
        HeapTuple       reltup;
        Form_pg_class relform;
        Oid                     namespaceId;
@@ -4127,7 +4131,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
         */
        relrelation = table_open(RelationRelationId, RowExclusiveLock);
 
-       reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       reltup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(myrelid));
+       otid = reltup->t_self;
        if (!HeapTupleIsValid(reltup))  /* shouldn't happen */
                elog(ERROR, "cache lookup failed for relation %u", myrelid);
        relform = (Form_pg_class) GETSTRUCT(reltup);
@@ -4154,7 +4159,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, bool is_internal, bo
         */
        namestrcpy(&(relform->relname), newrelname);
 
-       CatalogTupleUpdate(relrelation, &reltup->t_self, reltup);
+       CatalogTupleUpdate(relrelation, &otid, reltup);
+       UnlockTuple(relrelation, &otid, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHookArg(RelationRelationId, myrelid, 0,
                                                                 InvalidOid, is_internal);
@@ -15072,7 +15078,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
 
        /* Fetch heap tuple */
        relid = RelationGetRelid(rel);
-       tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+       tuple = SearchSysCacheLocked1(RELOID, ObjectIdGetDatum(relid));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for relation %u", relid);
 
@@ -15176,6 +15182,7 @@ ATExecSetRelOptions(Relation rel, List *defList, AlterTableType operation,
                                                                 repl_val, repl_null, repl_repl);
 
        CatalogTupleUpdate(pgclass, &newtuple->t_self, newtuple);
+       UnlockTuple(pgclass, &tuple->t_self, InplaceUpdateTupleLock);
 
        InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), 0);
 
@@ -17329,7 +17336,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
        ObjectAddress thisobj;
        bool            already_done = false;
 
-       classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relOid));
+       /* no rel lock for relkind=c so use LOCKTAG_TUPLE */
+       classTup = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relOid));
        if (!HeapTupleIsValid(classTup))
                elog(ERROR, "cache lookup failed for relation %u", relOid);
        classForm = (Form_pg_class) GETSTRUCT(classTup);
@@ -17348,6 +17356,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
        already_done = object_address_present(&thisobj, objsMoved);
        if (!already_done && oldNspOid != newNspOid)
        {
+               ItemPointerData otid = classTup->t_self;
+
                /* check for duplicate name (more friendly than unique-index failure) */
                if (get_relname_relid(NameStr(classForm->relname),
                                                          newNspOid) != InvalidOid)
@@ -17360,7 +17370,9 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
                /* classTup is a copy, so OK to scribble on */
                classForm->relnamespace = newNspOid;
 
-               CatalogTupleUpdate(classRel, &classTup->t_self, classTup);
+               CatalogTupleUpdate(classRel, &otid, classTup);
+               UnlockTuple(classRel, &otid, InplaceUpdateTupleLock);
+
 
                /* Update dependency on schema if caller said so */
                if (hasDependEntry &&
@@ -17372,6 +17384,8 @@ AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
                        elog(ERROR, "could not change schema dependency for relation \"%s\"",
                                 NameStr(classForm->relname));
        }
+       else
+               UnlockTuple(classRel, &classTup->t_self, InplaceUpdateTupleLock);
        if (!already_done)
        {
                add_exact_object_address(&thisobj, objsMoved);
index 713cf3e802c10283e4bc7c7045c8a81a2810c0a9..728cdee480ba81f2b7e1ae38c67850c6d92d45df 100644 (file)
@@ -1036,6 +1036,10 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation,
        Relation        resultRel = resultRelInfo->ri_RelationDesc;
        FdwRoutine *fdwroutine;
 
+       /* Expect a fully-formed ResultRelInfo from InitResultRelInfo(). */
+       Assert(resultRelInfo->ri_needLockTagTuple ==
+                  IsInplaceUpdateRelation(resultRel));
+
        switch (resultRel->rd_rel->relkind)
        {
                case RELKIND_RELATION:
@@ -1216,6 +1220,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
        resultRelInfo->ri_NumIndices = 0;
        resultRelInfo->ri_IndexRelationDescs = NULL;
        resultRelInfo->ri_IndexRelationInfo = NULL;
+       resultRelInfo->ri_needLockTagTuple =
+               IsInplaceUpdateRelation(resultRelationDesc);
        /* make a copy so as not to depend on relcache info not changing... */
        resultRelInfo->ri_TrigDesc = CopyTriggerDesc(resultRelationDesc->trigdesc);
        if (resultRelInfo->ri_TrigDesc)
index 1086cbc9624766c739589517731a59ea4a91a45d..54025c9f150903cbd6ec7b4b50222079898d4e3f 100644 (file)
@@ -661,8 +661,12 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
        Relation        rel = resultRelInfo->ri_RelationDesc;
        ItemPointer tid = &(searchslot->tts_tid);
 
-       /* For now we support only tables. */
+       /*
+        * We support only non-system tables, with
+        * check_publication_add_relation() accountable.
+        */
        Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+       Assert(!IsCatalogRelation(rel));
 
        CheckCmdReplicaIdentity(rel, CMD_UPDATE);
 
index 8bf4c80d4a01b99869f951708785959718d1ab94..1161520f76bbd2a7821c4ecb99155c1a61de9b96 100644 (file)
@@ -2324,6 +2324,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
        }
        else
        {
+               ItemPointerData lockedtid;
+
                /*
                 * If we generate a new candidate tuple after EvalPlanQual testing, we
                 * must loop back here to try again.  (We don't need to redo triggers,
@@ -2332,6 +2334,7 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
                 * to do them again.)
                 */
 redo_act:
+               lockedtid = *tupleid;
                result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
                                                           canSetTag, &updateCxt);
 
@@ -2425,6 +2428,14 @@ redo_act:
                                                                ExecInitUpdateProjection(context->mtstate,
                                                                                                                 resultRelInfo);
 
+                                                       if (resultRelInfo->ri_needLockTagTuple)
+                                                       {
+                                                               UnlockTuple(resultRelationDesc,
+                                                                                       &lockedtid, InplaceUpdateTupleLock);
+                                                               LockTuple(resultRelationDesc,
+                                                                                 tupleid, InplaceUpdateTupleLock);
+                                                       }
+
                                                        /* Fetch the most recent version of old tuple. */
                                                        oldSlot = resultRelInfo->ri_oldTupleSlot;
                                                        if (!table_tuple_fetch_row_version(resultRelationDesc,
@@ -2529,6 +2540,14 @@ ExecOnConflictUpdate(ModifyTableContext *context,
        TransactionId xmin;
        bool            isnull;
 
+       /*
+        * Parse analysis should have blocked ON CONFLICT for all system
+        * relations, which includes these.  There's no fundamental obstacle to
+        * supporting this; we'd just need to handle LOCKTAG_TUPLE like the other
+        * ExecUpdate() caller.
+        */
+       Assert(!resultRelInfo->ri_needLockTagTuple);
+
        /* Determine lock mode to use */
        lockmode = ExecUpdateLockMode(context->estate, resultRelInfo);
 
@@ -2854,6 +2873,7 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 {
        ModifyTableState *mtstate = context->mtstate;
        List      **mergeActions = resultRelInfo->ri_MergeActions;
+       ItemPointerData lockedtid;
        List       *actionStates;
        TupleTableSlot *newslot = NULL;
        TupleTableSlot *rslot = NULL;
@@ -2890,14 +2910,32 @@ ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
         * target wholerow junk attr.
         */
        Assert(tupleid != NULL || oldtuple != NULL);
+       ItemPointerSetInvalid(&lockedtid);
        if (oldtuple != NULL)
+       {
+               Assert(!resultRelInfo->ri_needLockTagTuple);
                ExecForceStoreHeapTuple(oldtuple, resultRelInfo->ri_oldTupleSlot,
                                                                false);
-       else if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
-                                                                                       tupleid,
-                                                                                       SnapshotAny,
-                                                                                       resultRelInfo->ri_oldTupleSlot))
-               elog(ERROR, "failed to fetch the target tuple");
+       }
+       else
+       {
+               if (resultRelInfo->ri_needLockTagTuple)
+               {
+                       /*
+                        * This locks even for CMD_DELETE, for CMD_NOTHING, and for tuples
+                        * that don't match mas_whenqual.  MERGE on system catalogs is a
+                        * minor use case, so don't bother optimizing those.
+                        */
+                       LockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                         InplaceUpdateTupleLock);
+                       lockedtid = *tupleid;
+               }
+               if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc,
+                                                                                  tupleid,
+                                                                                  SnapshotAny,
+                                                                                  resultRelInfo->ri_oldTupleSlot))
+                       elog(ERROR, "failed to fetch the target tuple");
+       }
 
        /*
         * Test the join condition.  If it's satisfied, perform a MATCHED action.
@@ -2969,7 +3007,7 @@ lmerge_matched:
                                                                                tupleid, NULL, newslot, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" */
+                                               goto out;       /* "do nothing" */
 
                                        break;          /* concurrent update/delete */
                                }
@@ -2980,11 +3018,11 @@ lmerge_matched:
                                {
                                        if (!ExecIRUpdateTriggers(estate, resultRelInfo,
                                                                                          oldtuple, newslot))
-                                               return NULL;    /* "do nothing" */
+                                               goto out;       /* "do nothing" */
                                }
                                else
                                {
-                                       /* called table_tuple_fetch_row_version() above */
+                                       /* checked ri_needLockTagTuple above */
                                        Assert(oldtuple == NULL);
 
                                        result = ExecUpdateAct(context, resultRelInfo, tupleid,
@@ -3003,7 +3041,8 @@ lmerge_matched:
                                        if (updateCxt.crossPartUpdate)
                                        {
                                                mtstate->mt_merge_updated += 1;
-                                               return context->cpUpdateReturningSlot;
+                                               rslot = context->cpUpdateReturningSlot;
+                                               goto out;
                                        }
                                }
 
@@ -3021,7 +3060,7 @@ lmerge_matched:
                                                                                NULL, NULL, &result))
                                {
                                        if (result == TM_Ok)
-                                               return NULL;    /* "do nothing" */
+                                               goto out;       /* "do nothing" */
 
                                        break;          /* concurrent update/delete */
                                }
@@ -3032,11 +3071,11 @@ lmerge_matched:
                                {
                                        if (!ExecIRDeleteTriggers(estate, resultRelInfo,
                                                                                          oldtuple))
-                                               return NULL;    /* "do nothing" */
+                                               goto out;       /* "do nothing" */
                                }
                                else
                                {
-                                       /* called table_tuple_fetch_row_version() above */
+                                       /* checked ri_needLockTagTuple above */
                                        Assert(oldtuple == NULL);
 
                                        result = ExecDeleteAct(context, resultRelInfo, tupleid,
@@ -3118,7 +3157,7 @@ lmerge_matched:
                                 * let caller handle it under NOT MATCHED [BY TARGET] clauses.
                                 */
                                *matched = false;
-                               return NULL;
+                               goto out;
 
                        case TM_Updated:
                                {
@@ -3192,7 +3231,7 @@ lmerge_matched:
                                                                 * more to do.
                                                                 */
                                                                if (TupIsNull(epqslot))
-                                                                       return NULL;
+                                                                       goto out;
 
                                                                /*
                                                                 * If we got a NULL ctid from the subplan, the
@@ -3210,6 +3249,15 @@ lmerge_matched:
                                                                 * we need to switch to the NOT MATCHED BY
                                                                 * SOURCE case.
                                                                 */
+                                                               if (resultRelInfo->ri_needLockTagTuple)
+                                                               {
+                                                                       if (ItemPointerIsValid(&lockedtid))
+                                                                               UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                                                                                       InplaceUpdateTupleLock);
+                                                                       LockTuple(resultRelInfo->ri_RelationDesc, &context->tmfd.ctid,
+                                                                                         InplaceUpdateTupleLock);
+                                                                       lockedtid = context->tmfd.ctid;
+                                                               }
                                                                if (!table_tuple_fetch_row_version(resultRelationDesc,
                                                                                                                                   &context->tmfd.ctid,
                                                                                                                                   SnapshotAny,
@@ -3238,7 +3286,7 @@ lmerge_matched:
                                                         * MATCHED [BY TARGET] actions
                                                         */
                                                        *matched = false;
-                                                       return NULL;
+                                                       goto out;
 
                                                case TM_SelfModified:
 
@@ -3266,13 +3314,13 @@ lmerge_matched:
 
                                                        /* This shouldn't happen */
                                                        elog(ERROR, "attempted to update or delete invisible tuple");
-                                                       return NULL;
+                                                       goto out;
 
                                                default:
                                                        /* see table_tuple_lock call in ExecDelete() */
                                                        elog(ERROR, "unexpected table_tuple_lock status: %u",
                                                                 result);
-                                                       return NULL;
+                                                       goto out;
                                        }
                                }
 
@@ -3319,6 +3367,10 @@ lmerge_matched:
        /*
         * Successfully executed an action or no qualifying action was found.
         */
+out:
+       if (ItemPointerIsValid(&lockedtid))
+               UnlockTuple(resultRelInfo->ri_RelationDesc, &lockedtid,
+                                       InplaceUpdateTupleLock);
        return rslot;
 }
 
@@ -3770,6 +3822,7 @@ ExecModifyTable(PlanState *pstate)
        HeapTupleData oldtupdata;
        HeapTuple       oldtuple;
        ItemPointer tupleid;
+       bool            tuplock;
 
        CHECK_FOR_INTERRUPTS();
 
@@ -4082,6 +4135,8 @@ ExecModifyTable(PlanState *pstate)
                                break;
 
                        case CMD_UPDATE:
+                               tuplock = false;
+
                                /* Initialize projection info if first time for this table */
                                if (unlikely(!resultRelInfo->ri_projectNewInfoValid))
                                        ExecInitUpdateProjection(node, resultRelInfo);
@@ -4093,6 +4148,7 @@ ExecModifyTable(PlanState *pstate)
                                oldSlot = resultRelInfo->ri_oldTupleSlot;
                                if (oldtuple != NULL)
                                {
+                                       Assert(!resultRelInfo->ri_needLockTagTuple);
                                        /* Use the wholerow junk attr as the old tuple. */
                                        ExecForceStoreHeapTuple(oldtuple, oldSlot, false);
                                }
@@ -4101,6 +4157,11 @@ ExecModifyTable(PlanState *pstate)
                                        /* Fetch the most recent version of old tuple. */
                                        Relation        relation = resultRelInfo->ri_RelationDesc;
 
+                                       if (resultRelInfo->ri_needLockTagTuple)
+                                       {
+                                               LockTuple(relation, tupleid, InplaceUpdateTupleLock);
+                                               tuplock = true;
+                                       }
                                        if (!table_tuple_fetch_row_version(relation, tupleid,
                                                                                                           SnapshotAny,
                                                                                                           oldSlot))
@@ -4112,6 +4173,9 @@ ExecModifyTable(PlanState *pstate)
                                /* Now apply the update. */
                                slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple,
                                                                  slot, node->canSetTag);
+                               if (tuplock)
+                                       UnlockTuple(resultRelInfo->ri_RelationDesc, tupleid,
+                                                               InplaceUpdateTupleLock);
                                break;
 
                        case CMD_DELETE:
index 5b6b7b809c02fbea2f658cd256e5992658b7a490..c326f687eb4f5ee65a8e19059dc86ce8c816a54c 100644 (file)
@@ -3768,6 +3768,7 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
 {
        RelFileNumber newrelfilenumber;
        Relation        pg_class;
+       ItemPointerData otid;
        HeapTuple       tuple;
        Form_pg_class classform;
        MultiXactId minmulti = InvalidMultiXactId;
@@ -3810,11 +3811,12 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
         */
        pg_class = table_open(RelationRelationId, RowExclusiveLock);
 
-       tuple = SearchSysCacheCopy1(RELOID,
-                                                               ObjectIdGetDatum(RelationGetRelid(relation)));
+       tuple = SearchSysCacheLockedCopy1(RELOID,
+                                                                         ObjectIdGetDatum(RelationGetRelid(relation)));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "could not find tuple for relation %u",
                         RelationGetRelid(relation));
+       otid = tuple->t_self;
        classform = (Form_pg_class) GETSTRUCT(tuple);
 
        /*
@@ -3934,9 +3936,10 @@ RelationSetNewRelfilenumber(Relation relation, char persistence)
                classform->relminmxid = minmulti;
                classform->relpersistence = persistence;
 
-               CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+               CatalogTupleUpdate(pg_class, &otid, tuple);
        }
 
+       UnlockTuple(pg_class, &otid, InplaceUpdateTupleLock);
        heap_freetuple(tuple);
 
        table_close(pg_class, RowExclusiveLock);
index 3e03dfc9910e3d5c316f844638286e4831985f68..50c9440f7923e8a3b473b8fdefe6d2e47f13c385 100644 (file)
 #include "catalog/pg_shseclabel_d.h"
 #include "common/int.h"
 #include "lib/qunique.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
 #include "utils/catcache.h"
+#include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -268,6 +271,98 @@ ReleaseSysCache(HeapTuple tuple)
        ReleaseCatCache(tuple);
 }
 
+/*
+ * SearchSysCacheLocked1
+ *
+ * Combine SearchSysCache1() with acquiring a LOCKTAG_TUPLE at mode
+ * InplaceUpdateTupleLock.  This is a tool for complying with the
+ * README.tuplock section "Locking to write inplace-updated tables".  After
+ * the caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock)
+ * and ReleaseSysCache().
+ *
+ * The returned tuple may be the subject of an uncommitted update, so this
+ * doesn't prevent the "tuple concurrently updated" error.
+ */
+HeapTuple
+SearchSysCacheLocked1(int cacheId,
+                                         Datum key1)
+{
+       ItemPointerData tid;
+       LOCKTAG         tag;
+       Oid                     dboid =
+               SysCache[cacheId]->cc_relisshared ? InvalidOid : MyDatabaseId;
+       Oid                     reloid = cacheinfo[cacheId].reloid;
+
+       /*----------
+        * Since inplace updates may happen just before our LockTuple(), we must
+        * return content acquired after LockTuple() of the TID we return.  If we
+        * just fetched twice instead of looping, the following sequence would
+        * defeat our locking:
+        *
+        * GRANT:   SearchSysCache1() = TID (1,5)
+        * GRANT:   LockTuple(pg_class, (1,5))
+        * [no more inplace update of (1,5) until we release the lock]
+        * CLUSTER: SearchSysCache1() = TID (1,5)
+        * CLUSTER: heap_update() = TID (1,8)
+        * CLUSTER: COMMIT
+        * GRANT:   SearchSysCache1() = TID (1,8)
+        * GRANT:   return (1,8) from SearchSysCacheLocked1()
+        * VACUUM:  SearchSysCache1() = TID (1,8)
+        * VACUUM:  LockTuple(pg_class, (1,8))  # two TIDs now locked for one rel
+        * VACUUM:  inplace update
+        * GRANT:   heap_update() = (1,9)  # lose inplace update
+        *
+        * In the happy case, this takes two fetches, one to determine the TID to
+        * lock and another to get the content and confirm the TID didn't change.
+        *
+        * This is valid even if the row gets updated to a new TID, the old TID
+        * becomes LP_UNUSED, and the row gets updated back to its old TID.  We'd
+        * still hold the right LOCKTAG_TUPLE and a copy of the row captured after
+        * the LOCKTAG_TUPLE.
+        */
+       ItemPointerSetInvalid(&tid);
+       for (;;)
+       {
+               HeapTuple       tuple;
+               LOCKMODE        lockmode = InplaceUpdateTupleLock;
+
+               tuple = SearchSysCache1(cacheId, key1);
+               if (ItemPointerIsValid(&tid))
+               {
+                       if (!HeapTupleIsValid(tuple))
+                       {
+                               LockRelease(&tag, lockmode, false);
+                               return tuple;
+                       }
+                       if (ItemPointerEquals(&tid, &tuple->t_self))
+                               return tuple;
+                       LockRelease(&tag, lockmode, false);
+               }
+               else if (!HeapTupleIsValid(tuple))
+                       return tuple;
+
+               tid = tuple->t_self;
+               ReleaseSysCache(tuple);
+               /* like: LockTuple(rel, &tid, lockmode) */
+               SET_LOCKTAG_TUPLE(tag, dboid, reloid,
+                                                 ItemPointerGetBlockNumber(&tid),
+                                                 ItemPointerGetOffsetNumber(&tid));
+               (void) LockAcquire(&tag, lockmode, false, false);
+
+               /*
+                * If an inplace update just finished, ensure we process the syscache
+                * inval.  XXX this is insufficient: the inplace updater may not yet
+                * have reached AtEOXact_Inval().  See test at inplace-inval.spec.
+                *
+                * If a heap_update() call just released its LOCKTAG_TUPLE, we'll
+                * probably find the old tuple and reach "tuple concurrently updated".
+                * If that heap_update() aborts, our LOCKTAG_TUPLE blocks inplace
+                * updates while our caller works.
+                */
+               AcceptInvalidationMessages();
+       }
+}
+
 /*
  * SearchSysCacheCopy
  *
@@ -294,6 +389,28 @@ SearchSysCacheCopy(int cacheId,
        return newtuple;
 }
 
+/*
+ * SearchSysCacheLockedCopy1
+ *
+ * Meld SearchSysCacheLockedCopy1 with SearchSysCacheCopy().  After the
+ * caller's heap_update(), it should UnlockTuple(InplaceUpdateTupleLock) and
+ * heap_freetuple().
+ */
+HeapTuple
+SearchSysCacheLockedCopy1(int cacheId,
+                                                 Datum key1)
+{
+       HeapTuple       tuple,
+                               newtuple;
+
+       tuple = SearchSysCacheLocked1(cacheId, key1);
+       if (!HeapTupleIsValid(tuple))
+               return tuple;
+       newtuple = heap_copytuple(tuple);
+       ReleaseSysCache(tuple);
+       return newtuple;
+}
+
 /*
  * SearchSysCacheExists
  *
index 88467977f89de09c334b815140a8d28524fc63c6..aab59d681cf5c0fd6908b23b868cd0b018ce2957 100644 (file)
@@ -485,6 +485,9 @@ typedef struct ResultRelInfo
        /* Have the projection and the slots above been initialized? */
        bool            ri_projectNewInfoValid;
 
+       /* updates do LockTuple() before oldtup read; see README.tuplock */
+       bool            ri_needLockTagTuple;
+
        /* triggers to be fired, if any */
        TriggerDesc *ri_TrigDesc;
 
index 934ba84f6a26e21740322801981eddac14cbc09d..810b297edf95b2e7cfdf041694ed51ac97d1a91b 100644 (file)
@@ -47,6 +47,8 @@ typedef int LOCKMODE;
 
 #define MaxLockMode                            8       /* highest standard lock mode */
 
+/* See README.tuplock section "Locking to write inplace-updated tables" */
+#define InplaceUpdateTupleLock ExclusiveLock
 
 /* WAL representation of an AccessExclusiveLock on a table */
 typedef struct xl_standby_lock
index 03a27dd0a83f4134e6638cdc42cf8c7014e1a38b..b541911c8fc27c10cc6bdf7d24645ab554a30881 100644 (file)
@@ -43,9 +43,14 @@ extern HeapTuple SearchSysCache4(int cacheId,
 
 extern void ReleaseSysCache(HeapTuple tuple);
 
+extern HeapTuple SearchSysCacheLocked1(int cacheId,
+                                                                          Datum key1);
+
 /* convenience routines */
 extern HeapTuple SearchSysCacheCopy(int cacheId,
                                                                        Datum key1, Datum key2, Datum key3, Datum key4);
+extern HeapTuple SearchSysCacheLockedCopy1(int cacheId,
+                                                                                  Datum key1);
 extern bool SearchSysCacheExists(int cacheId,
                                                                 Datum key1, Datum key2, Datum key3, Datum key4);
 extern Oid     GetSysCacheOid(int cacheId, AttrNumber oidcol,
index fe26984c0e04700c5013ed85fbe4719dbbf1fb6a..b5fe8b06f76184148bac4efeb9736c0d3da0d5ab 100644 (file)
@@ -100,7 +100,7 @@ f
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
 step c2: COMMIT;
 
-starting permutation: b3 sfu3 b1 grant1 read2 as3 addk2 r3 c1 read2
+starting permutation: b3 sfu3 b1 grant1 read2 addk2 r3 c1 read2
 step b3: BEGIN ISOLATION LEVEL READ COMMITTED;
 step sfu3: 
        SELECT relhasindex FROM pg_class
@@ -124,7 +124,6 @@ relhasindex
 f          
 (1 row)
 
-step as3: LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE;
 step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
 step r3: ROLLBACK;
 step grant1: <... completed>
@@ -155,9 +154,11 @@ step b1: BEGIN;
 step grant1: 
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
  <waiting ...>
-step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c);
-step c2: COMMIT;
+step addk2: ALTER TABLE intra_grant_inplace ADD PRIMARY KEY (c); <waiting ...>
+step addk2: <... completed>
+ERROR:  deadlock detected
 step grant1: <... completed>
+step c2: COMMIT;
 step c1: COMMIT;
 step read2: 
        SELECT relhasindex FROM pg_class
@@ -195,9 +196,8 @@ relhasindex
 f          
 (1 row)
 
-s4: WARNING:  got: tuple concurrently updated
-step revoke4: <... completed>
 step r3: ROLLBACK;
+step revoke4: <... completed>
 
 starting permutation: b1 drop1 b3 sfu3 revoke4 c1 r3
 step b1: BEGIN;
@@ -224,6 +224,6 @@ relhasindex
 -----------
 (0 rows)
 
-s4: WARNING:  got: tuple concurrently deleted
+s4: WARNING:  got: cache lookup failed for relation REDACTED
 step revoke4: <... completed>
 step r3: ROLLBACK;
index 3a74406f4d98a8af6046aa93885f0a142734bf8b..07307e623e47314a5d109608ab3f5f89ae5d661b 100644 (file)
@@ -194,7 +194,7 @@ step simplepartupdate_noroute {
        update parttbl set b = 2 where c = 1 returning *;
 }
 
-# test system class updates
+# test system class LockTuple()
 
 step sys1      {
        UPDATE pg_class SET reltuples = 123 WHERE oid = 'accounts'::regclass;
index d07ed3bb2cc8f9d3b04c14c80364de0f822e3142..2992c85b44ddaf46ba0476a6a2199e3db63c213e 100644 (file)
@@ -14,6 +14,7 @@ teardown
 
 # heap_update()
 session s1
+setup  { SET deadlock_timeout = '100s'; }
 step b1        { BEGIN; }
 step grant1    {
        GRANT SELECT ON intra_grant_inplace TO PUBLIC;
@@ -25,6 +26,7 @@ step c1       { COMMIT; }
 
 # inplace update
 session s2
+setup  { SET deadlock_timeout = '10ms'; }
 step read2     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass;
@@ -48,7 +50,6 @@ step sfu3     {
        SELECT relhasindex FROM pg_class
        WHERE oid = 'intra_grant_inplace'::regclass FOR UPDATE;
 }
-step as3       { LOCK TABLE intra_grant_inplace IN ACCESS SHARE MODE; }
 step r3        { ROLLBACK; }
 
 # Additional heap_update()
@@ -74,8 +75,6 @@ step keyshr5  {
 teardown       { ROLLBACK; }
 
 
-# XXX extant bugs: permutation comments refer to planned future LockTuple()
-
 permutation
        b1
        grant1
@@ -118,7 +117,6 @@ permutation
        b1
        grant1(r3)      # acquire LockTuple(), await sfu3 xmax
        read2
-       as3                     # XXX temporary until patch adds locking to addk2
        addk2(c1)       # block in LockTuple() behind grant1
        r3                      # unblock grant1; addk2 now awaits grant1 xmax
        c1
@@ -128,8 +126,8 @@ permutation
        b2
        sfnku2
        b1
-       grant1(c2)              # acquire LockTuple(), await sfnku2 xmax
-       addk2                   # block in LockTuple() behind grant1 = deadlock
+       grant1(addk2)   # acquire LockTuple(), await sfnku2 xmax
+       addk2(*)                # block in LockTuple() behind grant1 = deadlock
        c2
        c1
        read2
@@ -140,7 +138,7 @@ permutation
        grant1
        b3
        sfu3(c1)        # acquire LockTuple(), await grant1 xmax
-       revoke4(sfu3)   # block in LockTuple() behind sfu3
+       revoke4(r3)     # block in LockTuple() behind sfu3
        c1
        r3                      # revoke4 unlocks old tuple and finds new