--- /dev/null
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test CREATE INDEX CONCURRENTLY with concurrent prepared-xact modifications
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+
+use Test::More tests => 6;
+
+my ($node, $result);    # node under test; scratch variable for check results
+
+#
+# Test set-up: one node with prepared transactions enabled, the amcheck
+# extension installed, and an empty one-column table for the index builds.
+$node = get_new_node('CIC_2PC_test');
+$node->init;
+$node->append_conf('postgresql.conf', 'max_prepared_transactions = 10');    # the test relies on PREPARE TRANSACTION
+$node->append_conf('postgresql.conf', 'lock_timeout = 180000');    # presumably milliseconds, i.e. the same 180 s as the IPC::Run timers below
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));    # needed for bt_index_check() in the checks below
+$node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
+
+
+#
+# Run 3 overlapping 2PC transactions with CIC
+#
+# We have two concurrent background psql processes: $main_h for INSERTs and
+# $cic_h for CIC. Also, we use non-background psql for some COMMIT PREPARED
+# statements.
+# Each \echo marker lets this script wait until psql has reached that point.
+
+my $main_in = '';
+my $main_out = '';
+my $main_timer = IPC::Run::timeout(180);    # seconds
+
+my $main_h =
+ $node->background_psql('postgres', \$main_in, \$main_out,
+ $main_timer, on_error_stop => 1);
+$main_in .= q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint1
+);
+pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired;    # first transaction is now open with its INSERT done
+
+my $cic_in = '';
+my $cic_out = '';
+my $cic_timer = IPC::Run::timeout(180);    # seconds
+my $cic_h =
+ $node->background_psql('postgres', \$cic_in, \$cic_out,
+ $cic_timer, on_error_stop => 1);
+$cic_in .= q(
+\echo start
+CREATE INDEX CONCURRENTLY idx ON tbl(i);
+);
+pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired;    # only waits for the marker; CIC itself is not awaited here
+
+$main_in .= q(
+PREPARE TRANSACTION 'a';
+);    # queued only; it is sent with the next pump of $main_h
+
+$main_in .= q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint2
+);
+pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired;    # 'a' is prepared and the second transaction is open
+
+$node->safe_psql('postgres', q(COMMIT PREPARED 'a';));    # committed from a separate, non-background psql
+
+$main_in .= q(
+PREPARE TRANSACTION 'b';
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint3
+);
+pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired;    # 'b' is prepared and the third transaction is open
+
+$node->safe_psql('postgres', q(COMMIT PREPARED 'b';));    # again from a separate psql
+
+$main_in .= q(
+PREPARE TRANSACTION 'c';
+COMMIT PREPARED 'c';
+);
+$main_h->pump_nb;    # non-blocking pump to push the queued input
+
+$main_h->finish;    # wait for the INSERT session to end
+$cic_h->finish;    # wait for the CIC session to end
+
+$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));    # scalar context; presumably psql's exit status
+is($result, '0', 'bt_index_check after overlapping 2PC');
+
+
+# Server restart shall not change whether prepared xact blocks CIC.
+# Two transactions are prepared before the restart; only 'spans_restart'
+# (which inserted into tbl) is committed afterwards, once the reindex script runs.
+
+$node->safe_psql(
+ 'postgres', q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+PREPARE TRANSACTION 'spans_restart';
+BEGIN;
+CREATE TABLE unused ();
+PREPARE TRANSACTION 'persists_forever';
+));    # 'persists_forever' is never committed in this script; it touches only table "unused"
+$node->restart;    # both prepared transactions are still pending at this point
+
+my $reindex_in = '';
+my $reindex_out = '';
+my $reindex_timer = IPC::Run::timeout(180);    # seconds
+my $reindex_h =
+ $node->background_psql('postgres', \$reindex_in, \$reindex_out,
+ $reindex_timer, on_error_stop => 1);
+$reindex_in .= q(
+\echo start
+DROP INDEX CONCURRENTLY idx;
+CREATE INDEX CONCURRENTLY idx ON tbl(i);
+);
+pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired;    # only waits for the marker, not for the DDL
+
+$node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'");    # presumably what lets the concurrent DDL proceed
+$reindex_h->finish;    # wait for the DROP/CREATE INDEX session to end
+$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));    # scalar context; presumably psql's exit status
+is($result, '0', 'bt_index_check after 2PC and restart');
+
+
+#
+# Stress CIC+2PC with pgbench: a background pgbench repeatedly rebuilds and
+# checks the index while a foreground pgbench runs concurrent 2PC INSERTs.
+
+# Fix broken index first
+$node->safe_psql('postgres', q(REINDEX TABLE tbl;));
+
+# Run background pgbench with CIC. We cannot mix this script into the single
+# foreground pgbench run: CIC will deadlock with itself occasionally.
+my $pgbench_out = '';
+my $pgbench_timer = IPC::Run::timeout(180);    # seconds
+my $pgbench_h = $node->background_pgbench(
+ '--no-vacuum --client=1 --transactions=100',    # one client, so CIC never runs concurrently with itself
+ {
+ '002_pgbench_concurrent_cic' => q(
+ DROP INDEX CONCURRENTLY idx;
+ CREATE INDEX CONCURRENTLY idx ON tbl(i);
+ SELECT bt_index_check('idx',true);
+ )
+ },
+ \$pgbench_out,
+ $pgbench_timer);
+
+# Run foreground pgbench with two 2PC INSERT scripts (plain and with a savepoint).
+$node->pgbench(
+ '--no-vacuum --client=5 --transactions=100',
+ 0,    # presumably the expected exit status
+ [qr{actually processed}],    # presumably patterns expected on stdout
+ [qr{^$}],    # presumably patterns expected on stderr (i.e. empty)
+ 'concurrent INSERTs w/ 2PC',
+ {
+ '002_pgbench_concurrent_2pc' => q(
+ BEGIN;
+ INSERT INTO tbl VALUES(0);
+ PREPARE TRANSACTION 'c:client_id';
+ COMMIT PREPARED 'c:client_id';
+ ),
+ '002_pgbench_concurrent_2pc_savepoint' => q(
+ BEGIN;
+ SAVEPOINT s1;
+ INSERT INTO tbl VALUES(0);
+ PREPARE TRANSACTION 'c:client_id';
+ COMMIT PREPARED 'c:client_id';
+ )
+ });
+
+$pgbench_h->pump_nb;    # non-blocking pump of the background pgbench
+$pgbench_h->finish();    # wait for the background pgbench to end
+$result =    # result code of the background pgbench; fetched differently on Windows
+ ($Config{osname} eq "MSWin32")
+ ? ($pgbench_h->full_results)[0]
+ : $pgbench_h->result(0);
+is($result, 0, "pgbench with CIC works");
+
+# done
+$node->stop;
+done_testing();
proc->pgprocno = gxact->pgprocno;
SHMQueueElemInit(&(proc->links));
proc->waitStatus = STATUS_OK;
- /* We set up the gxact's VXID as InvalidBackendId/XID */
- proc->lxid = (LocalTransactionId) xid;
+ if (LocalTransactionIdIsValid(MyProc->lxid))
+ {
+ /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
+ proc->lxid = MyProc->lxid;
+ proc->backendId = MyBackendId;
+ }
+ else
+ {
+ Assert(AmStartupProcess() || !IsPostmasterEnvironment);
+ /* GetLockConflicts() uses this to specify a wait on the XID */
+ proc->lxid = xid;
+ proc->backendId = InvalidBackendId;
+ }
pgxact->xid = xid;
pgxact->xmin = InvalidTransactionId;
pgxact->delayChkpt = false;
pgxact->vacuumFlags = 0;
proc->pid = 0;
- proc->backendId = InvalidBackendId;
proc->databaseId = databaseid;
proc->roleId = owner;
proc->tempNamespaceId = InvalidOid;
return result;
}
+/*
+ * TwoPhaseGetXidByVirtualXID
+ *		Find the XID of a prepared transaction whose dummy PGPROC carries
+ *		the given VXID.
+ *
+ * The valid entries of TwoPhaseState are scanned while holding
+ * TwoPhaseStateLock in shared mode.  The XID of the first matching entry is
+ * returned, or InvalidTransactionId if no entry matches.  If a second
+ * matching entry is seen, the scan stops and *have_more is set to true;
+ * otherwise *have_more is left untouched, so the caller must initialize it.
+ *
+ * Only xacts prepared since the last startup can match; recovered xacts are
+ * not found.  Witnessing more than one match requires a single BackendId to
+ * consume 2^32 LXIDs with no intervening database restart.
+ */
+TransactionId
+TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
+						   bool *have_more)
+{
+	TransactionId found = InvalidTransactionId;
+	int			slot;
+
+	Assert(VirtualTransactionIdIsValid(vxid));
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+	for (slot = 0; slot < TwoPhaseState->numPrepXacts; slot++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[slot];
+		PGPROC	   *dummy;
+		VirtualTransactionId candidate;
+
+		if (gxact->valid)
+		{
+			dummy = &ProcGlobal->allProcs[gxact->pgprocno];
+			GET_VXID_FROM_PGPROC(candidate, *dummy);
+
+			if (VirtualTransactionIdEquals(vxid, candidate))
+			{
+				/*
+				 * The startup process gives its dummy procs
+				 * InvalidBackendId, so a match cannot be an in-redo entry.
+				 */
+				Assert(!gxact->inredo);
+
+				if (found == InvalidTransactionId)
+					found = gxact->xid; /* first match: remember it */
+				else
+				{
+					/* second match: report it and stop looking */
+					*have_more = true;
+					break;
+				}
+			}
+		}
+	}
+
+	LWLockRelease(TwoPhaseStateLock);
+
+	return found;
+}
+
/*
* TwoPhaseGetDummyBackendId
* Get the dummy backend ID for prepared transaction specified by XID
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
+ /*
+ * Transfer our locks to a dummy PGPROC. This has to be done before
+ * ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would
+ * conclude "xact already committed or aborted" for our locks.
+ */
+ PostPrepare_Locks(xid);
+
/*
* Let others know about no transaction in progress by me. This has to be
* done *after* the prepared transaction has been marked valid, else
PostPrepare_MultiXact(xid);
- PostPrepare_Locks(xid);
PostPrepare_PredicateLocks(xid);
ResourceOwnerRelease(TopTransactionResourceOwner,
* To do this, obtain the current list of lockers, and wait on their VXIDs
* until they are finished.
*
- * Note we don't try to acquire the locks on the given locktags, only the VXIDs
- * of its lock holders; if somebody grabs a conflicting lock on the objects
- * after we obtained our initial list of lockers, we will not wait for them.
+ * Note we don't try to acquire the locks on the given locktags, only the
+ * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
+ * on the objects after we obtained our initial list of lockers, we will not
+ * wait for them.
*/
void
WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
* The result array is palloc'd and is terminated with an invalid VXID.
* *countp, if not null, is updated to the number of items set.
*
- * Of course, the result could be out of date by the time it's returned,
- * so use of this function has to be thought about carefully.
+ * Of course, the result could be out of date by the time it's returned, so
+ * use of this function has to be thought about carefully. Similarly, a
+ * PGPROC with no "lxid" will be considered non-conflicting regardless of any
+ * lock it holds. Existing callers don't care about a locker after that
+ * locker's pg_xact updates complete. CommitTransaction() clears "lxid" after
+ * pg_xact updates and before releasing locks.
*
* Note we never include the current xact's vxid in the result array,
* since an xact never blocks itself.
}
}
+/*
+ * XactLockForVirtualXact
+ *
+ * Check for, or wait for, completion of the XID(s) associated with "vxid".
+ *
+ * With a valid "xid", this behaves essentially like XactLockTableWait(xid,
+ * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid), except that
+ * it assumes "xid" is never a subtransaction and that "xid" is prepared,
+ * committed, or aborted.
+ *
+ * With an invalid "xid", this locks every prepared XID that was known as
+ * "vxid" before its PREPARE TRANSACTION, looking them up one at a time with
+ * TwoPhaseGetXidByVirtualXID().
+ *
+ * Returns false only when "wait" is false and a transaction lock could not
+ * be acquired immediately; returns true otherwise.
+ */
+static bool
+XactLockForVirtualXact(VirtualTransactionId vxid,
+					   TransactionId xid, bool wait)
+{
+	bool		more = false;
+
+	/* Without 2PC enabled there can be no prepared xact to wait for. */
+	if (max_prepared_xacts == 0)
+		return true;
+
+	for (;;)
+	{
+		LOCKTAG		tag;
+
+		/* Lacking an XID, search the prepared xacts for one. */
+		if (!TransactionIdIsValid(xid))
+		{
+			xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
+			if (!TransactionIdIsValid(xid))
+			{
+				Assert(!more);
+				return true;
+			}
+		}
+
+		/* Check or wait for completion of this XID. */
+		SET_LOCKTAG_TRANSACTION(tag, xid);
+		if (LockAcquire(&tag, ShareLock, false, !wait) ==
+			LOCKACQUIRE_NOT_AVAIL)
+			return false;
+		LockRelease(&tag, ShareLock, false);
+
+		if (!more)
+			return true;
+
+		/* Another prepared xact matched; forget this XID and search again. */
+		xid = InvalidTransactionId;
+		more = false;
+	}
+}
+
/*
* VirtualXactLock
*
- * If wait = true, wait until the given VXID has been released, and then
- * return true.
+ * If wait = true, wait as long as the given VXID or any XID acquired by the
+ * same transaction is still running. Then, return true.
*
- * If wait = false, just check whether the VXID is still running, and return
- * true or false.
+ * If wait = false, just check whether that VXID or one of those XIDs is still
+ * running, and return true or false.
*/
bool
VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
LOCKTAG tag;
PGPROC *proc;
+ TransactionId xid = InvalidTransactionId;
Assert(VirtualTransactionIdIsValid(vxid));
- if (VirtualTransactionIdIsPreparedXact(vxid))
- {
- LockAcquireResult lar;
-
- /*
- * Prepared transactions don't hold vxid locks. The
- * LocalTransactionId is always a normal, locked XID.
- */
- SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId);
- lar = LockAcquire(&tag, ShareLock, false, !wait);
- if (lar != LOCKACQUIRE_NOT_AVAIL)
- LockRelease(&tag, ShareLock, false);
- return lar != LOCKACQUIRE_NOT_AVAIL;
- }
+ if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
+ /* no vxid lock; localTransactionId is a normal, locked XID */
+ return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
*/
proc = BackendIdGetProc(vxid.backendId);
if (proc == NULL)
- return true;
+ return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
/*
* We must acquire this lock before checking the backendId and lxid
*/
LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
- /* If the transaction has ended, our work here is done. */
if (proc->backendId != vxid.backendId
|| proc->fpLocalTransactionId != vxid.localTransactionId)
{
+ /* VXID ended */
LWLockRelease(&proc->backendLock);
- return true;
+ return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
}
/*
proc->fpVXIDLock = false;
}
+ /*
+ * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
+ * search. The proc might have assigned this XID but not yet locked it,
+ * in which case the proc will lock this XID before releasing the VXID.
+ * The backendLock critical section excludes VirtualXactLockTableCleanup(),
+ * so we won't save an XID of a different VXID. It doesn't matter whether
+ * we save this before or after setting up the primary lock table entry.
+ */
+ xid = ProcGlobal->allPgXact[proc->pgprocno].xid;
+
/* Done with proc->fpLockBits */
LWLockRelease(&proc->backendLock);
(void) LockAcquire(&tag, ShareLock, false, false);
LockRelease(&tag, ShareLock, false);
- return true;
+ return XactLockForVirtualXact(vxid, xid, wait);
}
/*
* (XXX is it worth testing likewise for duplicate catcache flush entries?
* Probably not.)
*
+ * Many subsystems own higher-level caches that depend on relcache and/or
+ * catcache, and they register callbacks here to invalidate their caches.
+ * While building a higher-level cache entry, a backend may receive a
+ * callback for the being-built entry or one of its dependencies. This
+ * implies the new higher-level entry would be born stale, and it might
+ * remain stale for the life of the backend. Many caches do not prevent
+ * that. They rely on DDL for can't-miss catalog changes taking
+ * AccessExclusiveLock on suitable objects. (For a change made with less
+ * locking, backends might never read the change.) The relation cache,
+ * however, needs to reflect changes from CREATE INDEX CONCURRENTLY no later
+ * than the beginning of the next transaction. Hence, when a relevant
+ * invalidation callback arrives during a build, relcache.c reattempts that
+ * build. Caches with similar needs could do likewise.
+ *
* If a relcache flush is issued for a system relation that we preload
* from the relcache init file, we must also delete the init file so that
* it will be rebuilt during the next backend restart. The actual work of
extern void AtAbort_Twophase(void);
extern void PostPrepare_Twophase(void);
+extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
+ bool *have_more);
extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held);
extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held);
/*
* Top-level transactions are identified by VirtualTransactionIDs comprising
- * PGPROC fields backendId and lxid. For prepared transactions, the
- * LocalTransactionId is an ordinary XID. These are guaranteed unique over
- * the short term, but will be reused after a database restart or XID
- * wraparound; hence they should never be stored on disk.
+ * PGPROC fields backendId and lxid. For recovered prepared transactions, the
+ * LocalTransactionId is an ordinary XID; LOCKTAG_VIRTUALTRANSACTION never
+ * refers to that kind. These are guaranteed unique over the short term, but
+ * will be reused after a database restart or XID wraparound; hence they
+ * should never be stored on disk.
*
* Note that struct VirtualTransactionId can not be assumed to be atomically
* assignable as a whole. However, type LocalTransactionId is assumed to
#define LocalTransactionIdIsValid(lxid) ((lxid) != InvalidLocalTransactionId)
#define VirtualTransactionIdIsValid(vxid) \
(LocalTransactionIdIsValid((vxid).localTransactionId))
-#define VirtualTransactionIdIsPreparedXact(vxid) \
+#define VirtualTransactionIdIsRecoveredPreparedXact(vxid) \
((vxid).backendId == InvalidBackendId)
#define VirtualTransactionIdEquals(vxid1, vxid2) \
((vxid1).backendId == (vxid2).backendId && \
}
}
+=pod
+
+=item $node->background_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness
+
+Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, which the
+caller may use to send input to B<psql>. The process's stdin is sourced from
+the $stdin scalar reference, and its stdout goes to the $stdout scalar
+reference; stderr is not redirected. This allows the caller to act on other
+parts of the system while idling this backend.
+
+The specified timer object is attached to the harness, as well. It is the
+caller's responsibility to select the timeout length, and to restart the
+timer after each command if the timeout is per-command.
+
+psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc>
+disabled. That may be overridden by passing extra psql parameters.
+
+Dies on failure to invoke psql, or if psql fails to connect. Errors occurring
+later are the caller's problem. psql runs with on_error_stop by default so
+that it will stop running sql and return 3 if passed SQL results in an error.
+
+Be sure to "finish" the harness when done with it.
+
+=over
+
+=item on_error_stop => 1
+
+By default, B<background_psql> invokes the B<psql> program with
+ON_ERROR_STOP=1 set, so SQL execution is stopped at the first error and exit
+code 3 is returned. Set B<on_error_stop> to 0 to ignore errors instead.
+
+=item replication => B<value>
+
+If set, add B<replication=value> to the conninfo string.
+Passing the literal value C<database> results in a logical replication
+connection.
+
+=item extra_params => ['--single-transaction']
+
+If given, it must be an array reference containing additional parameters to B<psql>.
+
+=back
+
+=cut
+
+sub background_psql
+{
+	my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_;
+
+	# Point psql at this node for the duration of the call.
+	local $ENV{PGHOST} = $self->host;
+	local $ENV{PGPORT} = $self->port;
+
+	# Stop at the first SQL error unless the caller explicitly opted out.
+	my $stop_on_error =
+	  defined $params{on_error_stop} ? $params{on_error_stop} : 1;
+
+	# Connection string, optionally asking for a replication connection.
+	my $conninfo = $self->connstr($dbname);
+	$conninfo .= " replication=$params{replication}"
+	  if defined $params{replication};
+
+	# Quiet, tuples-only, unaligned output without .psqlrc; the script is
+	# read from stdin.
+	my @psql_params = ('psql', '-XAtq', '-d', $conninfo, '-f', '-');
+	push @psql_params, '-v', 'ON_ERROR_STOP=1' if $stop_on_error;
+	if (defined $params{extra_params})
+	{
+		push @psql_params, @{ $params{extra_params} };
+	}
+
+	# Start with empty buffers: nothing may be waiting to be sent, and
+	# IPC::Run would otherwise append to existing output.
+	$$stdin = "" if ref($stdin);
+	$$stdout = "" if ref($stdout);
+
+	my $harness =
+	  IPC::Run::start(\@psql_params, '<', $stdin, '>', $stdout, $timer);
+
+	# Ask psql to echo a banner and pump until it shows up, so that
+	# connection failures surface here instead of in the caller.  (There is
+	# no particularly good handling for errors yet, but that might be added
+	# later.)
+	my $banner = "background_psql: ready";
+	$$stdin = "\\echo $banner\n";
+	until ($$stdout =~ /$banner/ || $timer->is_expired)
+	{
+		$harness->pump;
+	}
+
+	die "psql startup timed out" if $timer->is_expired;
+
+	return $harness;
+}
+
# Common sub of pgbench-invoking interfaces. Makes any requested script files
# and returns pgbench command-line options causing use of those files.
sub _pgbench_make_files