Improve concurrency of foreign key locking
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 23 Jan 2013 15:04:59 +0000 (12:04 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 23 Jan 2013 15:04:59 +0000 (12:04 -0300)
This patch introduces two additional lock modes for tuples: "SELECT FOR
KEY SHARE" and "SELECT FOR NO KEY UPDATE".  These don't block each
other, in contrast with already existing "SELECT FOR SHARE" and "SELECT
FOR UPDATE".  UPDATE commands that do not modify the values stored in
the columns that are part of the key of the tuple now grab a SELECT FOR
NO KEY UPDATE lock on the tuple, allowing them to proceed concurrently
with tuple locks of the FOR KEY SHARE variety.

Foreign key triggers now use FOR KEY SHARE instead of FOR SHARE; this
means the concurrency improvement applies to them, which is the whole
point of this patch.

The added tuple lock semantics require some rejiggering of the multixact
module, so that the locking level that each transaction is holding can
be stored alongside its Xid.  Also, multixacts now need to persist
across server restarts and crashes, because they can now represent not
only tuple locks, but also tuple updates.  This means we need more
careful tracking of lifetime of pg_multixact SLRU files; since they now
persist longer, we require more infrastructure to figure out when they
can be removed.  pg_upgrade also needs to be careful to copy
pg_multixact files over from the old server to the new, or at least part
of multixact.c state, depending on the versions of the old and new
servers.

Tuple time qualification rules (HeapTupleSatisfies routines) need to be
careful not to consider tuples with the "is multi" infomask bit set as
being only locked; they might need to look up MultiXact values (i.e.
possibly do pg_multixact I/O) to find out the Xid that updated a tuple,
whereas they previously were assured to only use information readily
available from the tuple header.  This is considered acceptable, because
the extra I/O would involve cases that would previously cause some
commands to block waiting for concurrent transactions to finish.

Another important change is the fact that locking tuples that have
previously been updated causes the future versions to be marked as
locked, too; this is essential for correctness of foreign key checks.
This causes additional WAL-logging, also (there was previously a single
WAL record for a locked tuple; now there are as many as updated copies
of the tuple there exist.)

With all this in place, contention related to tuples being checked by
foreign key rules should be much reduced.

As a bonus, the old behavior that a subtransaction grabbing a stronger
tuple lock than the parent (sub)transaction held on a given tuple and
later aborting caused the weaker lock to be lost, has been fixed.

Many new spec files were added for isolation tester framework, to ensure
overall behavior is sane.  There's probably room for several more tests.

There were several reviewers of this patch; in particular, Noah Misch
and Andres Freund spent considerable time in it.  Original idea for the
patch came from Simon Riggs, after a problem report by Joel Jacobson.
Most code is from me, with contributions from Marti Raudsepp, Alexander
Shulgin, Noah Misch and Andres Freund.

This patch was discussed in several pgsql-hackers threads; the most
important start at the following message-ids:
AANLkTimo9XVcEzfiBR-ut3KVNDkjm2Vxh+t8kAmWjPuv@mail.gmail.com
1290721684-sup-3951@alvh.no-ip.org
1294953201-sup-2099@alvh.no-ip.org
1320343602-sup-2290@alvh.no-ip.org
1339690386-sup-8927@alvh.no-ip.org
4FE5FF020200002500048A3D@gw.wicourts.gov
4FEAB90A0200002500048B7D@gw.wicourts.gov

106 files changed:
contrib/file_fdw/output/file_fdw.source
contrib/pageinspect/heapfuncs.c
contrib/pg_upgrade/controldata.c
contrib/pg_upgrade/pg_upgrade.c
contrib/pg_upgrade/pg_upgrade.h
contrib/pgrowlocks/Makefile
contrib/pgrowlocks/pgrowlocks--1.0--1.1.sql [new file with mode: 0644]
contrib/pgrowlocks/pgrowlocks--1.1.sql [moved from contrib/pgrowlocks/pgrowlocks--1.0.sql with 83% similarity]
contrib/pgrowlocks/pgrowlocks.c
contrib/pgrowlocks/pgrowlocks.control
doc/src/sgml/pgrowlocks.sgml
doc/src/sgml/ref/select.sgml
src/backend/access/common/heaptuple.c
src/backend/access/heap/README.tuplock [new file with mode: 0644]
src/backend/access/heap/heapam.c
src/backend/access/heap/pruneheap.c
src/backend/access/heap/rewriteheap.c
src/backend/access/rmgrdesc/heapdesc.c
src/backend/access/rmgrdesc/mxactdesc.c
src/backend/access/rmgrdesc/xlogdesc.c
src/backend/access/transam/README
src/backend/access/transam/multixact.c
src/backend/access/transam/varsup.c
src/backend/access/transam/xlog.c
src/backend/catalog/heap.c
src/backend/catalog/index.c
src/backend/commands/analyze.c
src/backend/commands/cluster.c
src/backend/commands/dbcommands.c
src/backend/commands/sequence.c
src/backend/commands/tablecmds.c
src/backend/commands/trigger.c
src/backend/commands/vacuum.c
src/backend/commands/vacuumlazy.c
src/backend/executor/execMain.c
src/backend/executor/nodeLockRows.c
src/backend/executor/nodeModifyTable.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/plan/initsplan.c
src/backend/optimizer/plan/planner.c
src/backend/parser/analyze.c
src/backend/parser/gram.y
src/backend/postmaster/autovacuum.c
src/backend/rewrite/rewriteHandler.c
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/predicate.c
src/backend/tcop/utility.c
src/backend/utils/adt/ri_triggers.c
src/backend/utils/adt/ruleutils.c
src/backend/utils/cache/relcache.c
src/backend/utils/time/combocid.c
src/backend/utils/time/tqual.c
src/bin/pg_controldata/pg_controldata.c
src/bin/pg_resetxlog/pg_resetxlog.c
src/include/access/heapam.h
src/include/access/heapam_xlog.h
src/include/access/htup.h
src/include/access/htup_details.h
src/include/access/multixact.h
src/include/access/rewriteheap.h
src/include/catalog/catversion.h
src/include/catalog/pg_class.h
src/include/catalog/pg_control.h
src/include/catalog/pg_database.h
src/include/catalog/pg_proc.h
src/include/commands/cluster.h
src/include/commands/vacuum.h
src/include/executor/executor.h
src/include/nodes/execnodes.h
src/include/nodes/parsenodes.h
src/include/nodes/plannodes.h
src/include/parser/analyze.h
src/include/postgres.h
src/include/storage/lock.h
src/include/utils/builtins.h
src/include/utils/rel.h
src/include/utils/relcache.h
src/include/utils/tqual.h
src/test/isolation/expected/aborted-keyrevoke.out [new file with mode: 0644]
src/test/isolation/expected/aborted-keyrevoke_2.out [new file with mode: 0644]
src/test/isolation/expected/delete-abort-savept-2.out [new file with mode: 0644]
src/test/isolation/expected/delete-abort-savept.out [new file with mode: 0644]
src/test/isolation/expected/fk-contention.out
src/test/isolation/expected/fk-deadlock.out
src/test/isolation/expected/fk-deadlock2.out
src/test/isolation/expected/fk-deadlock2_1.out
src/test/isolation/expected/fk-deadlock2_2.out [new file with mode: 0644]
src/test/isolation/expected/fk-deadlock_1.out
src/test/isolation/expected/fk-deadlock_2.out [new file with mode: 0644]
src/test/isolation/expected/fk-delete-insert.out [new file with mode: 0644]
src/test/isolation/expected/lock-update-delete.out [new file with mode: 0644]
src/test/isolation/expected/lock-update-traversal.out [new file with mode: 0644]
src/test/isolation/expected/multixact-no-deadlock.out [new file with mode: 0644]
src/test/isolation/isolation_schedule
src/test/isolation/isolationtester.c
src/test/isolation/specs/aborted-keyrevoke.spec [new file with mode: 0644]
src/test/isolation/specs/delete-abort-savept-2.spec [new file with mode: 0644]
src/test/isolation/specs/delete-abort-savept.spec [new file with mode: 0644]
src/test/isolation/specs/fk-deadlock.spec
src/test/isolation/specs/fk-deadlock2.spec
src/test/isolation/specs/lock-update-delete.spec [new file with mode: 0644]
src/test/isolation/specs/lock-update-traversal.spec [new file with mode: 0644]
src/test/isolation/specs/multixact-no-deadlock.spec [new file with mode: 0644]

index 6f906e1fc8c8c6f8adb3ea7a089c18cd3647c1bd..c01f8d804bc50911dc3e09748774ade8dc0576db 100644 (file)
@@ -191,7 +191,7 @@ ERROR:  cannot change foreign table "agg_csv"
 DELETE FROM agg_csv WHERE a = 100;
 ERROR:  cannot change foreign table "agg_csv"
 SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
-ERROR:  SELECT FOR UPDATE/SHARE cannot be used with foreign table "agg_csv"
+ERROR:  SELECT FOR UPDATE/SHARE/KEY UPDATE/KEY SHARE cannot be used with foreign table "agg_csv"
 LINE 1: SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
                                             ^
 -- but this should be ignored
index bbf796ff43514efc1fea5d5f5a246b202ae948c9..6d8f6f1c74064b0b9b47e17d295833e1a5f1b11b 100644 (file)
@@ -163,7 +163,7 @@ heap_page_items(PG_FUNCTION_ARGS)
                        tuphdr = (HeapTupleHeader) PageGetItem(page, id);
 
                        values[4] = UInt32GetDatum(HeapTupleHeaderGetXmin(tuphdr));
-                       values[5] = UInt32GetDatum(HeapTupleHeaderGetXmax(tuphdr));
+                       values[5] = UInt32GetDatum(HeapTupleHeaderGetRawXmax(tuphdr));
                        values[6] = UInt32GetDatum(HeapTupleHeaderGetRawCommandId(tuphdr)); /* shared with xvac */
                        values[7] = PointerGetDatum(&tuphdr->t_ctid);
                        values[8] = UInt32GetDatum(tuphdr->t_infomask2);
index 9218f65abc36bfc176af5f5b0da120d1ee912009..7c80c873153fbac139b37ff3ed04a3b0db6910ec 100644 (file)
@@ -40,6 +40,9 @@ get_control_data(ClusterInfo *cluster, bool live_check)
        bool            got_xid = false;
        bool            got_oid = false;
        bool            got_nextxlogfile = false;
+       bool            got_multi = false;
+       bool            got_mxoff = false;
+       bool            got_oldestmulti = false;
        bool            got_log_id = false;
        bool            got_log_seg = false;
        bool            got_tli = false;
@@ -246,6 +249,39 @@ get_control_data(ClusterInfo *cluster, bool live_check)
                        cluster->controldata.chkpnt_nxtoid = str2uint(p);
                        got_oid = true;
                }
+               else if ((p = strstr(bufin, "Latest checkpoint's NextMultiXactId:")) != NULL)
+               {
+                       p = strchr(p, ':');
+
+                       if (p == NULL || strlen(p) <= 1)
+                               pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
+
+                       p++;                            /* removing ':' char */
+                       cluster->controldata.chkpnt_nxtmulti = str2uint(p);
+                       got_multi = true;
+               }
+               else if ((p = strstr(bufin, "Latest checkpoint's oldestMultiXid:")) != NULL)
+               {
+                       p = strchr(p, ':');
+
+                       if (p == NULL || strlen(p) <= 1)
+                               pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
+
+                       p++;                            /* removing ':' char */
+                       cluster->controldata.chkpnt_oldstMulti = str2uint(p);
+                       got_oldestmulti = true;
+               }
+               else if ((p = strstr(bufin, "Latest checkpoint's NextMultiOffset:")) != NULL)
+               {
+                       p = strchr(p, ':');
+
+                       if (p == NULL || strlen(p) <= 1)
+                               pg_log(PG_FATAL, "%d: controldata retrieval problem\n", __LINE__);
+
+                       p++;                            /* removing ':' char */
+                       cluster->controldata.chkpnt_nxtmxoff = str2uint(p);
+                       got_mxoff = true;
+               }
                else if ((p = strstr(bufin, "Maximum data alignment:")) != NULL)
                {
                        p = strchr(p, ':');
@@ -433,6 +469,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
 
        /* verify that we got all the mandatory pg_control data */
        if (!got_xid || !got_oid ||
+               !got_multi || !got_mxoff || !got_oldestmulti ||
                (!live_check && !got_nextxlogfile) ||
                !got_tli ||
                !got_align || !got_blocksz || !got_largesz || !got_walsz ||
@@ -448,6 +485,15 @@ get_control_data(ClusterInfo *cluster, bool live_check)
                if (!got_oid)
                        pg_log(PG_REPORT, "  latest checkpoint next OID\n");
 
+               if (!got_multi)
+                       pg_log(PG_REPORT, "  latest checkpoint next MultiXactId\n");
+
+               if (!got_mxoff)
+                       pg_log(PG_REPORT, "  latest checkpoint next MultiXactOffset\n");
+
+               if (!got_oldestmulti)
+                       pg_log(PG_REPORT, "  latest checkpoint oldest MultiXactId\n");
+
                if (!live_check && !got_nextxlogfile)
                        pg_log(PG_REPORT, "  first WAL segment after reset\n");
 
index 88494b8d6deeb8ce65f71d2301eb8b8f45efa796..a752fe8eda1a260c8332f6765706ad416e920a70 100644 (file)
@@ -382,6 +382,52 @@ copy_clog_xlog_xid(void)
                          new_cluster.pgdata);
        check_ok();
 
+       /*
+        * If both new and old are after the pg_multixact change commit, copy those
+        * files too.  If the old server is before that change and the new server
+        * is after, then we don't copy anything but we need to reset pg_control so
+        * that the new server doesn't attempt to read multis older than the cutoff
+        * value.
+        */
+       if (old_cluster.controldata.cat_ver >= MULTIXACT_FORMATCHANGE_CAT_VER &&
+               new_cluster.controldata.cat_ver >= MULTIXACT_FORMATCHANGE_CAT_VER)
+       {
+               copy_subdir_files("pg_multixact/offsets");
+               copy_subdir_files("pg_multixact/members");
+               prep_status("Setting next multixact ID and offset for new cluster");
+               /*
+                * we preserve all files and contents, so we must preserve both "next"
+                * counters here and the oldest multi present on system.
+                */
+               exec_prog(UTILITY_LOG_FILE, NULL, true,
+                                 "\"%s/pg_resetxlog\" -O %u -m %u,%u \"%s\"",
+                                 new_cluster.bindir,
+                                 old_cluster.controldata.chkpnt_nxtmxoff,
+                                 old_cluster.controldata.chkpnt_nxtmulti,
+                                 old_cluster.controldata.chkpnt_oldstMulti,
+                                 new_cluster.pgdata);
+               check_ok();
+       }
+       else if (new_cluster.controldata.cat_ver >= MULTIXACT_FORMATCHANGE_CAT_VER)
+       {
+               prep_status("Setting oldest multixact ID on new cluster");
+               /*
+                * We don't preserve files in this case, but it's important that the
+                * oldest multi is set to the latest value used by the old system, so
+                * that multixact.c returns the empty set for multis that might be
+                * present on disk.  We set next multi to the value following that; it
+                * might end up wrapped around (i.e. 0) if the old cluster had
+                * next=MaxMultiXactId, but multixact.c can cope with that just fine.
+                */
+               exec_prog(UTILITY_LOG_FILE, NULL, true,
+                                 "\"%s/pg_resetxlog\" -m %u,%u \"%s\"",
+                                 new_cluster.bindir,
+                                 old_cluster.controldata.chkpnt_nxtmulti + 1,
+                                 old_cluster.controldata.chkpnt_nxtmulti,
+                                 new_cluster.pgdata);
+               check_ok();
+       }
+
        /* now reset the wal archives in the new cluster */
        prep_status("Resetting WAL archives");
        exec_prog(UTILITY_LOG_FILE, NULL, true,
index d5c3fa9e830bd5545d09c6388ee51e40a9b5437b..70b93816679800bdf3b1c22cae9d12281e0c6d66 100644 (file)
@@ -108,6 +108,10 @@ extern char *output_files[];
  */
 #define VISIBILITY_MAP_CRASHSAFE_CAT_VER 201107031
 
+/*
+ * pg_multixact format changed in this catversion:
+ */
+#define MULTIXACT_FORMATCHANGE_CAT_VER 201301231
 
 /*
  * Each relation is represented by a relinfo structure.
@@ -182,6 +186,9 @@ typedef struct
        uint32          chkpnt_tli;
        uint32          chkpnt_nxtxid;
        uint32          chkpnt_nxtoid;
+       uint32          chkpnt_nxtmulti;
+       uint32          chkpnt_nxtmxoff;
+       uint32          chkpnt_oldstMulti;
        uint32          align;
        uint32          blocksz;
        uint32          largesz;
index f56389b0e21c3e5612c361c8ffdf5a8110c14eea..fe8042344f675dd4abc1fee834c2064ab7cfb577 100644 (file)
@@ -4,7 +4,7 @@ MODULE_big      = pgrowlocks
 OBJS           = pgrowlocks.o
 
 EXTENSION = pgrowlocks
-DATA = pgrowlocks--1.0.sql pgrowlocks--unpackaged--1.0.sql
+DATA = pgrowlocks--1.1.sql pgrowlocks--1.0--1.1.sql pgrowlocks--unpackaged--1.0.sql
 
 ifdef USE_PGXS
 PG_CONFIG = pg_config
diff --git a/contrib/pgrowlocks/pgrowlocks--1.0--1.1.sql b/contrib/pgrowlocks/pgrowlocks--1.0--1.1.sql
new file mode 100644 (file)
index 0000000..d98cd80
--- /dev/null
@@ -0,0 +1,17 @@
+/* contrib/pgrowlocks/pgrowlocks--1.0--1.1.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pgrowlocks" to load this file. \quit
+
+ALTER EXTENSION pgrowlocks DROP FUNCTION pgrowlocks(text);
+DROP FUNCTION pgrowlocks(text);
+CREATE FUNCTION pgrowlocks(IN relname text,
+    OUT locked_row TID,                -- row TID
+    OUT locker XID,            -- locking XID
+    OUT multi bool,            -- multi XID?
+    OUT xids xid[],            -- multi XIDs
+    OUT modes text[],          -- multi XID statuses
+    OUT pids INTEGER[])                -- locker's process id
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'pgrowlocks'
+LANGUAGE C STRICT;
similarity index 83%
rename from contrib/pgrowlocks/pgrowlocks--1.0.sql
rename to contrib/pgrowlocks/pgrowlocks--1.1.sql
index a909b7430d8fffa80006c503f892b08a24640c28..29079f49231739fbd1970f900492021ed4df6b3c 100644 (file)
@@ -1,14 +1,14 @@
-/* contrib/pgrowlocks/pgrowlocks--1.0.sql */
+/* contrib/pgrowlocks/pgrowlocks--1.1.sql */
 
 -- complain if script is sourced in psql, rather than via CREATE EXTENSION
 \echo Use "CREATE EXTENSION pgrowlocks" to load this file. \quit
 
 CREATE FUNCTION pgrowlocks(IN relname text,
     OUT locked_row TID,                -- row TID
-    OUT lock_type TEXT,                -- lock type
     OUT locker XID,            -- locking XID
     OUT multi bool,            -- multi XID?
     OUT xids xid[],            -- multi XIDs
+    OUT modes text[],          -- multi XID statuses
     OUT pids INTEGER[])                -- locker's process id
 RETURNS SETOF record
 AS 'MODULE_PATHNAME', 'pgrowlocks'
index 20beed2a300b8e88ce31ef36ad1ef589bf861d3d..43ada57352d2f3bb6055e42b200f0c9b228262fa 100644 (file)
@@ -59,6 +59,13 @@ typedef struct
        int                     ncolumns;
 } MyData;
 
+#define                Atnum_tid               0
+#define                Atnum_xmax              1
+#define                Atnum_ismulti   2
+#define                Atnum_xids              3
+#define                Atnum_modes             4
+#define                Atnum_pids              5
+
 Datum
 pgrowlocks(PG_FUNCTION_ARGS)
 {
@@ -117,79 +124,146 @@ pgrowlocks(PG_FUNCTION_ARGS)
        /* scan the relation */
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
+               HTSU_Result     htsu;
+               TransactionId xmax;
+               uint16          infomask;
+
                /* must hold a buffer lock to call HeapTupleSatisfiesUpdate */
                LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 
-               if (HeapTupleSatisfiesUpdate(tuple->t_data,
-                                                                        GetCurrentCommandId(false),
-                                                                        scan->rs_cbuf) == HeapTupleBeingUpdated)
+               htsu = HeapTupleSatisfiesUpdate(tuple->t_data,
+                                                                               GetCurrentCommandId(false),
+                                                                               scan->rs_cbuf);
+               xmax = HeapTupleHeaderGetRawXmax(tuple->t_data);
+               infomask = tuple->t_data->t_infomask;
+
+               /*
+                * a tuple is locked if HTSU returns BeingUpdated, and if it returns
+                * MayBeUpdated but the Xmax is valid and pointing at us.
+                */
+               if (htsu == HeapTupleBeingUpdated ||
+                       (htsu == HeapTupleMayBeUpdated &&
+                        !(infomask & HEAP_XMAX_INVALID) &&
+                        !(infomask & HEAP_XMAX_IS_MULTI) &&
+                        (xmax == GetCurrentTransactionIdIfAny())))
                {
-
                        char      **values;
-                       int                     i;
 
                        values = (char **) palloc(mydata->ncolumns * sizeof(char *));
 
-                       i = 0;
-                       values[i++] = (char *) DirectFunctionCall1(tidout, PointerGetDatum(&tuple->t_self));
+                       values[Atnum_tid] = (char *) DirectFunctionCall1(tidout,
+                                                                                                                        PointerGetDatum(&tuple->t_self));
 
-                       if (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK)
-                               values[i++] = pstrdup("Shared");
-                       else
-                               values[i++] = pstrdup("Exclusive");
-                       values[i] = palloc(NCHARS * sizeof(char));
-                       snprintf(values[i++], NCHARS, "%d", HeapTupleHeaderGetXmax(tuple->t_data));
-                       if (tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI)
+                       values[Atnum_xmax] = palloc(NCHARS * sizeof(char));
+                       snprintf(values[Atnum_xmax], NCHARS, "%d", xmax);
+                       if (infomask & HEAP_XMAX_IS_MULTI)
                        {
-                               TransactionId *xids;
-                               int                     nxids;
-                               int                     j;
-                               int                     isValidXid = 0;         /* any valid xid ever exists? */
-
-                               values[i++] = pstrdup("true");
-                               nxids = GetMultiXactIdMembers(HeapTupleHeaderGetXmax(tuple->t_data), &xids);
-                               if (nxids == -1)
+                               MultiXactMember *members;
+                               int                     nmembers;
+                               bool            first = true;
+                               bool            allow_old;
+
+                               values[Atnum_ismulti] = pstrdup("true");
+
+                               allow_old = !(infomask & HEAP_LOCK_MASK) &&
+                                                        (infomask & HEAP_XMAX_LOCK_ONLY);
+                               nmembers = GetMultiXactIdMembers(xmax, &members, allow_old);
+                               if (nmembers == -1)
                                {
-                                       elog(ERROR, "GetMultiXactIdMembers returns error");
+                                       values[Atnum_xids] = "{0}";
+                                       values[Atnum_modes] = "{transient upgrade status}";
+                                       values[Atnum_pids] = "{0}";
                                }
+                               else
+                               {
+                                       int                     j;
 
-                               values[i] = palloc(NCHARS * nxids);
-                               values[i + 1] = palloc(NCHARS * nxids);
-                               strcpy(values[i], "{");
-                               strcpy(values[i + 1], "{");
+                                       values[Atnum_xids] = palloc(NCHARS * nmembers);
+                                       values[Atnum_modes] = palloc(NCHARS * nmembers);
+                                       values[Atnum_pids] = palloc(NCHARS * nmembers);
 
-                               for (j = 0; j < nxids; j++)
-                               {
-                                       char            buf[NCHARS];
+                                       strcpy(values[Atnum_xids], "{");
+                                       strcpy(values[Atnum_modes], "{");
+                                       strcpy(values[Atnum_pids], "{");
 
-                                       if (TransactionIdIsInProgress(xids[j]))
+                                       for (j = 0; j < nmembers; j++)
                                        {
-                                               if (isValidXid)
+                                               char            buf[NCHARS];
+
+                                               if (!first)
                                                {
-                                                       strcat(values[i], ",");
-                                                       strcat(values[i + 1], ",");
+                                                       strcat(values[Atnum_xids], ",");
+                                                       strcat(values[Atnum_modes], ",");
+                                                       strcat(values[Atnum_pids], ",");
                                                }
-                                               snprintf(buf, NCHARS, "%d", xids[j]);
-                                               strcat(values[i], buf);
-                                               snprintf(buf, NCHARS, "%d", BackendXidGetPid(xids[j]));
-                                               strcat(values[i + 1], buf);
+                                               snprintf(buf, NCHARS, "%d", members[j].xid);
+                                               strcat(values[Atnum_xids], buf);
+                                               switch (members[j].status)
+                                               {
+                                                       case MultiXactStatusUpdate:
+                                                               snprintf(buf, NCHARS, "Update");
+                                                               break;
+                                                       case MultiXactStatusNoKeyUpdate:
+                                                               snprintf(buf, NCHARS, "No Key Update");
+                                                               break;
+                                                       case MultiXactStatusForUpdate:
+                                                               snprintf(buf, NCHARS, "For Update");
+                                                               break;
+                                                       case MultiXactStatusForNoKeyUpdate:
+                                                               snprintf(buf, NCHARS, "For No Key Update");
+                                                               break;
+                                                       case MultiXactStatusForShare:
+                                                               snprintf(buf, NCHARS, "Share");
+                                                               break;
+                                                       case MultiXactStatusForKeyShare:
+                                                               snprintf(buf, NCHARS, "Key Share");
+                                                               break;
+                                               }
+                                               strcat(values[Atnum_modes], buf);
+                                               snprintf(buf, NCHARS, "%d",
+                                                                BackendXidGetPid(members[j].xid));
+                                               strcat(values[Atnum_pids], buf);
 
-                                               isValidXid = 1;
+                                               first = false;
                                        }
-                               }
 
-                               strcat(values[i], "}");
-                               strcat(values[i + 1], "}");
-                               i++;
+                                       strcat(values[Atnum_xids], "}");
+                                       strcat(values[Atnum_modes], "}");
+                                       strcat(values[Atnum_pids], "}");
+                               }
                        }
                        else
                        {
-                               values[i++] = pstrdup("false");
-                               values[i] = palloc(NCHARS * sizeof(char));
-                               snprintf(values[i++], NCHARS, "{%d}", HeapTupleHeaderGetXmax(tuple->t_data));
+                               values[Atnum_ismulti] = pstrdup("false");
+
+                               values[Atnum_xids] = palloc(NCHARS * sizeof(char));
+                               snprintf(values[Atnum_xids], NCHARS, "{%d}", xmax);
+
+                               values[Atnum_modes] = palloc(NCHARS);
+                               if (infomask & HEAP_XMAX_LOCK_ONLY)
+                               {
+                                       if (HEAP_XMAX_IS_SHR_LOCKED(infomask))
+                                               snprintf(values[Atnum_modes], NCHARS, "{For Share}");
+                                       else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
+                                               snprintf(values[Atnum_modes], NCHARS, "{For Key Share}");
+                                       else if (HEAP_XMAX_IS_EXCL_LOCKED(infomask))
+                                               snprintf(values[Atnum_modes], NCHARS, "{For Update}");
+                                       else
+                                               /* neither keyshare nor exclusive bit it set */
+                                               snprintf(values[Atnum_modes], NCHARS,
+                                                                "{transient upgrade status}");
+                               }
+                               else
+                               {
+                                       if (tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED)
+                                               snprintf(values[Atnum_modes], NCHARS, "{Key Update}");
+                                       else
+                                               snprintf(values[Atnum_modes], NCHARS, "{Update}");
+                               }
 
-                               values[i] = palloc(NCHARS * sizeof(char));
-                               snprintf(values[i++], NCHARS, "{%d}", BackendXidGetPid(HeapTupleHeaderGetXmax(tuple->t_data)));
+                               values[Atnum_pids] = palloc(NCHARS * sizeof(char));
+                               snprintf(values[Atnum_pids], NCHARS, "{%d}",
+                                                BackendXidGetPid(xmax));
                        }
 
                        LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
@@ -200,10 +274,10 @@ pgrowlocks(PG_FUNCTION_ARGS)
                        /* make the tuple into a datum */
                        result = HeapTupleGetDatum(tuple);
 
-                       /* Clean up */
-                       for (i = 0; i < mydata->ncolumns; i++)
-                               pfree(values[i]);
-                       pfree(values);
+                       /*
+                        * no need to pfree what we allocated; it's on a short-lived memory
+                        * context anyway
+                        */
 
                        SRF_RETURN_NEXT(funcctx, result);
                }
index a6ba16451573e12fc67915bb4a23aa68e0179884..dfa587d76180d829e308111489b0097b58e96e85 100644 (file)
@@ -1,5 +1,5 @@
 # pgrowlocks extension
 comment = 'show row-level locking information'
-default_version = '1.0'
+default_version = '1.1'
 module_pathname = '$libdir/pgrowlocks'
 relocatable = true
index 390fa236d31937cc85a3d275ef0f43d4c591470d..c7714d88774f8eeafc82aec761b2de08c3829362 100644 (file)
@@ -43,12 +43,6 @@ pgrowlocks(text) returns setof record
       <entry><type>tid</type></entry>
       <entry>Tuple ID (TID) of locked row</entry>
      </row>
-     <row>
-      <entry><structfield>lock_type</structfield></entry>
-      <entry><type>text</type></entry>
-      <entry><literal>Shared</> for shared lock, or
-             <literal>Exclusive</> for exclusive lock</entry>
-     </row>
      <row>
       <entry><structfield>locker</structfield></entry>
       <entry><type>xid</type></entry>
@@ -64,6 +58,15 @@ pgrowlocks(text) returns setof record
       <entry><type>xid[]</type></entry>
       <entry>Transaction IDs of lockers (more than one if multitransaction)</entry>
      </row>
+     <row>
+      <entry><structfield>lock_type</structfield></entry>
+      <entry><type>text[]</type></entry>
+      <entry>Lock mode of lockers (more than one if multitransaction),
+       an array of <literal>Key Share</>, <literal>Share</>,
+       <literal>For No Key Update</>, <literal>No Key Update</>,
+       <literal>For Update</>, <literal>Update</>.</entry>
+     </row>
+
      <row>
       <entry><structfield>pids</structfield></entry>
       <entry><type>integer[]</type></entry>
index 9963780c3139ccf4c3b763d85b952325833fafe9..26d511fad8c5b8d02bda618006ce2606036db7c7 100644 (file)
@@ -45,7 +45,7 @@ SELECT [ ALL | DISTINCT [ ON ( <replaceable class="parameter">expression</replac
     [ LIMIT { <replaceable class="parameter">count</replaceable> | ALL } ]
     [ OFFSET <replaceable class="parameter">start</replaceable> [ ROW | ROWS ] ]
     [ FETCH { FIRST | NEXT } [ <replaceable class="parameter">count</replaceable> ] { ROW | ROWS } ONLY ]
-    [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ] [...] ]
+    [ FOR { UPDATE | NO KEY UPDATE | SHARE | KEY SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ] [...] ]
 
 <phrase>where <replaceable class="parameter">from_item</replaceable> can be one of:</phrase>
 
@@ -178,7 +178,8 @@ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ]
 
     <listitem>
      <para>
-      If <literal>FOR UPDATE</literal> or <literal>FOR SHARE</literal>
+      If <literal>FOR UPDATE</>, <literal>FOR NO KEY UPDATE</literal>, <literal>FOR SHARE</literal>
+      or <literal>FOR KEY SHARE</literal>
       is specified, the
       <command>SELECT</command> statement locks the selected rows
       against concurrent updates.  (See <xref linkend="sql-for-update-share"
@@ -190,8 +191,9 @@ TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ]
 
   <para>
    You must have <literal>SELECT</literal> privilege on each column used
-   in a <command>SELECT</> command.  The use of <literal>FOR UPDATE</literal>
-   or <literal>FOR SHARE</literal> requires
+   in a <command>SELECT</> command.  The use of <literal>FOR NO KEY UPDATE</>,
+   <literal>FOR UPDATE</literal>,
+   <literal>FOR SHARE</literal> or <literal>FOR KEY SHARE</literal> requires
    <literal>UPDATE</literal> privilege as well (for at least one column
    of each table so selected).
   </para>
@@ -873,8 +875,8 @@ SELECT DISTINCT ON (location) location, time, report
 <replaceable class="parameter">select_statement</replaceable> UNION [ ALL | DISTINCT ] <replaceable class="parameter">select_statement</replaceable>
 </synopsis><replaceable class="parameter">select_statement</replaceable> is
     any <command>SELECT</command> statement without an <literal>ORDER
-    BY</>, <literal>LIMIT</>, <literal>FOR UPDATE</literal>, or
-    <literal>FOR SHARE</literal> clause.
+    BY</>, <literal>LIMIT</>, <literal>FOR NO KEY UPDATE</>, <literal>FOR UPDATE</literal>,
+    <literal>FOR SHARE</literal>, or <literal>FOR KEY SHARE</literal> clause.
     (<literal>ORDER BY</> and <literal>LIMIT</> can be attached to a
     subexpression if it is enclosed in parentheses.  Without
     parentheses, these clauses will be taken to apply to the result of
@@ -910,7 +912,8 @@ SELECT DISTINCT ON (location) location, time, report
    </para>
 
    <para>
-    Currently, <literal>FOR UPDATE</> and <literal>FOR SHARE</> cannot be
+    Currently, <literal>FOR NO KEY UPDATE</>, <literal>FOR UPDATE</>, <literal>FOR SHARE</> and
+    <literal>FOR KEY SHARE</> cannot be
     specified either for a <literal>UNION</> result or for any input of a
     <literal>UNION</>.
    </para>
@@ -925,8 +928,8 @@ SELECT DISTINCT ON (location) location, time, report
 <replaceable class="parameter">select_statement</replaceable> INTERSECT [ ALL | DISTINCT ] <replaceable class="parameter">select_statement</replaceable>
 </synopsis><replaceable class="parameter">select_statement</replaceable> is
     any <command>SELECT</command> statement without an <literal>ORDER
-    BY</>, <literal>LIMIT</>, <literal>FOR UPDATE</literal>, or
-    <literal>FOR SHARE</literal> clause.
+    BY</>, <literal>LIMIT</>, <literal>FOR NO KEY UPDATE</>, <literal>FOR UPDATE</literal>,
+    <literal>FOR SHARE</literal>, or <literal>FOR KEY SHARE</> clause.
    </para>
 
    <para>
@@ -957,7 +960,8 @@ SELECT DISTINCT ON (location) location, time, report
    </para>
 
    <para>
-    Currently, <literal>FOR UPDATE</> and <literal>FOR SHARE</> cannot be
+    Currently, <literal>FOR NO KEY UPDATE</>, <literal>FOR UPDATE</>, <literal>FOR SHARE</> and
+    <literal>FOR KEY SHARE</> cannot be
     specified either for an <literal>INTERSECT</> result or for any input of
     an <literal>INTERSECT</>.
    </para>
@@ -972,8 +976,8 @@ SELECT DISTINCT ON (location) location, time, report
 <replaceable class="parameter">select_statement</replaceable> EXCEPT [ ALL | DISTINCT ] <replaceable class="parameter">select_statement</replaceable>
 </synopsis><replaceable class="parameter">select_statement</replaceable> is
     any <command>SELECT</command> statement without an <literal>ORDER
-    BY</>, <literal>LIMIT</>, <literal>FOR UPDATE</literal>, or
-    <literal>FOR SHARE</literal> clause.
+    BY</>, <literal>LIMIT</>, <literal>FOR NO KEY UPDATE</>, <literal>FOR UPDATE</literal>,
+    <literal>FOR SHARE</literal>, or <literal>FOR KEY SHARE</> clause.
    </para>
 
    <para>
@@ -1000,7 +1004,8 @@ SELECT DISTINCT ON (location) location, time, report
    </para>
 
    <para>
-    Currently, <literal>FOR UPDATE</> and <literal>FOR SHARE</> cannot be
+    Currently, <literal>FOR NO KEY UPDATE</>, <literal>FOR UPDATE</>, <literal>FOR SHARE</> and
+    <literal>FOR KEY SHARE</> cannot be
     specified either for an <literal>EXCEPT</> result or for any input of
     an <literal>EXCEPT</>.
    </para>
@@ -1185,7 +1190,14 @@ FETCH { FIRST | NEXT } [ <replaceable class="parameter">count</replaceable> ] {
   </refsect2>
 
   <refsect2 id="SQL-FOR-UPDATE-SHARE">
-   <title id="sql-for-update-share-title"><literal>FOR UPDATE</literal>/<literal>FOR SHARE</literal> Clause</title>
+   <title id="sql-for-update-share-title"><literal>FOR UPDATE</>, <literal>FOR NO KEY UPDATE</>/<literal>FOR SHARE</>/<literal>FOR KEY SHARE</> Clauses</title>
+
+   <para>
+    <literal>FOR UPDATE</>, <literal>FOR NO KEY UPDATE</>, <literal>FOR SHARE</>
+    and <literal>FOR KEY SHARE</>
+    are <firstterm>locking clauses</>; they affect how <literal>SELECT</>
+    locks rows as they are obtained from the table.
+   </para>
 
    <para>
     The <literal>FOR UPDATE</literal> clause has this form:
@@ -1194,6 +1206,13 @@ FOR UPDATE [ OF <replaceable class="parameter">table_name</replaceable> [, ...]
 </synopsis>
    </para>
 
+   <para>
+    The <literal>FOR NO KEY UPDATE</literal> clause has this form:
+<synopsis>
+FOR NO KEY UPDATE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ]
+</synopsis>
+   </para>
+
    <para>
     The closely related <literal>FOR SHARE</literal> clause has this form:
 <synopsis>
@@ -1201,14 +1220,31 @@ FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ]
 </synopsis>
    </para>
 
+   <para>
+    Similarly, the <literal>FOR KEY SHARE</> clause has this form:
+<synopsis>
+FOR KEY SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ]
+</synopsis>
+   </para>
+
    <para>
     <literal>FOR UPDATE</literal> causes the rows retrieved by the
     <command>SELECT</command> statement to be locked as though for
     update.  This prevents them from being modified or deleted by
     other transactions until the current transaction ends.  That is,
     other transactions that attempt <command>UPDATE</command>,
-    <command>DELETE</command>, or <command>SELECT FOR UPDATE</command>
+    <command>DELETE</command>,
+    <command>SELECT FOR UPDATE</command>,
+    <command>SELECT FOR SHARE</command> or
+    <command>SELECT FOR KEY SHARE</command>
     of these rows will be blocked until the current transaction ends.
+    The <literal>FOR UPDATE</> lock mode
+    is also acquired by any <command>DELETE</> on a row, and also by an
+    <command>UPDATE</> that modifies the values on certain columns.  Currently,
+    the set of columns considered for the <command>UPDATE</> case are those that
+    have an unique index on them that can be used in a foreign key (so partial
+    indexes and expressional indexes are not considered), but this may change
+    in the future.
     Also, if an <command>UPDATE</command>, <command>DELETE</command>,
     or <command>SELECT FOR UPDATE</command> from another transaction
     has already locked a selected row or rows, <command>SELECT FOR
@@ -1220,13 +1256,33 @@ FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ]
     linkend="mvcc">.
    </para>
 
+   <para>
+    <literal>FOR NO KEY UPDATE</> behaves similarly, except that the lock
+    acquired is weaker: this lock will not block
+    <literal>SELECT FOR KEY SHARE</> commands that attempt to acquire
+    a lock on the same rows.
+   </para>
+
    <para>
     <literal>FOR SHARE</literal> behaves similarly, except that it
     acquires a shared rather than exclusive lock on each retrieved
     row.  A shared lock blocks other transactions from performing
     <command>UPDATE</command>, <command>DELETE</command>, or <command>SELECT
     FOR UPDATE</command> on these rows, but it does not prevent them
-    from performing <command>SELECT FOR SHARE</command>.
+    from performing <command>SELECT FOR SHARE</command> or
+    <command>SELECT FOR KEY SHARE</command>.
+   </para>
+
+   <para>
+    <literal>FOR KEY SHARE</> behaves similarly to <literal>FOR SHARE</literal>,
+    except that the lock
+    is weaker: <literal>SELECT FOR UPDATE</> is blocked, but
+    not <literal>SELECT FOR NO KEY UPDATE</>.  A key-shared
+    lock blocks other transactions from performing <command>DELETE</command>
+    or any <command>UPDATE</command> that changes the key values, but not
+    other <command>UPDATE</>, and neither it does prevent
+    <command>SELECT FOR UPDATE</>, <command>SELECT FOR SHARE</>, or
+    <command>SELECT FOR KEY SHARE</>.
    </para>
 
    <para>
@@ -1243,41 +1299,39 @@ FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ]
    </para>
 
    <para>
-    If specific tables are named in <literal>FOR UPDATE</literal>
-    or <literal>FOR SHARE</literal>,
+    If specific tables are named in a locking clause,
     then only rows coming from those tables are locked; any other
     tables used in the <command>SELECT</command> are simply read as
-    usual.  A <literal>FOR UPDATE</literal> or <literal>FOR SHARE</literal>
+    usual.  A locking
     clause without a table list affects all tables used in the statement.
-    If <literal>FOR UPDATE</literal> or <literal>FOR SHARE</literal> is
+    If a locking clause is
     applied to a view or sub-query, it affects all tables used in
     the view or sub-query.
-    However, <literal>FOR UPDATE</literal>/<literal>FOR SHARE</literal>
+    However, these clauses
     do not apply to <literal>WITH</> queries referenced by the primary query.
     If you want row locking to occur within a <literal>WITH</> query, specify
-    <literal>FOR UPDATE</literal> or <literal>FOR SHARE</literal> within the
-    <literal>WITH</> query.
+    a locking clause within the <literal>WITH</> query.
    </para>
 
    <para>
-    Multiple <literal>FOR UPDATE</literal> and <literal>FOR SHARE</literal>
+    Multiple locking
     clauses can be written if it is necessary to specify different locking
     behavior for different tables.  If the same table is mentioned (or
-    implicitly affected) by both <literal>FOR UPDATE</literal> and
-    <literal>FOR SHARE</literal> clauses, then it is processed as
-    <literal>FOR UPDATE</literal>.  Similarly, a table is processed
+    implicitly affected) by more than one locking clause,
+    then it is processed as if it was only specified by the strongest one.
+    Similarly, a table is processed
     as <literal>NOWAIT</> if that is specified in any of the clauses
     affecting it.
    </para>
 
    <para>
-    <literal>FOR UPDATE</literal> and <literal>FOR SHARE</literal> cannot be
+    The locking clauses cannot be
     used in contexts where returned rows cannot be clearly identified with
     individual table rows; for example they cannot be used with aggregation.
    </para>
 
    <para>
-    When <literal>FOR UPDATE</literal> or <literal>FOR SHARE</literal>
+    When a locking clause
     appears at the top level of a <command>SELECT</> query, the rows that
     are locked are exactly those that are returned by the query; in the
     case of a join query, the rows locked are those that contribute to
@@ -1288,13 +1342,13 @@ FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ]
     <literal>LIMIT</> is used, locking stops
     once enough rows have been returned to satisfy the limit (but note that
     rows skipped over by <literal>OFFSET</> will get locked).  Similarly,
-    if <literal>FOR UPDATE</literal> or <literal>FOR SHARE</literal>
+    if a locking clause
     is used in a cursor's query, only rows actually fetched or stepped past
     by the cursor will be locked.
    </para>
 
    <para>
-    When <literal>FOR UPDATE</literal> or <literal>FOR SHARE</literal>
+    When a locking clause
     appears in a sub-<command>SELECT</>, the rows locked are those
     returned to the outer query by the sub-query.  This might involve
     fewer rows than inspection of the sub-query alone would suggest,
@@ -1307,11 +1361,9 @@ SELECT * FROM (SELECT * FROM mytable FOR UPDATE) ss WHERE col1 = 5;
     condition is not textually within the sub-query.
    </para>
 
-  <caution>
-   <para>
-    Avoid locking a row and then modifying it within a later savepoint or
-    <application>PL/pgSQL</application> exception block.  A subsequent
-    rollback would cause the lock to be lost.  For example:
+  <para>
+   Previous releases failed to preserve a lock which is upgraded by a later
+   savepoint.  For example, this code:
 <programlisting>
 BEGIN;
 SELECT * FROM mytable WHERE key = 1 FOR UPDATE;
@@ -1319,23 +1371,15 @@ SAVEPOINT s;
 UPDATE mytable SET ... WHERE key = 1;
 ROLLBACK TO s;
 </programlisting>
-    After the <command>ROLLBACK</>, the row is effectively unlocked, rather
-    than returned to its pre-savepoint state of being locked but not modified.
-    This hazard occurs if a row locked in the current transaction is updated
-    or deleted, or if a shared lock is upgraded to exclusive: in all these
-    cases, the former lock state is forgotten.  If the transaction is then
-    rolled back to a state between the original locking command and the
-    subsequent change, the row will appear not to be locked at all.  This is
-    an implementation deficiency which will be addressed in a future release
-    of <productname>PostgreSQL</productname>.
-   </para>
-  </caution>
+   would fail to preserve the <literal>FOR UPDATE</> lock after the
+   <command>ROLLBACK</>.  This has been fixed in release 9.2.
+  </para>
 
   <caution>
    <para>
     It is possible for a <command>SELECT</> command running at the <literal>READ
     COMMITTED</literal> transaction isolation level and using <literal>ORDER
-    BY</literal> and <literal>FOR UPDATE/SHARE</literal> to return rows out of
+    BY</literal> and a locking clause to return rows out of
     order.  This is because <literal>ORDER BY</> is applied first.
     The command sorts the result, but might then block trying to obtain a lock
     on one or more of the rows.  Once the <literal>SELECT</> unblocks, some
@@ -1765,14 +1809,16 @@ SELECT distributors.* WHERE distributors.name = 'Westward';
   </refsect2>
 
   <refsect2>
-   <title><literal>FOR UPDATE</> and <literal>FOR SHARE</></title>
+   <title><literal>FOR NO KEY UPDATE</>, <literal>FOR UPDATE</>, <literal>FOR SHARE</>, <literal>FOR KEY SHARE</></title>
 
    <para>
     Although <literal>FOR UPDATE</> appears in the SQL standard, the
     standard allows it only as an option of <command>DECLARE CURSOR</>.
     <productname>PostgreSQL</productname> allows it in any <command>SELECT</>
     query as well as in sub-<command>SELECT</>s, but this is an extension.
-    The <literal>FOR SHARE</> variant, and the <literal>NOWAIT</> option,
+    The <literal>FOR NO KEY UPDATE</>, <literal>FOR SHARE</> and
+    <literal>FOR KEY SHARE</> variants,
+    as well as the <literal>NOWAIT</> option,
     do not appear in the standard.
    </para>
   </refsect2>
index 0706e3afc2df5e7cf1e2eca73dd9602d084f9795..e39b9770cbf519cd65630f0332e92c004d4d4f7d 100644 (file)
@@ -542,7 +542,7 @@ heap_getsysattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
                        result = TransactionIdGetDatum(HeapTupleHeaderGetXmin(tup->t_data));
                        break;
                case MaxTransactionIdAttributeNumber:
-                       result = TransactionIdGetDatum(HeapTupleHeaderGetXmax(tup->t_data));
+                       result = TransactionIdGetDatum(HeapTupleHeaderGetRawXmax(tup->t_data));
                        break;
                case MinCommandIdAttributeNumber:
                case MaxCommandIdAttributeNumber:
diff --git a/src/backend/access/heap/README.tuplock b/src/backend/access/heap/README.tuplock
new file mode 100644 (file)
index 0000000..8d5cc16
--- /dev/null
@@ -0,0 +1,139 @@
+Locking tuples
+--------------
+
+Locking tuples is not as easy as locking tables or other database objects.
+The problem is that transactions might want to lock large numbers of tuples at
+any one time, so it's not possible to keep the locks objects in shared memory.
+To work around this limitation, we use a two-level mechanism.  The first level
+is implemented by storing locking information in the tuple header: a tuple is
+marked as locked by setting the current transaction's XID as its XMAX, and
+setting additional infomask bits to distinguish this case from the more normal
+case of having deleted the tuple.  When multiple transactions concurrently
+lock a tuple, a MultiXact is used; see below.  This mechanism can accomodate
+arbitrarily large numbers of tuples being locked simultaneously.
+
+When it is necessary to wait for a tuple-level lock to be released, the basic
+delay is provided by XactLockTableWait or MultiXactIdWait on the contents of
+the tuple's XMAX.  However, that mechanism will release all waiters
+concurrently, so there would be a race condition as to which waiter gets the
+tuple, potentially leading to indefinite starvation of some waiters.  The
+possibility of share-locking makes the problem much worse --- a steady stream
+of share-lockers can easily block an exclusive locker forever.  To provide
+more reliable semantics about who gets a tuple-level lock first, we use the
+standard lock manager, which implements the second level mentioned above.  The
+protocol for waiting for a tuple-level lock is really
+
+     LockTuple()
+     XactLockTableWait()
+     mark tuple as locked by me
+     UnlockTuple()
+
+When there are multiple waiters, arbitration of who is to get the lock next
+is provided by LockTuple().  However, at most one tuple-level lock will
+be held or awaited per backend at any time, so we don't risk overflow
+of the lock table.  Note that incoming share-lockers are required to
+do LockTuple as well, if there is any conflict, to ensure that they don't
+starve out waiting exclusive-lockers.  However, if there is not any active
+conflict for a tuple, we don't incur any extra overhead.
+
+We provide four levels of tuple locking strength: SELECT FOR KEY UPDATE is
+super-exclusive locking (used to delete tuples and more generally to update
+tuples modifying the values of the columns that make up the key of the tuple);
+SELECT FOR UPDATE is a standards-compliant exclusive lock; SELECT FOR SHARE
+implements shared locks; and finally SELECT FOR KEY SHARE is a super-weak mode
+that does not conflict with exclusive mode, but conflicts with SELECT FOR KEY
+UPDATE.  This last mode implements a mode just strong enough to implement RI
+checks, i.e. it ensures that tuples do not go away from under a check, without
+blocking when some other transaction that want to update the tuple without
+changing its key.
+
+The conflict table is:
+
+                KEY UPDATE        UPDATE        SHARE        KEY SHARE
+KEY UPDATE       conflict        conflict      conflict      conflict
+UPDATE           conflict        conflict      conflict
+SHARE            conflict        conflict
+KEY SHARE        conflict
+
+When there is a single locker in a tuple, we can just store the locking info
+in the tuple itself.  We do this by storing the locker's Xid in XMAX, and
+setting infomask bits specifying the locking strength.  There is one exception
+here: since infomask space is limited, we do not provide a separate bit
+for SELECT FOR SHARE, so we have to use the extended info in a MultiXact in
+that case.  (The other cases, SELECT FOR UPDATE and SELECT FOR KEY SHARE, are
+presumably more commonly used due to being the standards-mandated locking
+mechanism, or heavily used by the RI code, so we want to provide fast paths
+for those.)
+
+MultiXacts
+----------
+
+A tuple header provides very limited space for storing information about tuple
+locking and updates: there is room only for a single Xid and a small number of
+infomask bits.  Whenever we need to store more than one lock, we replace the
+first locker's Xid with a new MultiXactId.  Each MultiXact provides extended
+locking data; it comprises an array of Xids plus some flags bits for each one.
+The flags are currently used to store the locking strength of each member
+transaction.  (The flags also distinguish a pure locker from an updater.)
+
+In earlier PostgreSQL releases, a MultiXact always meant that the tuple was
+locked in shared mode by multiple transactions.  This is no longer the case; a
+MultiXact may contain an update or delete Xid.  (Keep in mind that tuple locks
+in a transaction do not conflict with other tuple locks in the same
+transaction, so it's possible to have otherwise conflicting locks in a
+MultiXact if they belong to the same transaction).
+
+Note that each lock is attributed to the subtransaction that acquires it.
+This means that a subtransaction that aborts is seen as though it releases the
+locks it acquired; concurrent transactions can then proceed without having to
+wait for the main transaction to finish.  It also means that a subtransaction
+can upgrade to a stronger lock level than an earlier transaction had, and if
+the subxact aborts, the earlier, weaker lock is kept.
+
+The possibility of having an update within a MultiXact means that they must
+persist across crashes and restarts: a future reader of the tuple needs to
+figure out whether the update committed or aborted.  So we have a requirement
+that pg_multixact needs to retain pages of its data until we're certain that
+the MultiXacts in them are no longer of interest.
+
+VACUUM is in charge of removing old MultiXacts at the time of tuple freezing.
+This works in the same way that pg_clog segments are removed: we have a
+pg_class column that stores the earliest multixact that could possibly be
+stored in the table; the minimum of all such values is stored in a pg_database
+column.  VACUUM computes the minimum across all pg_database values, and
+removes pg_multixact segments older than the minimum.
+
+Infomask Bits
+-------------
+
+The following infomask bits are applicable:
+
+- HEAP_XMAX_INVALID
+  Any tuple with this bit set does not have a valid value stored in XMAX.
+
+- HEAP_XMAX_IS_MULTI
+  This bit is set if the tuple's Xmax is a MultiXactId (as opposed to a
+  regular TransactionId).
+
+- HEAP_XMAX_LOCK_ONLY
+  This bit is set when the XMAX is a locker only; that is, if it's a
+  multixact, it does not contain an update among its members.  It's set when
+  the XMAX is a plain Xid that locked the tuple, as well.
+
+- HEAP_XMAX_KEYSHR_LOCK
+- HEAP_XMAX_EXCL_LOCK
+  These bits indicate the strength of the lock acquired; they are useful when
+  the XMAX is not a MultiXactId.  If it's a multi, the info is to be found in
+  the member flags.  If HEAP_XMAX_IS_MULTI is not set and HEAP_XMAX_LOCK_ONLY
+  is set, then one of these *must* be set as well.
+  Note there is no infomask bit for a SELECT FOR SHARE lock.  Also there is no
+  separate bit for a SELECT FOR KEY UPDATE lock; this is implemented by the
+  HEAP_KEYS_UPDATED bit.
+
+- HEAP_KEYS_UPDATED
+  This bit lives in t_infomask2.  If set, indicates that the XMAX updated
+  this tuple and changed the key values, or it deleted the tuple.
+  It's set regardless of whether the XMAX is a TransactionId or a MultiXactId.
+
+We currently never set the HEAP_XMAX_COMMITTED when the HEAP_XMAX_IS_MULTI bit
+is set.
index b19d1cf6c5746f3d64fe0bc24ef5222afdf154ff..57d47e8601443d592982e6faffb7855391790b95 100644 (file)
@@ -84,12 +84,105 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
                                        TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
-                               ItemPointerData from, Buffer newbuf, HeapTuple newtup,
-                               bool all_visible_cleared, bool new_all_visible_cleared);
-static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
-                                          HeapTuple oldtup, HeapTuple newtup);
+                               Buffer newbuf, HeapTuple oldtup,
+                               HeapTuple newtup, bool all_visible_cleared,
+                               bool new_all_visible_cleared);
+static void HeapSatisfiesHOTandKeyUpdate(Relation relation,
+                                                        Bitmapset *hot_attrs, Bitmapset *key_attrs,
+                                                        bool *satisfies_hot, bool *satisfies_key,
+                                                        HeapTuple oldtup, HeapTuple newtup);
+static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
+                                                 uint16 old_infomask2, TransactionId add_to_xmax,
+                                                 LockTupleMode mode, bool is_update,
+                                                 TransactionId *result_xmax, uint16 *result_infomask,
+                                                 uint16 *result_infomask2);
+static HTSU_Result heap_lock_updated_tuple(Relation rel, HeapTuple tuple,
+                                               ItemPointer ctid, TransactionId xid,
+                                               LockTupleMode mode);
+static void GetMultiXactIdHintBits(MultiXactId multi, uint16 *new_infomask,
+                                          uint16 *new_infomask2);
+static TransactionId MultiXactIdGetUpdateXid(TransactionId xmax,
+                                               uint16 t_infomask);
+static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
+                               int *remaining, uint16 infomask);
+static bool ConditionalMultiXactIdWait(MultiXactId multi,
+                                                  MultiXactStatus status, int *remaining,
+                                                  uint16 infomask);
 
 
+/*
+ * Each tuple lock mode has a corresponding heavyweight lock, and one or two
+ * corresponding MultiXactStatuses (one to merely lock tuples, another one to
+ * update them).  This table (and the macros below) helps us determine the
+ * heavyweight lock mode and MultiXactStatus values to use for any particular
+ * tuple lock strength.
+ */
+static const struct
+{
+       LOCKMODE        hwlock;
+       MultiXactStatus lockstatus;
+       MultiXactStatus updstatus;
+}
+tupleLockExtraInfo[MaxLockTupleMode + 1] =
+{
+       {       /* LockTupleKeyShare */
+               AccessShareLock,
+               MultiXactStatusForKeyShare,
+               -1      /* KeyShare does not allow updating tuples */
+       },
+       {       /* LockTupleShare */
+               RowShareLock,
+               MultiXactStatusForShare,
+               -1      /* Share does not allow updating tuples */
+       },
+       {       /* LockTupleNoKeyExclusive */
+               ExclusiveLock,
+               MultiXactStatusForNoKeyUpdate,
+               MultiXactStatusNoKeyUpdate
+       },
+       {       /* LockTupleExclusive */
+               AccessExclusiveLock,
+               MultiXactStatusForUpdate,
+               MultiXactStatusUpdate
+       }
+};
+/* Get the LOCKMODE for a given MultiXactStatus */
+#define LOCKMODE_from_mxstatus(status) \
+                       (tupleLockExtraInfo[TUPLOCK_from_mxstatus((status))].hwlock)
+
+/*
+ * Acquire heavyweight locks on tuples, using a LockTupleMode strength value.
+ * This is more readable than having every caller translate it to lock.h's
+ * LOCKMODE.
+ */
+#define LockTupleTuplock(rel, tup, mode) \
+       LockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
+#define UnlockTupleTuplock(rel, tup, mode) \
+       UnlockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
+#define ConditionalLockTupleTuplock(rel, tup, mode) \
+       ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
+
+/*
+ * This table maps tuple lock strength values for each particular
+ * MultiXactStatus value.
+ */
+static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
+{
+       LockTupleKeyShare,              /* ForKeyShare */
+       LockTupleShare,                 /* ForShare */
+       LockTupleNoKeyExclusive,                /* ForNoKeyUpdate */
+       LockTupleExclusive,             /* ForUpdate */
+       LockTupleNoKeyExclusive,                /* NoKeyUpdate */
+       LockTupleExclusive              /* Update */
+};
+
+/* Get the LockTupleMode for a given MultiXactStatus */
+#define TUPLOCK_from_mxstatus(status) \
+                       (MultiXactStatusLock[(status)])
+/* Get the is_update bit for a given MultiXactStatus */
+#define ISUPDATE_from_mxstatus(status) \
+                       ((status) > MultiXactStatusForUpdate)
+
 /* ----------------------------------------------------------------
  *                                              heap support routines
  * ----------------------------------------------------------------
@@ -1664,7 +1757,7 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
                                   ItemPointerGetBlockNumber(tid));
                        offnum = ItemPointerGetOffsetNumber(&heapTuple->t_data->t_ctid);
                        at_chain_start = false;
-                       prev_xmax = HeapTupleHeaderGetXmax(heapTuple->t_data);
+                       prev_xmax = HeapTupleHeaderGetUpdateXid(heapTuple->t_data);
                }
                else
                        break;                          /* end of chain */
@@ -1787,7 +1880,7 @@ heap_get_latest_tid(Relation relation,
                 * tuple.  Check for XMIN match.
                 */
                if (TransactionIdIsValid(priorXmax) &&
-                 !TransactionIdEquals(priorXmax, HeapTupleHeaderGetXmin(tp.t_data)))
+                       !TransactionIdEquals(priorXmax, HeapTupleHeaderGetXmin(tp.t_data)))
                {
                        UnlockReleaseBuffer(buffer);
                        break;
@@ -1805,7 +1898,8 @@ heap_get_latest_tid(Relation relation,
                /*
                 * If there's a valid t_ctid link, follow it, else we're done.
                 */
-               if ((tp.t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED)) ||
+               if ((tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
+                       HeapTupleHeaderIsOnlyLocked(tp.t_data) ||
                        ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
                {
                        UnlockReleaseBuffer(buffer);
@@ -1813,7 +1907,7 @@ heap_get_latest_tid(Relation relation,
                }
 
                ctid = tp.t_data->t_ctid;
-               priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
+               priorXmax = HeapTupleHeaderGetUpdateXid(tp.t_data);
                UnlockReleaseBuffer(buffer);
        }                                                       /* end of loop */
 }
@@ -1826,17 +1920,25 @@ heap_get_latest_tid(Relation relation,
  * If the transaction aborted, we guarantee the XMAX_INVALID hint bit will
  * be set on exit.     If the transaction committed, we set the XMAX_COMMITTED
  * hint bit if possible --- but beware that that may not yet be possible,
- * if the transaction committed asynchronously.  Hence callers should look
- * only at XMAX_INVALID.
+ * if the transaction committed asynchronously.
+ *
+ * Note that if the transaction was a locker only, we set HEAP_XMAX_INVALID
+ * even if it commits.
+ *
+ * Hence callers should look only at XMAX_INVALID.
+ *
+ * Note this is not allowed for tuples whose xmax is a multixact.
  */
 static void
 UpdateXmaxHintBits(HeapTupleHeader tuple, Buffer buffer, TransactionId xid)
 {
-       Assert(TransactionIdEquals(HeapTupleHeaderGetXmax(tuple), xid));
+       Assert(TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple), xid));
+       Assert(!(tuple->t_infomask & HEAP_XMAX_IS_MULTI));
 
        if (!(tuple->t_infomask & (HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID)))
        {
-               if (TransactionIdDidCommit(xid))
+               if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask) &&
+                       TransactionIdDidCommit(xid))
                        HeapTupleSetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
                                                                 xid);
                else
@@ -2373,6 +2475,26 @@ simple_heap_insert(Relation relation, HeapTuple tup)
        return heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
 }
 
+/*
+ * Given infomask/infomask2, compute the bits that must be saved in the
+ * "infobits" field of xl_heap_delete, xl_heap_update, xl_heap_lock,
+ * xl_heap_lock_updated WAL records.
+ *
+ * See fix_infomask_from_infobits.
+ */
+static uint8
+compute_infobits(uint16 infomask, uint16 infomask2)
+{
+       return
+               ((infomask & HEAP_XMAX_IS_MULTI) != 0 ? XLHL_XMAX_IS_MULTI : 0) |
+               ((infomask & HEAP_XMAX_LOCK_ONLY) != 0 ? XLHL_XMAX_LOCK_ONLY : 0) |
+               ((infomask & HEAP_XMAX_EXCL_LOCK) != 0 ? XLHL_XMAX_EXCL_LOCK : 0) |
+               /* note we ignore HEAP_XMAX_SHR_LOCK here */
+               ((infomask & HEAP_XMAX_KEYSHR_LOCK) != 0 ? XLHL_XMAX_KEYSHR_LOCK : 0) |
+               ((infomask2 & HEAP_KEYS_UPDATED) != 0 ?
+                XLHL_KEYS_UPDATED : 0);
+}
+
 /*
  *     heap_delete - delete a tuple
  *
@@ -2393,7 +2515,8 @@ simple_heap_insert(Relation relation, HeapTuple tup)
  * (the last only possible if wait == false).
  *
  * In the failure cases, the routine fills *hufd with the tuple's t_ctid,
- * t_xmax, and t_cmax (the last only for HeapTupleSelfUpdated, since we
+ * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
+ * (the last only for HeapTupleSelfUpdated, since we
  * cannot obtain cmax from a combocid generated by another transaction).
  * See comments for struct HeapUpdateFailureData for additional info.
  */
@@ -2410,6 +2533,9 @@ heap_delete(Relation relation, ItemPointer tid,
        BlockNumber block;
        Buffer          buffer;
        Buffer          vmbuffer = InvalidBuffer;
+       TransactionId new_xmax;
+       uint16          new_infomask,
+                               new_infomask2;
        bool            have_tuple_lock = false;
        bool            iscombo;
        bool            all_visible_cleared = false;
@@ -2465,7 +2591,7 @@ l1:
                uint16          infomask;
 
                /* must copy state data before unlocking buffer */
-               xwait = HeapTupleHeaderGetXmax(tp.t_data);
+               xwait = HeapTupleHeaderGetRawXmax(tp.t_data);
                infomask = tp.t_data->t_infomask;
 
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
@@ -2481,20 +2607,20 @@ l1:
                 */
                if (!have_tuple_lock)
                {
-                       LockTuple(relation, &(tp.t_self), ExclusiveLock);
+                       LockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
                        have_tuple_lock = true;
                }
 
                /*
                 * Sleep until concurrent transaction ends.  Note that we don't care
-                * if the locker has an exclusive or shared lock, because we need
-                * exclusive.
+                * which lock mode the locker has, because we need the strongest one.
                 */
 
                if (infomask & HEAP_XMAX_IS_MULTI)
                {
                        /* wait for multixact */
-                       MultiXactIdWait((MultiXactId) xwait);
+                       MultiXactIdWait((MultiXactId) xwait, MultiXactStatusUpdate,
+                                                       NULL, infomask);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
                        /*
@@ -2503,7 +2629,7 @@ l1:
                         * change, and start over if so.
                         */
                        if (!(tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
-                               !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
+                               !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tp.t_data),
                                                                         xwait))
                                goto l1;
 
@@ -2529,7 +2655,7 @@ l1:
                         * Check for xmax change, and start over if so.
                         */
                        if ((tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
-                               !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
+                               !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tp.t_data),
                                                                         xwait))
                                goto l1;
 
@@ -2541,8 +2667,9 @@ l1:
                 * We may overwrite if previous xmax aborted, or if it committed but
                 * only locked the tuple without updating it.
                 */
-               if (tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
-                                                                        HEAP_IS_LOCKED))
+               if ((tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
+                       HEAP_XMAX_IS_LOCKED_ONLY(tp.t_data->t_infomask) ||
+                       HeapTupleHeaderIsOnlyLocked(tp.t_data))
                        result = HeapTupleMayBeUpdated;
                else
                        result = HeapTupleUpdated;
@@ -2562,14 +2689,14 @@ l1:
                           result == HeapTupleBeingUpdated);
                Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
                hufd->ctid = tp.t_data->t_ctid;
-               hufd->xmax = HeapTupleHeaderGetXmax(tp.t_data);
+               hufd->xmax = HeapTupleHeaderGetUpdateXid(tp.t_data);
                if (result == HeapTupleSelfUpdated)
                        hufd->cmax = HeapTupleHeaderGetCmax(tp.t_data);
                else
                        hufd->cmax = 0;         /* for lack of an InvalidCommandId value */
                UnlockReleaseBuffer(buffer);
                if (have_tuple_lock)
-                       UnlockTuple(relation, &(tp.t_self), ExclusiveLock);
+                       UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
                if (vmbuffer != InvalidBuffer)
                        ReleaseBuffer(vmbuffer);
                return result;
@@ -2603,14 +2730,29 @@ l1:
                                                        vmbuffer);
        }
 
+       /*
+        * If this is the first possibly-multixact-able operation in the
+        * current transaction, set my per-backend OldestMemberMXactId setting.
+        * We can be certain that the transaction will never become a member of
+        * any older MultiXactIds than that.  (We have to do this even if we
+        * end up just using our own TransactionId below, since some other
+        * backend could incorporate our XID into a MultiXact immediately
+        * afterwards.)
+        */
+       MultiXactIdSetOldestMember();
+
+       compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(tp.t_data),
+                                                         tp.t_data->t_infomask, tp.t_data->t_infomask2,
+                                                         xid, LockTupleExclusive, true,
+                                                         &new_xmax, &new_infomask, &new_infomask2);
+
        /* store transaction information of xact deleting the tuple */
-       tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
-                                                          HEAP_XMAX_INVALID |
-                                                          HEAP_XMAX_IS_MULTI |
-                                                          HEAP_IS_LOCKED |
-                                                          HEAP_MOVED);
+       tp.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+       tp.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+       tp.t_data->t_infomask |= new_infomask;
+       tp.t_data->t_infomask2 |= new_infomask2;
        HeapTupleHeaderClearHotUpdated(tp.t_data);
-       HeapTupleHeaderSetXmax(tp.t_data, xid);
+       HeapTupleHeaderSetXmax(tp.t_data, new_xmax);
        HeapTupleHeaderSetCmax(tp.t_data, cid, iscombo);
        /* Make sure there is no forward chain link in t_ctid */
        tp.t_data->t_ctid = tp.t_self;
@@ -2625,8 +2767,11 @@ l1:
                XLogRecData rdata[2];
 
                xlrec.all_visible_cleared = all_visible_cleared;
+               xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
+                                                                                         tp.t_data->t_infomask2);
                xlrec.target.node = relation->rd_node;
                xlrec.target.tid = tp.t_self;
+               xlrec.xmax = new_xmax;
                rdata[0].data = (char *) &xlrec;
                rdata[0].len = SizeOfHeapDelete;
                rdata[0].buffer = InvalidBuffer;
@@ -2679,7 +2824,7 @@ l1:
         * Release the lmgr tuple lock, if we had it.
         */
        if (have_tuple_lock)
-               UnlockTuple(relation, &(tp.t_self), ExclusiveLock);
+               UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
 
        pgstat_count_heap_delete(relation);
 
@@ -2739,6 +2884,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  *     crosscheck - if not InvalidSnapshot, also check old tuple against this
  *     wait - true if should wait for any conflicting update to commit/abort
  *     hufd - output parameter, filled in failure cases (see below)
+ *     lockmode - output parameter, filled with lock mode acquired on tuple
  *
  * Normal, successful return value is HeapTupleMayBeUpdated, which
  * actually means we *did* update it.  Failure return codes are
@@ -2752,23 +2898,26 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  * data are not reflected into *newtup.
  *
  * In the failure cases, the routine fills *hufd with the tuple's t_ctid,
- * t_xmax, and t_cmax (the last only for HeapTupleSelfUpdated, since we
+ * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
+ * (the last only for HeapTupleSelfUpdated, since we
  * cannot obtain cmax from a combocid generated by another transaction).
  * See comments for struct HeapUpdateFailureData for additional info.
  */
 HTSU_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
                        CommandId cid, Snapshot crosscheck, bool wait,
-                       HeapUpdateFailureData *hufd)
+                       HeapUpdateFailureData *hufd, LockTupleMode *lockmode)
 {
        HTSU_Result result;
        TransactionId xid = GetCurrentTransactionId();
        Bitmapset  *hot_attrs;
+       Bitmapset  *key_attrs;
        ItemId          lp;
        HeapTupleData oldtup;
        HeapTuple       heaptup;
        Page            page;
        BlockNumber block;
+       MultiXactStatus mxact_status;
        Buffer          buffer,
                                newbuf,
                                vmbuffer = InvalidBuffer,
@@ -2779,9 +2928,20 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
                                pagefree;
        bool            have_tuple_lock = false;
        bool            iscombo;
+       bool            satisfies_hot;
+       bool            satisfies_key;
        bool            use_hot_update = false;
+       bool            key_intact;
        bool            all_visible_cleared = false;
        bool            all_visible_cleared_new = false;
+       bool            checked_lockers;
+       bool            locker_remains;
+       TransactionId xmax_new_tuple,
+                                 xmax_old_tuple;
+       uint16          infomask_old_tuple,
+                               infomask2_old_tuple,
+                               infomask_new_tuple,
+                               infomask2_new_tuple;
 
        Assert(ItemPointerIsValid(otid));
 
@@ -2797,7 +2957,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
         * Note that we get a copy here, so we need not worry about relcache flush
         * happening midway through.
         */
-       hot_attrs = RelationGetIndexAttrBitmap(relation);
+       hot_attrs = RelationGetIndexAttrBitmap(relation, false);
+       key_attrs = RelationGetIndexAttrBitmap(relation, true);
 
        block = ItemPointerGetBlockNumber(otid);
        buffer = ReadBuffer(relation, block);
@@ -2821,6 +2982,44 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
        oldtup.t_len = ItemIdGetLength(lp);
        oldtup.t_self = *otid;
 
+       /*
+        * If we're not updating any "key" column, we can grab a weaker lock type.
+        * This allows for more concurrency when we are running simultaneously with
+        * foreign key checks.
+        *
+        * Note that if a column gets detoasted while executing the update, but the
+        * value ends up being the same, this test will fail and we will use the
+        * stronger lock.  This is acceptable; the important case to optimize is
+        * updates that don't manipulate key columns, not those that
+        * serendipitiously arrive at the same key values.
+        */
+       HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs,
+                                                                &satisfies_hot, &satisfies_key,
+                                                                &oldtup, newtup);
+       if (satisfies_key)
+       {
+               *lockmode = LockTupleNoKeyExclusive;
+               mxact_status = MultiXactStatusNoKeyUpdate;
+               key_intact = true;
+
+               /*
+                * If this is the first possibly-multixact-able operation in the
+                * current transaction, set my per-backend OldestMemberMXactId setting.
+                * We can be certain that the transaction will never become a member of
+                * any older MultiXactIds than that.  (We have to do this even if we
+                * end up just using our own TransactionId below, since some other
+                * backend could incorporate our XID into a MultiXact immediately
+                * afterwards.)
+                */
+               MultiXactIdSetOldestMember();
+       }
+       else
+       {
+               *lockmode = LockTupleExclusive;
+               mxact_status = MultiXactStatusUpdate;
+               key_intact = false;
+       }
+
        /*
         * Note: beyond this point, use oldtup not otid to refer to old tuple.
         * otid may very well point at newtup->t_self, which we will overwrite
@@ -2829,8 +3028,13 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
         */
 
 l2:
+       checked_lockers = false;
+       locker_remains = false;
        result = HeapTupleSatisfiesUpdate(oldtup.t_data, cid, buffer);
 
+       /* see below about the "no wait" case */
+       Assert(result != HeapTupleBeingUpdated || wait);
+
        if (result == HeapTupleInvisible)
        {
                UnlockReleaseBuffer(buffer);
@@ -2838,11 +3042,26 @@ l2:
        }
        else if (result == HeapTupleBeingUpdated && wait)
        {
-               TransactionId xwait;
+               TransactionId   xwait;
                uint16          infomask;
+               bool            can_continue = false;
+
+               checked_lockers = true;
+
+               /*
+                * XXX note that we don't consider the "no wait" case here.  This
+                * isn't a problem currently because no caller uses that case, but it
+                * should be fixed if such a caller is introduced.  It wasn't a problem
+                * previously because this code would always wait, but now that some
+                * tuple locks do not conflict with one of the lock modes we use, it is
+                * possible that this case is interesting to handle specially.
+                *
+                * This may cause failures with third-party code that calls heap_update
+                * directly.
+                */
 
                /* must copy state data before unlocking buffer */
-               xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
+               xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);
                infomask = oldtup.t_data->t_infomask;
 
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
@@ -2858,20 +3077,29 @@ l2:
                 */
                if (!have_tuple_lock)
                {
-                       LockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+                       LockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
                        have_tuple_lock = true;
                }
 
                /*
-                * Sleep until concurrent transaction ends.  Note that we don't care
-                * if the locker has an exclusive or shared lock, because we need
-                * exclusive.
+                * Now we have to do something about the existing locker.  If it's a
+                * multi, sleep on it; we might be awakened before it is completely
+                * gone (or even not sleep at all in some cases); we need to preserve
+                * it as locker, unless it is gone completely.
+                *
+                * If it's not a multi, we need to check for sleeping conditions before
+                * actually going to sleep.  If the update doesn't conflict with the
+                * locks, we just continue without sleeping (but making sure it is
+                * preserved).
                 */
-
                if (infomask & HEAP_XMAX_IS_MULTI)
                {
+                       TransactionId   update_xact;
+                       int                             remain;
+
                        /* wait for multixact */
-                       MultiXactIdWait((MultiXactId) xwait);
+                       MultiXactIdWait((MultiXactId) xwait, mxact_status, &remain,
+                                                       infomask);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
                        /*
@@ -2880,49 +3108,87 @@ l2:
                         * change, and start over if so.
                         */
                        if (!(oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
-                               !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
+                               !TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),
                                                                         xwait))
                                goto l2;
 
                        /*
-                        * You might think the multixact is necessarily done here, but not
-                        * so: it could have surviving members, namely our own xact or
-                        * other subxacts of this backend.      It is legal for us to update
-                        * the tuple in either case, however (the latter case is
-                        * essentially a situation of upgrading our former shared lock to
-                        * exclusive).  We don't bother changing the on-disk hint bits
-                        * since we are about to overwrite the xmax altogether.
+                        * Note that the multixact may not be done by now.  It could have
+                        * surviving members; our own xact or other subxacts of this
+                        * backend, and also any other concurrent transaction that locked
+                        * the tuple with KeyShare if we only got TupleLockUpdate.  If this
+                        * is the case, we have to be careful to mark the updated tuple
+                        * with the surviving members in Xmax.
+                        *
+                        * Note that there could have been another update in the MultiXact.
+                        * In that case, we need to check whether it committed or aborted.
+                        * If it aborted we are safe to update it again; otherwise there is
+                        * an update conflict, and we have to return HeapTupleUpdated
+                        * below.
+                        *
+                        * In the LockTupleExclusive case, we still need to preserve the
+                        * surviving members: those would include the tuple locks we had
+                        * before this one, which are important to keep in case this
+                        * subxact aborts.
                         */
+                       update_xact = InvalidTransactionId;
+                       if (!HEAP_XMAX_IS_LOCKED_ONLY(oldtup.t_data->t_infomask))
+                               update_xact = HeapTupleGetUpdateXid(oldtup.t_data);
+
+                       /* there was no UPDATE in the MultiXact; or it aborted. */
+                       if (!TransactionIdIsValid(update_xact) ||
+                               TransactionIdDidAbort(update_xact))
+                               can_continue = true;
+
+                       locker_remains = remain != 0;
                }
                else
                {
-                       /* wait for regular transaction to end */
-                       XactLockTableWait(xwait);
-                       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
                        /*
-                        * xwait is done, but if xwait had just locked the tuple then some
-                        * other xact could update this tuple before we get to this point.
-                        * Check for xmax change, and start over if so.
+                        * If it's just a key-share locker, and we're not changing the
+                        * key columns, we don't need to wait for it to end; but we
+                        * need to preserve it as locker.
                         */
-                       if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
-                               !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
-                                                                        xwait))
-                               goto l2;
+                       if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask) && key_intact)
+                       {
+                               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-                       /* Otherwise check if it committed or aborted */
-                       UpdateXmaxHintBits(oldtup.t_data, buffer, xwait);
+                               /*
+                                * recheck the locker; if someone else changed the tuple while we
+                                * weren't looking, start over.
+                                */
+                               if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                                       !TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),
+                                                                                xwait))
+                                       goto l2;
+
+                               can_continue = true;
+                               locker_remains = true;
+                       }
+                       else
+                       {
+                               /* wait for regular transaction to end */
+                               XactLockTableWait(xwait);
+                               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+                               /*
+                                * xwait is done, but if xwait had just locked the tuple then some
+                                * other xact could update this tuple before we get to this point.
+                                * Check for xmax change, and start over if so.
+                                */
+                               if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                                       !TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),
+                                                                                xwait))
+                                       goto l2;
+
+                               /* Otherwise check if it committed or aborted */
+                               UpdateXmaxHintBits(oldtup.t_data, buffer, xwait);
+                               if (oldtup.t_data->t_infomask & HEAP_XMAX_INVALID)
+                                       can_continue = true;
+                       }
                }
 
-               /*
-                * We may overwrite if previous xmax aborted, or if it committed but
-                * only locked the tuple without updating it.
-                */
-               if (oldtup.t_data->t_infomask & (HEAP_XMAX_INVALID |
-                                                                                HEAP_IS_LOCKED))
-                       result = HeapTupleMayBeUpdated;
-               else
-                       result = HeapTupleUpdated;
+               result = can_continue ? HeapTupleMayBeUpdated : HeapTupleUpdated;
        }
 
        if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
@@ -2939,17 +3205,18 @@ l2:
                           result == HeapTupleBeingUpdated);
                Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
                hufd->ctid = oldtup.t_data->t_ctid;
-               hufd->xmax = HeapTupleHeaderGetXmax(oldtup.t_data);
+               hufd->xmax = HeapTupleHeaderGetUpdateXid(oldtup.t_data);
                if (result == HeapTupleSelfUpdated)
                        hufd->cmax = HeapTupleHeaderGetCmax(oldtup.t_data);
                else
                        hufd->cmax = 0;         /* for lack of an InvalidCommandId value */
                UnlockReleaseBuffer(buffer);
                if (have_tuple_lock)
-                       UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+                       UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
                if (vmbuffer != InvalidBuffer)
                        ReleaseBuffer(vmbuffer);
                bms_free(hot_attrs);
+               bms_free(key_attrs);
                return result;
        }
 
@@ -2958,7 +3225,7 @@ l2:
         * visible while we were busy locking the buffer, or during some
         * subsequent window during which we had it unlocked, we'll have to unlock
         * and re-lock, to avoid holding the buffer lock across an I/O.  That's a
-        * bit unfortunate, esepecially since we'll now have to recheck whether
+        * bit unfortunate, especially since we'll now have to recheck whether
         * the tuple has been locked or updated under us, but hopefully it won't
         * happen very often.
         */
@@ -2991,12 +3258,54 @@ l2:
                Assert(!(newtup->t_data->t_infomask & HEAP_HASOID));
        }
 
+       /*
+        * If the tuple we're updating is locked, we need to preserve the locking
+        * info in the old tuple's Xmax.  Prepare a new Xmax value for this.
+        */
+       compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
+                                                         oldtup.t_data->t_infomask,
+                                                         oldtup.t_data->t_infomask2,
+                                                         xid, *lockmode, true,
+                                                         &xmax_old_tuple, &infomask_old_tuple,
+                                                         &infomask2_old_tuple);
+
+       /* And also prepare an Xmax value for the new copy of the tuple */
+       if ((oldtup.t_data->t_infomask & HEAP_XMAX_INVALID) ||
+               (checked_lockers && !locker_remains))
+               xmax_new_tuple = InvalidTransactionId;
+       else
+               xmax_new_tuple = HeapTupleHeaderGetRawXmax(oldtup.t_data);
+
+       if (!TransactionIdIsValid(xmax_new_tuple))
+       {
+               infomask_new_tuple = HEAP_XMAX_INVALID;
+               infomask2_new_tuple = 0;
+       }
+       else
+       {
+               if (oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI)
+               {
+                       GetMultiXactIdHintBits(xmax_new_tuple, &infomask_new_tuple,
+                                                                  &infomask2_new_tuple);
+               }
+               else
+               {
+                       infomask_new_tuple = HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_LOCK_ONLY;
+                       infomask2_new_tuple = 0;
+               }
+       }
+
+       /*
+        * Prepare the new tuple with the appropriate initial values of Xmin and
+        * Xmax, as well as initial infomask bits as computed above.
+        */
        newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
        newtup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
-       newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);
        HeapTupleHeaderSetXmin(newtup->t_data, xid);
        HeapTupleHeaderSetCmin(newtup->t_data, cid);
-       HeapTupleHeaderSetXmax(newtup->t_data, 0);      /* for cleanliness */
+       newtup->t_data->t_infomask |= HEAP_UPDATED | infomask_new_tuple;
+       newtup->t_data->t_infomask2 |= infomask2_new_tuple;
+       HeapTupleHeaderSetXmax(newtup->t_data, xmax_new_tuple);
        newtup->t_tableOid = RelationGetRelid(relation);
 
        /*
@@ -3035,14 +3344,14 @@ l2:
        if (need_toast || newtupsize > pagefree)
        {
                /* Clear obsolete visibility flags ... */
-               oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
-                                                                          HEAP_XMAX_INVALID |
-                                                                          HEAP_XMAX_IS_MULTI |
-                                                                          HEAP_IS_LOCKED |
-                                                                          HEAP_MOVED);
+               oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+               oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
                HeapTupleClearHotUpdated(&oldtup);
                /* ... and store info about transaction updating this tuple */
-               HeapTupleHeaderSetXmax(oldtup.t_data, xid);
+               Assert(TransactionIdIsValid(xmax_old_tuple));
+               HeapTupleHeaderSetXmax(oldtup.t_data, xmax_old_tuple);
+               oldtup.t_data->t_infomask |= infomask_old_tuple;
+               oldtup.t_data->t_infomask2 |= infomask2_old_tuple;
                HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
                /* temporarily make it look not-updated */
                oldtup.t_data->t_ctid = oldtup.t_self;
@@ -3145,7 +3454,7 @@ l2:
                 * to do a HOT update.  Check if any of the index columns have been
                 * changed.  If not, then HOT update is possible.
                 */
-               if (HeapSatisfiesHOTUpdate(relation, hot_attrs, &oldtup, heaptup))
+               if (satisfies_hot)
                        use_hot_update = true;
        }
        else
@@ -3193,13 +3502,13 @@ l2:
        if (!already_marked)
        {
                /* Clear obsolete visibility flags ... */
-               oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
-                                                                          HEAP_XMAX_INVALID |
-                                                                          HEAP_XMAX_IS_MULTI |
-                                                                          HEAP_IS_LOCKED |
-                                                                          HEAP_MOVED);
+               oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+               oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
                /* ... and store info about transaction updating this tuple */
-               HeapTupleHeaderSetXmax(oldtup.t_data, xid);
+               Assert(TransactionIdIsValid(xmax_old_tuple));
+               HeapTupleHeaderSetXmax(oldtup.t_data, xmax_old_tuple);
+               oldtup.t_data->t_infomask |= infomask_old_tuple;
+               oldtup.t_data->t_infomask2 |= infomask2_old_tuple;
                HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
        }
 
@@ -3229,8 +3538,8 @@ l2:
        /* XLOG stuff */
        if (RelationNeedsWAL(relation))
        {
-               XLogRecPtr      recptr = log_heap_update(relation, buffer, oldtup.t_self,
-                                                                                        newbuf, heaptup,
+               XLogRecPtr      recptr = log_heap_update(relation, buffer,
+                                                                                        newbuf, &oldtup, heaptup,
                                                                                         all_visible_cleared,
                                                                                         all_visible_cleared_new);
 
@@ -3272,7 +3581,7 @@ l2:
         * Release the lmgr tuple lock, if we had it.
         */
        if (have_tuple_lock)
-               UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+               UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
 
        pgstat_count_heap_update(relation, use_hot_update);
 
@@ -3287,13 +3596,14 @@ l2:
        }
 
        bms_free(hot_attrs);
+       bms_free(key_attrs);
 
        return HeapTupleMayBeUpdated;
 }
 
 /*
  * Check if the specified attribute's value is same in both given tuples.
- * Subroutine for HeapSatisfiesHOTUpdate.
+ * Subroutine for HeapSatisfiesHOTandKeyUpdate.
  */
 static bool
 heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
@@ -3327,7 +3637,7 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
 
        /*
         * Extract the corresponding values.  XXX this is pretty inefficient if
-        * there are many indexed columns.      Should HeapSatisfiesHOTUpdate do a
+        * there are many indexed columns.      Should HeapSatisfiesHOTandKeyUpdate do a
         * single heap_deform_tuple call on each tuple, instead?  But that doesn't
         * work for system columns ...
         */
@@ -3370,35 +3680,101 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
 }
 
 /*
- * Check if the old and new tuples represent a HOT-safe update. To be able
- * to do a HOT update, we must not have changed any columns used in index
- * definitions.
+ * Check which columns are being updated.
+ *
+ * This simultaneously checks conditions for HOT updates and for FOR KEY
+ * SHARE updates.  Since much of the time they will be checking very similar
+ * sets of columns, and doing the same tests on them, it makes sense to
+ * optimize and do them together.
  *
- * The set of attributes to be checked is passed in (we dare not try to
- * compute it while holding exclusive buffer lock...)  NOTE that hot_attrs
- * is destructively modified!  That is OK since this is invoked at most once
- * by heap_update().
+ * We receive two bitmapsets comprising the two sets of columns we're
+ * interested in.  Note these are destructively modified; that is OK since
+ * this is invoked at most once in heap_update.
  *
- * Returns true if safe to do HOT update.
+ * hot_result is set to TRUE if it's okay to do a HOT update (i.e. it does not
+ * modified indexed columns); key_result is set to TRUE if the update does not
+ * modify columns used in the key.
  */
-static bool
-HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
-                                          HeapTuple oldtup, HeapTuple newtup)
+static void
+HeapSatisfiesHOTandKeyUpdate(Relation relation,
+                                                        Bitmapset *hot_attrs, Bitmapset *key_attrs,
+                                                        bool *satisfies_hot, bool *satisfies_key,
+                                                        HeapTuple oldtup, HeapTuple newtup)
 {
-       int                     attrnum;
+       int             next_hot_attnum;
+       int             next_key_attnum;
+       bool    hot_result = true;
+       bool    key_result = true;
+       bool    key_done = false;
+       bool    hot_done = false;
+
+       next_hot_attnum = bms_first_member(hot_attrs);
+       if (next_hot_attnum == -1)
+               hot_done = true;
+       else
+               /* Adjust for system attributes */
+               next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
 
-       while ((attrnum = bms_first_member(hot_attrs)) >= 0)
-       {
+       next_key_attnum = bms_first_member(key_attrs);
+       if (next_key_attnum == -1)
+               key_done = true;
+       else
                /* Adjust for system attributes */
-               attrnum += FirstLowInvalidHeapAttributeNumber;
+               next_key_attnum += FirstLowInvalidHeapAttributeNumber;
 
-               /* If the attribute value has changed, we can't do HOT update */
-               if (!heap_tuple_attr_equals(RelationGetDescr(relation), attrnum,
-                                                                       oldtup, newtup))
-                       return false;
+       for (;;)
+       {
+               int             check_now;
+               bool    changed;
+
+               /* both bitmapsets are now empty */
+               if (key_done && hot_done)
+                       break;
+
+               /* XXX there's probably an easier way ... */
+               if (hot_done)
+                       check_now = next_key_attnum;
+               if (key_done)
+                       check_now = next_hot_attnum;
+               else
+                       check_now = Min(next_hot_attnum, next_key_attnum);
+
+               changed = !heap_tuple_attr_equals(RelationGetDescr(relation),
+                                                                                 check_now, oldtup, newtup);
+               if (changed)
+               {
+                       if (check_now == next_hot_attnum)
+                               hot_result = false;
+                       if (check_now == next_key_attnum)
+                               key_result = false;
+               }
+
+               /* if both are false now, we can stop checking */
+               if (!hot_result && !key_result)
+                       break;
+
+               if (check_now == next_hot_attnum)
+               {
+                       next_hot_attnum = bms_first_member(hot_attrs);
+                       if (next_hot_attnum == -1)
+                               hot_done = true;
+                       else
+                               /* Adjust for system attributes */
+                               next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+               }
+               if (check_now == next_key_attnum)
+               {
+                       next_key_attnum = bms_first_member(key_attrs);
+                       if (next_key_attnum == -1)
+                               key_done = true;
+                       else
+                               /* Adjust for system attributes */
+                               next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+               }
        }
 
-       return true;
+       *satisfies_hot = hot_result;
+       *satisfies_key = key_result;
 }
 
 /*
@@ -3414,11 +3790,12 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 {
        HTSU_Result result;
        HeapUpdateFailureData hufd;
+       LockTupleMode lockmode;
 
        result = heap_update(relation, otid, tup,
                                                 GetCurrentCommandId(true), InvalidSnapshot,
                                                 true /* wait for commit */,
-                                                &hufd);
+                                                &hufd, &lockmode);
        switch (result)
        {
                case HeapTupleSelfUpdated:
@@ -3440,6 +3817,28 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
        }
 }
 
+
+/*
+ * Return the MultiXactStatus corresponding to the given tuple lock mode.
+ */
+static MultiXactStatus
+get_mxact_status_for_lock(LockTupleMode mode, bool is_update)
+{
+       MultiXactStatus         retval;
+
+       if (is_update)
+               retval = tupleLockExtraInfo[mode].updstatus;
+       else
+               retval = tupleLockExtraInfo[mode].lockstatus;
+
+       if (retval == -1)
+               elog(ERROR, "invalid lock tuple mode %d/%s", mode,
+                        is_update ? "true" : "false");
+
+       return retval;
+}
+
+
 /*
  *     heap_lock_tuple - lock a tuple in shared or exclusive mode
  *
@@ -3452,6 +3851,8 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
  *             tuple's cmax if lock is successful)
  *     mode: indicates if shared or exclusive tuple lock is desired
  *     nowait: if true, ereport rather than blocking if lock not available
+ *     follow_updates: if true, follow the update chain to also lock descendant
+ *             tuples.
  *
  * Output parameters:
  *     *tuple: all fields filled in
@@ -3464,61 +3865,30 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
  *     HeapTupleUpdated: lock failed because tuple updated by other xact
  *
  * In the failure cases, the routine fills *hufd with the tuple's t_ctid,
- * t_xmax, and t_cmax (the last only for HeapTupleSelfUpdated, since we
+ * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax
+ * (the last only for HeapTupleSelfUpdated, since we
  * cannot obtain cmax from a combocid generated by another transaction).
  * See comments for struct HeapUpdateFailureData for additional info.
  *
- *
- * NOTES: because the shared-memory lock table is of finite size, but users
- * could reasonably want to lock large numbers of tuples, we do not rely on
- * the standard lock manager to store tuple-level locks over the long term.
- * Instead, a tuple is marked as locked by setting the current transaction's
- * XID as its XMAX, and setting additional infomask bits to distinguish this
- * usage from the more normal case of having deleted the tuple.  When
- * multiple transactions concurrently share-lock a tuple, the first locker's
- * XID is replaced in XMAX with a MultiTransactionId representing the set of
- * XIDs currently holding share-locks.
- *
- * When it is necessary to wait for a tuple-level lock to be released, the
- * basic delay is provided by XactLockTableWait or MultiXactIdWait on the
- * contents of the tuple's XMAX.  However, that mechanism will release all
- * waiters concurrently, so there would be a race condition as to which
- * waiter gets the tuple, potentially leading to indefinite starvation of
- * some waiters.  The possibility of share-locking makes the problem much
- * worse --- a steady stream of share-lockers can easily block an exclusive
- * locker forever.     To provide more reliable semantics about who gets a
- * tuple-level lock first, we use the standard lock manager.  The protocol
- * for waiting for a tuple-level lock is really
- *             LockTuple()
- *             XactLockTableWait()
- *             mark tuple as locked by me
- *             UnlockTuple()
- * When there are multiple waiters, arbitration of who is to get the lock next
- * is provided by LockTuple(). However, at most one tuple-level lock will
- * be held or awaited per backend at any time, so we don't risk overflow
- * of the lock table.  Note that incoming share-lockers are required to
- * do LockTuple as well, if there is any conflict, to ensure that they don't
- * starve out waiting exclusive-lockers.  However, if there is not any active
- * conflict for a tuple, we don't incur any extra overhead.
+ * See README.tuplock for a thorough explanation of this mechanism.
  */
 HTSU_Result
 heap_lock_tuple(Relation relation, HeapTuple tuple,
                                CommandId cid, LockTupleMode mode, bool nowait,
+                               bool follow_updates,
                                Buffer *buffer, HeapUpdateFailureData *hufd)
 {
        HTSU_Result result;
        ItemPointer tid = &(tuple->t_self);
        ItemId          lp;
        Page            page;
-       TransactionId xid;
-       TransactionId xmax;
-       uint16          old_infomask;
-       uint16          new_infomask;
-       LOCKMODE        tuple_lock_type;
+       TransactionId xid,
+                               xmax;
+       uint16          old_infomask,
+                               new_infomask,
+                               new_infomask2;
        bool            have_tuple_lock = false;
 
-       tuple_lock_type = (mode == LockTupleShared) ? ShareLock : ExclusiveLock;
-
        *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
        LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
 
@@ -3542,30 +3912,58 @@ l3:
        {
                TransactionId xwait;
                uint16          infomask;
+               uint16          infomask2;
+               bool            require_sleep;
+               ItemPointerData t_ctid;
 
                /* must copy state data before unlocking buffer */
-               xwait = HeapTupleHeaderGetXmax(tuple->t_data);
+               xwait = HeapTupleHeaderGetRawXmax(tuple->t_data);
                infomask = tuple->t_data->t_infomask;
+               infomask2 = tuple->t_data->t_infomask2;
+               ItemPointerCopy(&tuple->t_data->t_ctid, &t_ctid);
 
                LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
 
                /*
-                * If we wish to acquire share lock, and the tuple is already
-                * share-locked by a multixact that includes any subtransaction of the
-                * current top transaction, then we effectively hold the desired lock
-                * already.  We *must* succeed without trying to take the tuple lock,
-                * else we will deadlock against anyone waiting to acquire exclusive
-                * lock.  We don't need to make any state changes in this case.
+                * If any subtransaction of the current top transaction already holds a
+                * lock as strong or stronger than what we're requesting, we
+                * effectively hold the desired lock already.  We *must* succeed
+                * without trying to take the tuple lock, else we will deadlock against
+                * anyone wanting to acquire a stronger lock.
                 */
-               if (mode == LockTupleShared &&
-                       (infomask & HEAP_XMAX_IS_MULTI) &&
-                       MultiXactIdIsCurrent((MultiXactId) xwait))
+               if (infomask & HEAP_XMAX_IS_MULTI)
                {
-                       Assert(infomask & HEAP_XMAX_SHARED_LOCK);
-                       /* Probably can't hold tuple lock here, but may as well check */
-                       if (have_tuple_lock)
-                               UnlockTuple(relation, tid, tuple_lock_type);
-                       return HeapTupleMayBeUpdated;
+                       int             i;
+                       int             nmembers;
+                       MultiXactMember *members;
+
+                       /*
+                        * We don't need to allow old multixacts here; if that had been the
+                        * case, HeapTupleSatisfiesUpdate would have returned MayBeUpdated
+                        * and we wouldn't be here.
+                        */
+                       nmembers = GetMultiXactIdMembers(xwait, &members, false);
+
+                       for (i = 0; i < nmembers; i++)
+                       {
+                               if (TransactionIdIsCurrentTransactionId(members[i].xid))
+                               {
+                                       LockTupleMode   membermode;
+
+                                       membermode = TUPLOCK_from_mxstatus(members[i].status);
+
+                                       if (membermode >= mode)
+                                       {
+                                               if (have_tuple_lock)
+                                                       UnlockTupleTuplock(relation, tid, mode);
+
+                                               pfree(members);
+                                               return HeapTupleMayBeUpdated;
+                                       }
+                               }
+                       }
+
+                       pfree(members);
                }
 
                /*
@@ -3581,255 +3979,435 @@ l3:
                {
                        if (nowait)
                        {
-                               if (!ConditionalLockTuple(relation, tid, tuple_lock_type))
+                               if (!ConditionalLockTupleTuplock(relation, tid, mode))
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                                       errmsg("could not obtain lock on row in relation \"%s\"",
-                                                  RelationGetRelationName(relation))));
+                                                        errmsg("could not obtain lock on row in relation \"%s\"",
+                                                                       RelationGetRelationName(relation))));
                        }
                        else
-                               LockTuple(relation, tid, tuple_lock_type);
+                               LockTupleTuplock(relation, tid, mode);
                        have_tuple_lock = true;
                }
 
-               if (mode == LockTupleShared && (infomask & HEAP_XMAX_SHARED_LOCK))
+               /*
+                * Initially assume that we will have to wait for the locking
+                * transaction(s) to finish.  We check various cases below in which
+                * this can be turned off.
+                */
+               require_sleep = true;
+               if (mode == LockTupleKeyShare)
                {
                        /*
-                        * Acquiring sharelock when there's at least one sharelocker
-                        * already.  We need not wait for him/them to complete.
-                        */
-                       LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
-
-                       /*
-                        * Make sure it's still a shared lock, else start over.  (It's OK
-                        * if the ownership of the shared lock has changed, though.)
+                        * If we're requesting KeyShare, and there's no update present, we
+                        * don't need to wait.  Even if there is an update, we can still
+                        * continue if the key hasn't been modified.
+                        *
+                        * However, if there are updates, we need to walk the update chain
+                        * to mark future versions of the row as locked, too.  That way, if
+                        * somebody deletes that future version, we're protected against
+                        * the key going away.  This locking of future versions could block
+                        * momentarily, if a concurrent transaction is deleting a key; or
+                        * it could return a value to the effect that the transaction
+                        * deleting the key has already committed.  So we do this before
+                        * re-locking the buffer; otherwise this would be prone to
+                        * deadlocks.
+                        *
+                        * Note that the TID we're locking was grabbed before we unlocked
+                        * the buffer.  For it to change while we're not looking, the other
+                        * properties we're testing for below after re-locking the buffer
+                        * would also change, in which case we would restart this loop
+                        * above.
                         */
-                       if (!(tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK))
-                               goto l3;
-               }
-               else if (infomask & HEAP_XMAX_IS_MULTI)
-               {
-                       /* wait for multixact to end */
-                       if (nowait)
+                       if (!(infomask2 & HEAP_KEYS_UPDATED))
                        {
-                               if (!ConditionalMultiXactIdWait((MultiXactId) xwait))
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                                       errmsg("could not obtain lock on row in relation \"%s\"",
-                                                  RelationGetRelationName(relation))));
-                       }
-                       else
-                               MultiXactIdWait((MultiXactId) xwait);
+                               bool    updated;
 
-                       LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+                               updated = !HEAP_XMAX_IS_LOCKED_ONLY(infomask);
 
-                       /*
-                        * If xwait had just locked the tuple then some other xact could
-                        * update this tuple before we get to this point. Check for xmax
-                        * change, and start over if so.
-                        */
-                       if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
-                               !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
-                                                                        xwait))
-                               goto l3;
+                               /*
+                                * If there are updates, follow the update chain; bail out
+                                * if that cannot be done.
+                                */
+                               if (follow_updates && updated)
+                               {
+                                       HTSU_Result             res;
+
+                                       res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
+                                                                                                 GetCurrentTransactionId(),
+                                                                                                 mode);
+                                       if (res != HeapTupleMayBeUpdated)
+                                       {
+                                               result = res;
+                                               /* recovery code expects to have buffer lock held */
+                                               LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+                                               goto failed;
+                                       }
+                               }
 
-                       /*
-                        * You might think the multixact is necessarily done here, but not
-                        * so: it could have surviving members, namely our own xact or
-                        * other subxacts of this backend.      It is legal for us to lock the
-                        * tuple in either case, however.  We don't bother changing the
-                        * on-disk hint bits since we are about to overwrite the xmax
-                        * altogether.
-                        */
+                               LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+
+                               /*
+                                * Make sure it's still an appropriate lock, else start over.
+                                * Also, if it wasn't updated before we released the lock, but
+                                * is updated now, we start over too; the reason is that we now
+                                * need to follow the update chain to lock the new versions.
+                                */
+                               if (!HeapTupleHeaderIsOnlyLocked(tuple->t_data) &&
+                                       ((tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED) ||
+                                        !updated))
+                                       goto l3;
+
+                               /* Things look okay, so we can skip sleeping */
+                               require_sleep = false;
+
+                               /*
+                                * Note we allow Xmax to change here; other updaters/lockers
+                                * could have modified it before we grabbed the buffer lock.
+                                * However, this is not a problem, because with the recheck we
+                                * just did we ensure that they still don't conflict with the
+                                * lock we want.
+                                */
+                       }
                }
-               else
+               else if (mode == LockTupleShare)
                {
-                       /* wait for regular transaction to end */
-                       if (nowait)
+                       /*
+                        * If we're requesting Share, we can similarly avoid sleeping if
+                        * there's no update and no exclusive lock present.
+                        */
+                       if (HEAP_XMAX_IS_LOCKED_ONLY(infomask) &&
+                               !HEAP_XMAX_IS_EXCL_LOCKED(infomask))
                        {
-                               if (!ConditionalXactLockTableWait(xwait))
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
-                                       errmsg("could not obtain lock on row in relation \"%s\"",
-                                                  RelationGetRelationName(relation))));
-                       }
-                       else
-                               XactLockTableWait(xwait);
-
-                       LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+                               LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
 
+                               /*
+                                * Make sure it's still an appropriate lock, else start over.
+                                * See above about allowing xmax to change.
+                                */
+                               if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) ||
+                                       HEAP_XMAX_IS_EXCL_LOCKED(tuple->t_data->t_infomask))
+                                       goto l3;
+                               require_sleep = false;
+                       }
+               }
+               else if (mode == LockTupleNoKeyExclusive)
+               {
                        /*
-                        * xwait is done, but if xwait had just locked the tuple then some
-                        * other xact could update this tuple before we get to this point.
-                        * Check for xmax change, and start over if so.
+                        * If we're requesting NoKeyExclusive, we might also be able to
+                        * avoid sleeping; just ensure that there's no other lock type than
+                        * KeyShare.  Note that this is a bit more involved than just
+                        * checking hint bits -- we need to expand the multixact to figure
+                        * out lock modes for each one (unless there was only one such
+                        * locker).
                         */
-                       if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
-                               !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
-                                                                        xwait))
-                               goto l3;
+                       if (infomask & HEAP_XMAX_IS_MULTI)
+                       {
+                               int             nmembers;
+                               MultiXactMember *members;
 
-                       /* Otherwise check if it committed or aborted */
-                       UpdateXmaxHintBits(tuple->t_data, *buffer, xwait);
+                               /*
+                                * We don't need to allow old multixacts here; if that had been
+                                * the case, HeapTupleSatisfiesUpdate would have returned
+                                * MayBeUpdated and we wouldn't be here.
+                                */
+                               nmembers = GetMultiXactIdMembers(xwait, &members, false);
+
+                               if (nmembers <= 0)
+                               {
+                                       /*
+                                        * No need to keep the previous xmax here. This is unlikely
+                                        * to happen.
+                                        */
+                                       require_sleep = false;
+                               }
+                               else
+                               {
+                                       int             i;
+                                       bool    allowed = true;
+
+                                       for (i = 0; i < nmembers; i++)
+                                       {
+                                               if (members[i].status != MultiXactStatusForKeyShare)
+                                               {
+                                                       allowed = false;
+                                                       break;
+                                               }
+                                       }
+                                       if (allowed)
+                                       {
+                                               /*
+                                                * if the xmax changed under us in the meantime, start
+                                                * over.
+                                                */
+                                               LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+                                               if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                                                       !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
+                                                                                                xwait))
+                                               {
+                                                       pfree(members);
+                                                       goto l3;
+                                               }
+                                               /* otherwise, we're good */
+                                               require_sleep = false;
+                                       }
+
+                                       pfree(members);
+                               }
+                       }
+                       else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
+                       {
+                               LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+
+                               /* if the xmax changed in the meantime, start over */
+                               if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                                       !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
+                                                                                xwait))
+                                       goto l3;
+                               /* otherwise, we're good */
+                               require_sleep = false;
+                       }
                }
 
                /*
-                * We may lock if previous xmax aborted, or if it committed but only
-                * locked the tuple without updating it.  The case where we didn't
-                * wait because we are joining an existing shared lock is correctly
-                * handled, too.
+                * By here, we either have already acquired the buffer exclusive lock,
+                * or we must wait for the locking transaction or multixact; so below
+                * we ensure that we grab buffer lock after the sleep.
                 */
-               if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
-                                                                                HEAP_IS_LOCKED))
-                       result = HeapTupleMayBeUpdated;
-               else
-                       result = HeapTupleUpdated;
-       }
 
-       if (result != HeapTupleMayBeUpdated)
-       {
-               Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
-               Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
-               hufd->ctid = tuple->t_data->t_ctid;
-               hufd->xmax = HeapTupleHeaderGetXmax(tuple->t_data);
-               if (result == HeapTupleSelfUpdated)
-                       hufd->cmax = HeapTupleHeaderGetCmax(tuple->t_data);
-               else
-                       hufd->cmax = 0;         /* for lack of an InvalidCommandId value */
-               LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-               if (have_tuple_lock)
-                       UnlockTuple(relation, tid, tuple_lock_type);
-               return result;
-       }
+               if (require_sleep)
+               {
+                       if (infomask & HEAP_XMAX_IS_MULTI)
+                       {
+                               MultiXactStatus status = get_mxact_status_for_lock(mode, false);
 
-       /*
-        * We might already hold the desired lock (or stronger), possibly under a
-        * different subtransaction of the current top transaction.  If so, there
-        * is no need to change state or issue a WAL record.  We already handled
-        * the case where this is true for xmax being a MultiXactId, so now check
-        * for cases where it is a plain TransactionId.
-        *
-        * Note in particular that this covers the case where we already hold
-        * exclusive lock on the tuple and the caller only wants shared lock. It
-        * would certainly not do to give up the exclusive lock.
-        */
-       xmax = HeapTupleHeaderGetXmax(tuple->t_data);
-       old_infomask = tuple->t_data->t_infomask;
-
-       if (!(old_infomask & (HEAP_XMAX_INVALID |
-                                                 HEAP_XMAX_COMMITTED |
-                                                 HEAP_XMAX_IS_MULTI)) &&
-               (mode == LockTupleShared ?
-                (old_infomask & HEAP_IS_LOCKED) :
-                (old_infomask & HEAP_XMAX_EXCL_LOCK)) &&
-               TransactionIdIsCurrentTransactionId(xmax))
-       {
-               LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-               /* Probably can't hold tuple lock here, but may as well check */
-               if (have_tuple_lock)
-                       UnlockTuple(relation, tid, tuple_lock_type);
-               return HeapTupleMayBeUpdated;
-       }
+                               /* We only ever lock tuples, never update them */
+                               if (status >= MultiXactStatusNoKeyUpdate)
+                                       elog(ERROR, "invalid lock mode in heap_lock_tuple");
 
-       /*
-        * Compute the new xmax and infomask to store into the tuple.  Note we do
-        * not modify the tuple just yet, because that would leave it in the wrong
-        * state if multixact.c elogs.
-        */
-       xid = GetCurrentTransactionId();
-
-       new_infomask = old_infomask & ~(HEAP_XMAX_COMMITTED |
-                                                                       HEAP_XMAX_INVALID |
-                                                                       HEAP_XMAX_IS_MULTI |
-                                                                       HEAP_IS_LOCKED |
-                                                                       HEAP_MOVED);
+                               /* wait for multixact to end */
+                               if (nowait)
+                               {
+                                       if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
+                                                                                                       status, NULL, infomask))
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                                                errmsg("could not obtain lock on row in relation \"%s\"",
+                                                                               RelationGetRelationName(relation))));
+                               }
+                               else
+                                       MultiXactIdWait((MultiXactId) xwait, status, NULL, infomask);
 
-       if (mode == LockTupleShared)
-       {
-               /*
-                * If this is the first acquisition of a shared lock in the current
-                * transaction, set my per-backend OldestMemberMXactId setting. We can
-                * be certain that the transaction will never become a member of any
-                * older MultiXactIds than that.  (We have to do this even if we end
-                * up just using our own TransactionId below, since some other backend
-                * could incorporate our XID into a MultiXact immediately afterwards.)
-                */
-               MultiXactIdSetOldestMember();
+                               /* if there are updates, follow the update chain */
+                               if (follow_updates &&
+                                       !HEAP_XMAX_IS_LOCKED_ONLY(infomask))
+                               {
+                                       HTSU_Result             res;
+
+                                       res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
+                                                                                                 GetCurrentTransactionId(),
+                                                                                                 mode);
+                                       if (res != HeapTupleMayBeUpdated)
+                                       {
+                                               result = res;
+                                               /* recovery code expects to have buffer lock held */
+                                               LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+                                               goto failed;
+                                       }
+                               }
 
-               new_infomask |= HEAP_XMAX_SHARED_LOCK;
+                               LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
 
-               /*
-                * Check to see if we need a MultiXactId because there are multiple
-                * lockers.
-                *
-                * HeapTupleSatisfiesUpdate will have set the HEAP_XMAX_INVALID bit if
-                * the xmax was a MultiXactId but it was not running anymore. There is
-                * a race condition, which is that the MultiXactId may have finished
-                * since then, but that uncommon case is handled within
-                * MultiXactIdExpand.
-                *
-                * There is a similar race condition possible when the old xmax was a
-                * regular TransactionId.  We test TransactionIdIsInProgress again
-                * just to narrow the window, but it's still possible to end up
-                * creating an unnecessary MultiXactId.  Fortunately this is harmless.
-                */
-               if (!(old_infomask & (HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED)))
-               {
-                       if (old_infomask & HEAP_XMAX_IS_MULTI)
-                       {
                                /*
-                                * If the XMAX is already a MultiXactId, then we need to
-                                * expand it to include our own TransactionId.
+                                * If xwait had just locked the tuple then some other xact
+                                * could update this tuple before we get to this point. Check
+                                * for xmax change, and start over if so.
                                 */
-                               xid = MultiXactIdExpand((MultiXactId) xmax, xid);
-                               new_infomask |= HEAP_XMAX_IS_MULTI;
-                       }
-                       else if (TransactionIdIsInProgress(xmax))
-                       {
+                               if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                                       !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data),
+                                                                                xwait))
+                                       goto l3;
+
                                /*
-                                * If the XMAX is a valid TransactionId, then we need to
-                                * create a new MultiXactId that includes both the old locker
-                                * and our own TransactionId.
+                                * Of course, the multixact might not be done here: if we're
+                                * requesting a light lock mode, other transactions with light
+                                * locks could still be alive, as well as locks owned by our
+                                * own xact or other subxacts of this backend.  We need to
+                                * preserve the surviving MultiXact members.  Note that it
+                                * isn't absolutely necessary in the latter case, but doing so
+                                * is simpler.
                                 */
-                               xid = MultiXactIdCreate(xmax, xid);
-                               new_infomask |= HEAP_XMAX_IS_MULTI;
                        }
                        else
                        {
+                               /* wait for regular transaction to end */
+                               if (nowait)
+                               {
+                                       if (!ConditionalXactLockTableWait(xwait))
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                                                                errmsg("could not obtain lock on row in relation \"%s\"",
+                                                                               RelationGetRelationName(relation))));
+                               }
+                               else
+                                       XactLockTableWait(xwait);
+
+                               /* if there are updates, follow the update chain */
+                               if (follow_updates &&
+                                       !HEAP_XMAX_IS_LOCKED_ONLY(infomask))
+                               {
+                                       HTSU_Result             res;
+
+                                       res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
+                                                                                                 GetCurrentTransactionId(),
+                                                                                                 mode);
+                                       if (res != HeapTupleMayBeUpdated)
+                                       {
+                                               result = res;
+                                               /* recovery code expects to have buffer lock held */
+                                               LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+                                               goto failed;
+                                       }
+                               }