diff options
author | Robert Haas | 2011-11-14 15:04:55 +0000 |
---|---|---|
committer | Robert Haas | 2011-12-02 11:35:30 +0000 |
commit | 0755cb5964c0b33d1894add7bb3a80dbc11a4fb6 (patch) | |
tree | a75523a57795a38b9e7f93036ecb45af82977107 | |
parent | f98db4f264ea655f713536db5eed89ebf9531191 (diff) |
Reimplement ProcArrayLock as a new type of FlexLock. (branch: flexlock)
By providing some custom handling for ProcArrayEndTransaction, we can
avoid the need for ending transactions to repeatedly acquire the
spinlock. The amount of work that needs to be done while holding the
lock is so small that we can do it while holding the spinlock, or
(when the lock is contended) make the last person to release the lock
do it on behalf of the ending backend. This greatly improves
performance for unlogged tables at high client counts; permanent
tables also benefit, but performance is still severely throttled by
WALInsertLock contention.
-rw-r--r-- | src/backend/commands/analyze.c | 9 | ||||
-rw-r--r-- | src/backend/commands/vacuum.c | 5 | ||||
-rw-r--r-- | src/backend/storage/ipc/procarray.c | 170 | ||||
-rw-r--r-- | src/backend/storage/lmgr/Makefile | 2 | ||||
-rw-r--r-- | src/backend/storage/lmgr/flexlock.c | 27 | ||||
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 7 | ||||
-rw-r--r-- | src/backend/storage/lmgr/procarraylock.c | 344 | ||||
-rw-r--r-- | src/include/storage/flexlock_internals.h | 1 | ||||
-rw-r--r-- | src/include/storage/procarraylock.h | 28 |
9 files changed, 474 insertions, 119 deletions
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index 314324618a..2e972ec280 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -40,6 +40,7 @@ #include "storage/lmgr.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "storage/procarraylock.h" #include "utils/acl.h" #include "utils/attoptcache.h" #include "utils/datum.h" @@ -222,9 +223,9 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy) /* * OK, let's do it. First let other backends know I'm in ANALYZE. */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); MyPgXact->vacuumFlags |= PROC_IN_ANALYZE; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); /* * Do the normal non-recursive ANALYZE. @@ -249,9 +250,9 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy) * Reset my PGPROC flag. Note: we need this here, and not in vacuum_rel, * because the vacuum flag is cleared by the end-of-xact code. */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); MyPgXact->vacuumFlags &= ~PROC_IN_ANALYZE; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } /* diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index e70dbedbd0..09aa32b95a 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -39,6 +39,7 @@ #include "storage/lmgr.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "storage/procarraylock.h" #include "utils/acl.h" #include "utils/fmgroids.h" #include "utils/guc.h" @@ -895,11 +896,11 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound) * MyProc->xid/xmin, else OldestXmin might appear to go backwards, * which is probably Not Good. 
*/ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); MyPgXact->vacuumFlags |= PROC_IN_VACUUM; if (for_wraparound) MyPgXact->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 19ff524a60..d457e3f957 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -52,6 +52,7 @@ #include "access/twophase.h" #include "miscadmin.h" #include "storage/procarray.h" +#include "storage/procarraylock.h" #include "storage/spin.h" #include "utils/builtins.h" #include "utils/snapmgr.h" @@ -261,7 +262,7 @@ ProcArrayAdd(PGPROC *proc) ProcArrayStruct *arrayP = procArray; int index; - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); if (arrayP->numProcs >= arrayP->maxProcs) { @@ -270,7 +271,7 @@ ProcArrayAdd(PGPROC *proc) * fixed supply of PGPROC structs too, and so we should have failed * earlier.) 
*/ - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); ereport(FATAL, (errcode(ERRCODE_TOO_MANY_CONNECTIONS), errmsg("sorry, too many clients already"))); @@ -300,7 +301,7 @@ ProcArrayAdd(PGPROC *proc) arrayP->pgprocnos[index] = proc->pgprocno; arrayP->numProcs++; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } /* @@ -325,7 +326,7 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) DisplayXidCache(); #endif - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); if (TransactionIdIsValid(latestXid)) { @@ -351,13 +352,13 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid) (arrayP->numProcs - index - 1) * sizeof (int)); arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */ arrayP->numProcs--; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return; } } /* Ooops */ - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); elog(LOG, "failed to find proc %p in ProcArray", proc); } @@ -383,54 +384,19 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid) if (TransactionIdIsValid(latestXid)) { - /* - * We must lock ProcArrayLock while clearing our advertised XID, so - * that we do not exit the set of "running" transactions while someone - * else is taking a snapshot. See discussion in - * src/backend/access/transam/README. 
- */ - Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid)); - - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - - pgxact->xid = InvalidTransactionId; - proc->lxid = InvalidLocalTransactionId; - pgxact->xmin = InvalidTransactionId; - /* must be cleared with xid/xmin: */ - pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; - pgxact->inCommit = false; /* be sure this is cleared in abort */ - proc->recoveryConflictPending = false; - - /* Clear the subtransaction-XID cache too while holding the lock */ - pgxact->nxids = 0; - pgxact->overflowed = false; - - /* Also advance global latestCompletedXid while holding the lock */ - if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, - latestXid)) - ShmemVariableCache->latestCompletedXid = latestXid; - - LWLockRelease(ProcArrayLock); + Assert(proc == MyProc); + ProcArrayLockClearTransaction(latestXid); } else { - /* - * If we have no XID, we don't need to lock, since we won't affect - * anyone else's calculation of a snapshot. We might change their - * estimate of global xmin, but that's OK. 
- */ - Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid)); - - proc->lxid = InvalidLocalTransactionId; pgxact->xmin = InvalidTransactionId; /* must be cleared with xid/xmin: */ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; - pgxact->inCommit = false; /* be sure this is cleared in abort */ - proc->recoveryConflictPending = false; - - Assert(pgxact->nxids == 0); - Assert(pgxact->overflowed == false); } + + proc->lxid = InvalidLocalTransactionId; + pgxact->inCommit = false; /* be sure this is cleared in abort */ + proc->recoveryConflictPending = false; } @@ -562,7 +528,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) /* * Nobody else is running yet, but take locks anyhow */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); /* * KnownAssignedXids is sorted so we cannot just add the xids, we have to @@ -669,7 +635,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid)); Assert(TransactionIdIsValid(ShmemVariableCache->nextXid)); - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); if (standbyState == STANDBY_SNAPSHOT_READY) @@ -724,7 +690,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid, /* * Uses same locking as transaction commit */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); /* * Remove subxids from known-assigned-xacts. 
@@ -737,7 +703,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid, if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid)) procArray->lastOverflowedXid = max_xid; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } /* @@ -829,7 +795,7 @@ TransactionIdIsInProgress(TransactionId xid) errmsg("out of memory"))); } - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); /* * Now that we have the lock, we can check latestCompletedXid; if the @@ -837,7 +803,7 @@ TransactionIdIsInProgress(TransactionId xid) */ if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid)) { - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); xc_by_latest_xid_inc(); return true; } @@ -865,7 +831,7 @@ TransactionIdIsInProgress(TransactionId xid) */ if (TransactionIdEquals(pxid, xid)) { - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); xc_by_main_xid_inc(); return true; } @@ -887,7 +853,7 @@ TransactionIdIsInProgress(TransactionId xid) if (TransactionIdEquals(cxid, xid)) { - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); xc_by_child_xid_inc(); return true; } @@ -915,7 +881,7 @@ TransactionIdIsInProgress(TransactionId xid) if (KnownAssignedXidExists(xid)) { - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); xc_by_known_assigned_inc(); return true; } @@ -931,7 +897,7 @@ TransactionIdIsInProgress(TransactionId xid) nxids = KnownAssignedXidsGet(xids, xid); } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); /* * If none of the relevant caches overflowed, we know the Xid is not @@ -997,7 +963,7 @@ TransactionIdIsActive(TransactionId xid) if (TransactionIdPrecedes(xid, RecentXmin)) return false; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (i = 0; i < arrayP->numProcs; i++) { @@ -1022,7 +988,7 @@ TransactionIdIsActive(TransactionId xid) } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return result; } @@ -1085,7 +1051,7 @@ GetOldestXmin(bool allDbs, bool 
ignoreVacuum) /* Cannot look for individual databases during recovery */ Assert(allDbs || !RecoveryInProgress()); - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); /* * We initialize the MIN() calculation with latestCompletedXid + 1. This @@ -1140,7 +1106,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) */ TransactionId kaxmin = KnownAssignedXidsGetOldestXmin(); - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); if (TransactionIdIsNormal(kaxmin) && TransactionIdPrecedes(kaxmin, result)) @@ -1151,7 +1117,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum) /* * No other information needed, so release the lock immediately. */ - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); /* * Compute the cutoff XID by subtracting vacuum_defer_cleanup_age, @@ -1280,7 +1246,7 @@ GetSnapshotData(Snapshot snapshot) * It is sufficient to get shared lock on ProcArrayLock, even if we are * going to set MyProc->xmin. */ - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); /* xmax is always latestCompletedXid + 1 */ xmax = ShmemVariableCache->latestCompletedXid; @@ -1418,7 +1384,7 @@ GetSnapshotData(Snapshot snapshot) if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); /* * Update globalxmin to include actual process xids. 
This is a slightly @@ -1475,7 +1441,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) return false; /* Get lock so source xact can't end while we're doing this */ - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -1521,7 +1487,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) break; } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return result; } @@ -1595,7 +1561,7 @@ GetRunningTransactionData(void) * Ensure that no xids enter or leave the procarray while we obtain * snapshot. */ - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); LWLockAcquire(XidGenLock, LW_SHARED); latestCompletedXid = ShmemVariableCache->latestCompletedXid; @@ -1658,7 +1624,7 @@ GetRunningTransactionData(void) CurrentRunningXacts->latestCompletedXid = latestCompletedXid; /* We don't release XidGenLock here, the caller is responsible for that */ - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid)); Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid)); @@ -1691,7 +1657,7 @@ GetOldestActiveTransactionId(void) Assert(!RecoveryInProgress()); - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); oldestRunningXid = ShmemVariableCache->nextXid; @@ -1720,7 +1686,7 @@ GetOldestActiveTransactionId(void) */ } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return oldestRunningXid; } @@ -1753,7 +1719,7 @@ GetTransactionsInCommit(TransactionId **xids_p) xids = (TransactionId *) palloc(arrayP->maxProcs * sizeof(TransactionId)); nxids = 0; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -1768,7 +1734,7 @@ GetTransactionsInCommit(TransactionId **xids_p) xids[nxids++] = pxid; } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); 
*xids_p = xids; return nxids; @@ -1790,7 +1756,7 @@ HaveTransactionsInCommit(TransactionId *xids, int nxids) ProcArrayStruct *arrayP = procArray; int index; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -1818,7 +1784,7 @@ HaveTransactionsInCommit(TransactionId *xids, int nxids) } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return result; } @@ -1840,7 +1806,7 @@ BackendPidGetProc(int pid) if (pid == 0) /* never match dummy PGPROCs */ return NULL; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -1853,7 +1819,7 @@ BackendPidGetProc(int pid) } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return result; } @@ -1881,7 +1847,7 @@ BackendXidGetPid(TransactionId xid) if (xid == InvalidTransactionId) /* never match invalid xid */ return 0; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -1896,7 +1862,7 @@ BackendXidGetPid(TransactionId xid) } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return result; } @@ -1951,7 +1917,7 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0, vxids = (VirtualTransactionId *) palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs); - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -1989,7 +1955,7 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0, } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); *nvxids = count; return vxids; @@ -2048,7 +2014,7 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid) errmsg("out of memory"))); } - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -2083,7 +2049,7 @@ 
GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid) } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); /* add the terminator */ vxids[count].backendId = InvalidBackendId; @@ -2104,7 +2070,7 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode) int index; pid_t pid = 0; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -2131,7 +2097,7 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode) } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return pid; } @@ -2207,7 +2173,7 @@ CountDBBackends(Oid databaseid) int count = 0; int index; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -2221,7 +2187,7 @@ CountDBBackends(Oid databaseid) count++; } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return count; } @@ -2237,7 +2203,7 @@ CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending) pid_t pid = 0; /* tell all backends to die */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); for (index = 0; index < arrayP->numProcs; index++) { @@ -2263,7 +2229,7 @@ CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending) } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } /* @@ -2276,7 +2242,7 @@ CountUserBackends(Oid roleid) int count = 0; int index; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; index < arrayP->numProcs; index++) { @@ -2289,7 +2255,7 @@ CountUserBackends(Oid roleid) count++; } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); return count; } @@ -2337,7 +2303,7 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared) *nbackends = *nprepared = 0; - LWLockAcquire(ProcArrayLock, LW_SHARED); + ProcArrayLockAcquire(PAL_SHARED); for (index = 0; 
index < arrayP->numProcs; index++) { @@ -2363,7 +2329,7 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared) } } - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); if (!found) return false; /* no conflicting backends, so done */ @@ -2416,7 +2382,7 @@ XidCacheRemoveRunningXids(TransactionId xid, * to abort subtransactions, but pending closer analysis we'd best be * conservative. */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); /* * Under normal circumstances xid and xids[] will be in increasing order, @@ -2464,7 +2430,7 @@ XidCacheRemoveRunningXids(TransactionId xid, latestXid)) ShmemVariableCache->latestCompletedXid = latestXid; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } #ifdef XIDCACHE_DEBUG @@ -2631,7 +2597,7 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, /* * Uses same locking as transaction commit */ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); KnownAssignedXidsRemoveTree(xid, nsubxids, subxids); @@ -2640,7 +2606,7 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, max_xid)) ShmemVariableCache->latestCompletedXid = max_xid; - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } /* @@ -2650,9 +2616,9 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, void ExpireAllKnownAssignedTransactionIds(void) { - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); KnownAssignedXidsRemovePreceding(InvalidTransactionId); - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } /* @@ -2662,9 +2628,9 @@ ExpireAllKnownAssignedTransactionIds(void) void ExpireOldKnownAssignedTransactionIds(TransactionId xid) { - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); KnownAssignedXidsRemovePreceding(xid); - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); } @@ -2886,7 +2852,7 @@ KnownAssignedXidsAdd(TransactionId 
from_xid, TransactionId to_xid, { /* must hold lock to compress */ if (!exclusive_lock) - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); KnownAssignedXidsCompress(true); @@ -2894,7 +2860,7 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid, /* note: we no longer care about the tail pointer */ if (!exclusive_lock) - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); /* * If it still won't fit then we're out of memory diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile index 3730e51c7e..27eaa97020 100644 --- a/src/backend/storage/lmgr/Makefile +++ b/src/backend/storage/lmgr/Makefile @@ -13,7 +13,7 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = flexlock.o lmgr.o lock.o proc.o deadlock.o lwlock.o spin.o s_lock.o \ - predicate.o + procarraylock.o predicate.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/lmgr/flexlock.c b/src/backend/storage/lmgr/flexlock.c index 1bd3dc727e..614595100b 100644 --- a/src/backend/storage/lmgr/flexlock.c +++ b/src/backend/storage/lmgr/flexlock.c @@ -30,6 +30,7 @@ #include "storage/flexlock.h" #include "storage/flexlock_internals.h" #include "storage/predicate.h" +#include "storage/procarraylock.h" #include "storage/spin.h" /* @@ -176,9 +177,14 @@ CreateFlexLocks(void) FlexLockArray = (FlexLockPadded *) ptr; - /* All of the "fixed" FlexLocks are LWLocks. */ + /* All of the "fixed" FlexLocks are LWLocks - except ProcArrayLock. 
*/ for (id = 0, lock = FlexLockArray; id < NumFixedFlexLocks; id++, lock++) - FlexLockInit(&lock->flex, FLEXLOCK_TYPE_LWLOCK); + { + if (id == ProcArrayLock) + FlexLockInit(&lock->flex, FLEXLOCK_TYPE_PROCARRAYLOCK); + else + FlexLockInit(&lock->flex, FLEXLOCK_TYPE_LWLOCK); + } /* * Initialize the dynamic-allocation counter, which is stored just before @@ -322,13 +328,20 @@ FlexLockReleaseAll(void) { while (num_held_flexlocks > 0) { + FlexLockId id; + FlexLock *flex; + HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */ - /* - * FLEXTODO: When we have multiple types of flex locks, this will - * need to call the appropriate release function for each lock type. - */ - LWLockRelease(held_flexlocks[num_held_flexlocks - 1]); + id = held_flexlocks[num_held_flexlocks - 1]; + flex = &FlexLockArray[id].flex; + if (flex->locktype == FLEXLOCK_TYPE_LWLOCK) + LWLockRelease(id); + else + { + Assert(id == ProcArrayLock); + ProcArrayLockRelease(); + } } } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index b402999d8e..10ec83b26f 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -46,6 +46,7 @@ #include "storage/pmsignal.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "storage/procarraylock.h" #include "storage/procsignal.h" #include "storage/spin.h" #include "utils/timestamp.h" @@ -1083,7 +1084,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) PGPROC *autovac = GetBlockingAutoVacuumPgproc(); PGXACT *autovac_pgxact = &ProcGlobal->allPgXact[autovac->pgprocno]; - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayLockAcquire(PAL_EXCLUSIVE); /* * Only do it if the worker is not working to protect against Xid @@ -1099,7 +1100,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) pid); /* don't hold the lock across the kill() syscall */ - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); /* send the autovacuum worker Back to Old Kent Road */ if 
(kill(pid, SIGINT) < 0) @@ -1111,7 +1112,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) } } else - LWLockRelease(ProcArrayLock); + ProcArrayLockRelease(); /* prevent signal from being resent more than once */ allow_autovacuum_cancel = false; diff --git a/src/backend/storage/lmgr/procarraylock.c b/src/backend/storage/lmgr/procarraylock.c new file mode 100644 index 0000000000..7cd4b6bae9 --- /dev/null +++ b/src/backend/storage/lmgr/procarraylock.c @@ -0,0 +1,344 @@ +/*------------------------------------------------------------------------- + * + * procarraylock.c + * Lock management for the ProcArray + * + * Because the ProcArray data structure is highly trafficked, it is + * critical that mutual exclusion for ProcArray options be as efficient + * as possible. A particular problem is transaction end (commit or abort) + * which cannot be done in parallel with snapshot acquisition. We + * therefore include some special hacks to deal with this case efficiently. + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/lmgr/procarraylock.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" +#include "pg_trace.h" +#include "access/transam.h" +#include "storage/flexlock_internals.h" +#include "storage/ipc.h" +#include "storage/procarraylock.h" +#include "storage/proc.h" +#include "storage/spin.h" + +typedef struct ProcArrayLockStruct +{ + FlexLock flex; /* common FlexLock infrastructure */ + char exclusive; /* # of exclusive holders (0 or 1) */ + int shared; /* # of shared holders (0..MaxBackends) */ + PGPROC *ending; /* transactions wishing to clear state */ + TransactionId latest_ending_xid; /* latest ending XID */ +} ProcArrayLockStruct; + +/* There is only one ProcArrayLock. 
*/ +#define ProcArrayLockPointer() \ + (AssertMacro(FlexLockArray[ProcArrayLock].flex.locktype == \ + FLEXLOCK_TYPE_PROCARRAYLOCK), \ + (volatile ProcArrayLockStruct *) &FlexLockArray[ProcArrayLock]) + +/* + * ProcArrayLockAcquire - acquire a lightweight lock in the specified mode + * + * If the lock is not available, sleep until it is. + * + * Side effect: cancel/die interrupts are held off until lock release. + */ +void +ProcArrayLockAcquire(ProcArrayLockMode mode) +{ + volatile ProcArrayLockStruct *lock = ProcArrayLockPointer(); + PGPROC *proc = MyProc; + bool retry = false; + int extraWaits = 0; + + /* + * We can't wait if we haven't got a PGPROC. This should only occur + * during bootstrap or shared memory initialization. Put an Assert here + * to catch unsafe coding practices. + */ + Assert(!(proc == NULL && IsUnderPostmaster)); + + /* + * Lock out cancel/die interrupts until we exit the code section protected + * by the ProcArrayLock. This ensures that interrupts will not interfere + * with manipulations of data structures in shared memory. + */ + HOLD_INTERRUPTS(); + + /* + * Loop here to try to acquire lock after each time we are signaled by + * ProcArrayLockRelease. See comments in LWLockAcquire for an explanation + * of why do we not attempt to hand off the lock directly. + */ + for (;;) + { + bool mustwait; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&lock->flex.mutex); + + /* If retrying, allow LWLockRelease to release waiters again */ + if (retry) + lock->flex.releaseOK = true; + + /* If I can get the lock, do so quickly. */ + if (mode == PAL_EXCLUSIVE) + { + if (lock->exclusive == 0 && lock->shared == 0) + { + lock->exclusive++; + mustwait = false; + } + else + mustwait = true; + } + else + { + if (lock->exclusive == 0) + { + lock->shared++; + mustwait = false; + } + else + mustwait = true; + } + + if (!mustwait) + break; /* got the lock */ + + /* Add myself to wait queue. 
*/ + FlexLockJoinWaitQueue(lock, (int) mode); + + /* Can release the mutex now */ + SpinLockRelease(&lock->flex.mutex); + + /* Wait until awakened. */ + extraWaits += FlexLockWait(ProcArrayLock, mode); + + /* Now loop back and try to acquire lock again. */ + retry = true; + } + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&lock->flex.mutex); + + TRACE_POSTGRESQL_FLEXLOCK_ACQUIRE(lockid, mode); + + /* Add lock to list of locks held by this backend */ + FlexLockRemember(ProcArrayLock); + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); +} + +/* + * ProcArrayLockClearTransaction - safely clear transaction details + * + * This can't be done while ProcArrayLock is held, but it's so fast that + * we can afford to do it while holding the spinlock, rather than acquiring + * and releasing the lock. + */ +void +ProcArrayLockClearTransaction(TransactionId latestXid) +{ + volatile ProcArrayLockStruct *lock = ProcArrayLockPointer(); + PGPROC *proc = MyProc; + int extraWaits = 0; + bool mustwait; + + HOLD_INTERRUPTS(); + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&lock->flex.mutex); + + if (lock->exclusive == 0 && lock->shared == 0) + { + { + volatile PGPROC *vproc = proc; + volatile PGXACT *pgxact = &ProcGlobal->allPgXact[vproc->pgprocno]; + /* If there are no lockers, clear the critical PGPROC fields. */ + pgxact->xid = InvalidTransactionId; + pgxact->xmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ + pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; + pgxact->nxids = 0; + pgxact->overflowed = false; + } + mustwait = false; + + /* Also advance global latestCompletedXid while holding the lock */ + if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, + latestXid)) + ShmemVariableCache->latestCompletedXid = latestXid; + } + else + { + /* Rats, must wait. 
*/ + proc->flWaitLink = lock->ending; + lock->ending = proc; + if (!TransactionIdIsValid(lock->latest_ending_xid) || + TransactionIdPrecedes(lock->latest_ending_xid, latestXid)) + lock->latest_ending_xid = latestXid; + mustwait = true; + } + + /* Can release the mutex now */ + SpinLockRelease(&lock->flex.mutex); + + /* + * If we were not able to perfom the operation immediately, we must wait. + * But we need not retry after being awoken, because the last lock holder + * to release the lock will do the work first, on our behalf. + */ + if (mustwait) + { + extraWaits += FlexLockWait(ProcArrayLock, 2); + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); + } + + RESUME_INTERRUPTS(); +} + +/* + * ProcArrayLockRelease - release a previously acquired lock + */ +void +ProcArrayLockRelease(void) +{ + volatile ProcArrayLockStruct *lock = ProcArrayLockPointer(); + PGPROC *head; + PGPROC *ending = NULL; + PGPROC *proc; + + FlexLockForget(ProcArrayLock); + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&lock->flex.mutex); + + /* Release my hold on lock */ + if (lock->exclusive > 0) + lock->exclusive--; + else + { + Assert(lock->shared > 0); + lock->shared--; + } + + /* + * If the lock is now free, but there are some transactions trying to + * end, we must clear the critical PGPROC fields for them, and save a + * list of them so we can wake them up. 
+ */ + if (lock->exclusive == 0 && lock->shared == 0 && lock->ending != NULL) + { + volatile PGPROC *vproc; + + ending = lock->ending; + vproc = ending; + + while (vproc != NULL) + { + volatile PGXACT *pgxact = &ProcGlobal->allPgXact[vproc->pgprocno]; + + pgxact->xid = InvalidTransactionId; + pgxact->xmin = InvalidTransactionId; + /* must be cleared with xid/xmin: */ + pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK; + pgxact->nxids = 0; + pgxact->overflowed = false; + vproc = vproc->flWaitLink; + } + + /* Also advance global latestCompletedXid */ + if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, + lock->latest_ending_xid)) + ShmemVariableCache->latestCompletedXid = lock->latest_ending_xid; + + /* Reset lock state. */ + lock->ending = NULL; + lock->latest_ending_xid = InvalidTransactionId; + } + + /* + * See if I need to awaken any waiters. If I released a non-last shared + * hold, there cannot be anything to do. Also, do not awaken any waiters + * if someone has already awakened waiters that haven't yet acquired the + * lock. + */ + head = lock->flex.head; + if (head != NULL) + { + if (lock->exclusive == 0 && lock->shared == 0 && lock->flex.releaseOK) + { + /* + * Remove the to-be-awakened PGPROCs from the queue. If the front + * waiter wants exclusive lock, awaken him only. Otherwise awaken + * as many waiters as want shared access. + */ + proc = head; + if (proc->flWaitMode != LW_EXCLUSIVE) + { + while (proc->flWaitLink != NULL && + proc->flWaitLink->flWaitMode != LW_EXCLUSIVE) + proc = proc->flWaitLink; + } + /* proc is now the last PGPROC to be released */ + lock->flex.head = proc->flWaitLink; + proc->flWaitLink = NULL; + /* prevent additional wakeups until retryer gets to run */ + lock->flex.releaseOK = false; + } + else + { + /* lock is still held, can't awaken anything */ + head = NULL; + } + } + + /* We are done updating shared state of the lock itself. 
*/ + SpinLockRelease(&lock->flex.mutex); + + TRACE_POSTGRESQL_FLEXLOCK_RELEASE(lockid); + + /* + * Awaken any waiters I removed from the queue. + */ + while (head != NULL) + { + FlexLockDebug("LWLockRelease", lockid, "release waiter"); + proc = head; + head = proc->flWaitLink; + proc->flWaitLink = NULL; + proc->flWaitResult = 1; /* any non-zero value will do */ + PGSemaphoreUnlock(&proc->sem); + } + + /* + * Also awaken any processes whose critical PGPROC fields I cleared + */ + while (ending != NULL) + { + FlexLockDebug("LWLockRelease", lockid, "release ending"); + proc = ending; + ending = proc->flWaitLink; + proc->flWaitLink = NULL; + proc->flWaitResult = 1; /* any non-zero value will do */ + PGSemaphoreUnlock(&proc->sem); + } + + /* + * Now okay to allow cancel/die interrupts. + */ + RESUME_INTERRUPTS(); +} diff --git a/src/include/storage/flexlock_internals.h b/src/include/storage/flexlock_internals.h index 4fcb3423dd..a5c571177d 100644 --- a/src/include/storage/flexlock_internals.h +++ b/src/include/storage/flexlock_internals.h @@ -41,6 +41,7 @@ typedef struct FlexLock } FlexLock; #define FLEXLOCK_TYPE_LWLOCK 'l' +#define FLEXLOCK_TYPE_PROCARRAYLOCK 'p' typedef union FlexLockPadded { diff --git a/src/include/storage/procarraylock.h b/src/include/storage/procarraylock.h new file mode 100644 index 0000000000..678ca6ffe9 --- /dev/null +++ b/src/include/storage/procarraylock.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * procarraylock.h + * Lock management for the ProcArray + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/lwlock.h + * + *------------------------------------------------------------------------- + */ +#ifndef PROCARRAYLOCK_H +#define PROCARRAYLOCK_H + +#include "storage/flexlock.h" + +typedef enum ProcArrayLockMode +{ + PAL_EXCLUSIVE, + PAL_SHARED +} 
ProcArrayLockMode; + +extern void ProcArrayLockAcquire(ProcArrayLockMode mode); +extern void ProcArrayLockClearTransaction(TransactionId latestXid); +extern void ProcArrayLockRelease(void); + +#endif /* PROCARRAYLOCK_H */ |