This speeds up snapshot-taking and reduces ProcArrayLock contention.
Also, the PGPROC (and PGXACT) structures used by two-phase commit are
now allocated as part of the main array, rather than in a separate
array, and we keep ProcArray sorted in pointer order. These changes
are intended to minimize the number of cache lines that must be pulled
in to take a snapshot, and testing shows a substantial increase in
performance on both read and write workloads at high concurrencies.
Pavan Deolasee, Heikki Linnakangas, Robert Haas
typedef struct GlobalTransactionData
{
- PGPROC proc; /* dummy proc */
+ GlobalTransaction next;
+ int pgprocno; /* dummy proc */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
sizeof(GlobalTransaction) * max_prepared_xacts));
for (i = 0; i < max_prepared_xacts; i++)
{
- gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
+ gxacts[i].next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = &gxacts[i];
/*
TimestampTz prepared_at, Oid owner, Oid databaseid)
{
GlobalTransaction gxact;
+ PGPROC *proc;
+ PGXACT *pgxact;
int i;
if (strlen(gid) >= GIDSIZE)
TwoPhaseState->numPrepXacts--;
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
/* and put it back in the freelist */
- gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxact->next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact;
/* Back up index count too, so we don't miss scanning one */
i--;
errhint("Increase max_prepared_transactions (currently %d).",
max_prepared_xacts)));
gxact = TwoPhaseState->freeGXacts;
- TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->proc.links.next;
+ TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->next;
- /* Initialize it */
- MemSet(&gxact->proc, 0, sizeof(PGPROC));
- SHMQueueElemInit(&(gxact->proc.links));
- gxact->proc.waitStatus = STATUS_OK;
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+ /* Initialize the PGPROC entry */
+ MemSet(proc, 0, sizeof(PGPROC));
+ proc->pgprocno = gxact->pgprocno;
+ SHMQueueElemInit(&(proc->links));
+ proc->waitStatus = STATUS_OK;
/* We set up the gxact's VXID as InvalidBackendId/XID */
- gxact->proc.lxid = (LocalTransactionId) xid;
- gxact->proc.xid = xid;
- gxact->proc.xmin = InvalidTransactionId;
- gxact->proc.pid = 0;
- gxact->proc.backendId = InvalidBackendId;
- gxact->proc.databaseId = databaseid;
- gxact->proc.roleId = owner;
- gxact->proc.inCommit = false;
- gxact->proc.vacuumFlags = 0;
- gxact->proc.lwWaiting = false;
- gxact->proc.lwExclusive = false;
- gxact->proc.lwWaitLink = NULL;
- gxact->proc.waitLock = NULL;
- gxact->proc.waitProcLock = NULL;
+ proc->lxid = (LocalTransactionId) xid;
+ pgxact->xid = xid;
+ pgxact->xmin = InvalidTransactionId;
+ pgxact->inCommit = false;
+ pgxact->vacuumFlags = 0;
+ proc->pid = 0;
+ proc->backendId = InvalidBackendId;
+ proc->databaseId = databaseid;
+ proc->roleId = owner;
+ proc->lwWaiting = false;
+ proc->lwExclusive = false;
+ proc->lwWaitLink = NULL;
+ proc->waitLock = NULL;
+ proc->waitProcLock = NULL;
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
- SHMQueueInit(&(gxact->proc.myProcLocks[i]));
+ SHMQueueInit(&(proc->myProcLocks[i]));
/* subxid data must be filled later by GXactLoadSubxactData */
- gxact->proc.subxids.overflowed = false;
- gxact->proc.subxids.nxids = 0;
+ pgxact->overflowed = false;
+ pgxact->nxids = 0;
gxact->prepared_at = prepared_at;
/* initialize LSN to 0 (start of WAL) */
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
TransactionId *children)
{
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
/* We need no extra lock since the GXACT isn't valid yet */
if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
{
- gxact->proc.subxids.overflowed = true;
+ pgxact->overflowed = true;
nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
}
if (nsubxacts > 0)
{
- memcpy(gxact->proc.subxids.xids, children,
+ memcpy(proc->subxids.xids, children,
nsubxacts * sizeof(TransactionId));
- gxact->proc.subxids.nxids = nsubxacts;
+ pgxact->nxids = nsubxacts;
}
}
* Put it into the global ProcArray so TransactionIdIsInProgress considers
* the XID as still running.
*/
- ProcArrayAdd(&gxact->proc);
+ ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
}
/*
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
/* Ignore not-yet-valid GIDs */
if (!gxact->valid)
* there may be some other issues as well. Hence disallow until
* someone gets motivated to make it work.
*/
- if (MyDatabaseId != gxact->proc.databaseId)
+ if (MyDatabaseId != proc->databaseId)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("prepared transaction belongs to another database"),
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
/* and put it back in the freelist */
- gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxact->next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact;
LWLockRelease(TwoPhaseStateLock);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- if (gxact->valid && gxact->proc.xid == xid)
+ if (gxact->valid && pgxact->xid == xid)
{
result = true;
break;
while (status->array != NULL && status->currIdx < status->ngxacts)
{
GlobalTransaction gxact = &status->array[status->currIdx++];
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
Datum values[5];
bool nulls[5];
HeapTuple tuple;
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
- values[0] = TransactionIdGetDatum(gxact->proc.xid);
+ values[0] = TransactionIdGetDatum(pgxact->xid);
values[1] = CStringGetTextDatum(gxact->gid);
values[2] = TimestampTzGetDatum(gxact->prepared_at);
values[3] = ObjectIdGetDatum(gxact->owner);
- values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
+ values[4] = ObjectIdGetDatum(proc->databaseId);
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
- if (gxact->proc.xid == xid)
+ if (pgxact->xid == xid)
{
- result = &gxact->proc;
+ result = &ProcGlobal->allProcs[gxact->pgprocno];
break;
}
}
void
StartPrepare(GlobalTransaction gxact)
{
- TransactionId xid = gxact->proc.xid;
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+ TransactionId xid = pgxact->xid;
TwoPhaseFileHeader hdr;
TransactionId *children;
RelFileNode *commitrels;
hdr.magic = TWOPHASE_MAGIC;
hdr.total_len = 0; /* EndPrepare will fill this in */
hdr.xid = xid;
- hdr.database = gxact->proc.databaseId;
+ hdr.database = proc->databaseId;
hdr.prepared_at = gxact->prepared_at;
hdr.owner = gxact->owner;
hdr.nsubxacts = xactGetCommittedChildren(&children);
void
EndPrepare(GlobalTransaction gxact)
{
- TransactionId xid = gxact->proc.xid;
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+ TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
char path[MAXPGPATH];
XLogRecData *record;
*/
START_CRIT_SECTION();
- MyProc->inCommit = true;
+ MyPgXact->inCommit = true;
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
records.head);
* checkpoint starting after this will certainly see the gxact as a
* candidate for fsyncing.
*/
- MyProc->inCommit = false;
+ MyPgXact->inCommit = false;
END_CRIT_SECTION();
FinishPreparedTransaction(const char *gid, bool isCommit)
{
GlobalTransaction gxact;
+ PGPROC *proc;
+ PGXACT *pgxact;
TransactionId xid;
char *buf;
char *bufptr;
* try to commit the same GID at once.
*/
gxact = LockGXact(gid, GetUserId());
- xid = gxact->proc.xid;
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+ xid = pgxact->xid;
/*
* Read and validate the state file
hdr->nsubxacts, children,
hdr->nabortrels, abortrels);
- ProcArrayRemove(&gxact->proc, latestXid);
+ ProcArrayRemove(proc, latestXid);
/*
* In case we fail while running the callbacks, mark the gxact invalid so
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
if (gxact->valid &&
XLByteLE(gxact->prepare_lsn, redo_horizon))
- xids[nxids++] = gxact->proc.xid;
+ xids[nxids++] = pgxact->xid;
}
LWLockRelease(TwoPhaseStateLock);
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
- MyProc->inCommit = true;
+ MyPgXact->inCommit = true;
/* Emit the XLOG commit record */
xlrec.xid = xid;
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
- MyProc->inCommit = false;
+ MyPgXact->inCommit = false;
END_CRIT_SECTION();
if (IsBootstrapProcessingMode())
{
Assert(!isSubXact);
- MyProc->xid = BootstrapTransactionId;
+ MyPgXact->xid = BootstrapTransactionId;
return BootstrapTransactionId;
}
* TransactionId and int fetch/store are atomic.
*/
volatile PGPROC *myproc = MyProc;
+ volatile PGXACT *mypgxact = MyPgXact;
if (!isSubXact)
- myproc->xid = xid;
+ mypgxact->xid = xid;
else
{
- int nxids = myproc->subxids.nxids;
+ int nxids = mypgxact->nxids;
if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
{
myproc->subxids.xids[nxids] = xid;
- myproc->subxids.nxids = nxids + 1;
+ mypgxact->nxids = nxids + 1;
}
else
- myproc->subxids.overflowed = true;
+ mypgxact->overflowed = true;
}
}
* bit fuzzy, but it doesn't matter.
*/
START_CRIT_SECTION();
- MyProc->inCommit = true;
+ MyPgXact->inCommit = true;
SetCurrentTransactionStopTimestamp();
*/
if (markXidCommitted)
{
- MyProc->inCommit = false;
+ MyPgXact->inCommit = false;
END_CRIT_SECTION();
}
* OK, let's do it. First let other backends know I'm in ANALYZE.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- MyProc->vacuumFlags |= PROC_IN_ANALYZE;
+ MyPgXact->vacuumFlags |= PROC_IN_ANALYZE;
LWLockRelease(ProcArrayLock);
/*
* because the vacuum flag is cleared by the end-of-xact code.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- MyProc->vacuumFlags &= ~PROC_IN_ANALYZE;
+ MyPgXact->vacuumFlags &= ~PROC_IN_ANALYZE;
LWLockRelease(ProcArrayLock);
}
* which is probably Not Good.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- MyProc->vacuumFlags |= PROC_IN_VACUUM;
+ MyPgXact->vacuumFlags |= PROC_IN_VACUUM;
if (for_wraparound)
- MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
+ MyPgXact->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
LWLockRelease(ProcArrayLock);
}
slock_t *ProcStructLock;
PROC_HDR *ProcGlobal;
PGPROC *AuxiliaryProcs;
+ PGPROC *PreparedXactProcs;
PMSignalData *PMSignalState;
InheritableSocket pgStatSock;
pid_t PostmasterPid;
param->ProcStructLock = ProcStructLock;
param->ProcGlobal = ProcGlobal;
param->AuxiliaryProcs = AuxiliaryProcs;
+ param->PreparedXactProcs = PreparedXactProcs;
param->PMSignalState = PMSignalState;
if (!write_inheritable_socket(¶m->pgStatSock, pgStatSock, childPid))
return false;
ProcStructLock = param->ProcStructLock;
ProcGlobal = param->ProcGlobal;
AuxiliaryProcs = param->AuxiliaryProcs;
+ PreparedXactProcs = param->PreparedXactProcs;
PMSignalState = param->PMSignalState;
read_inheritable_socket(&pgStatSock, ¶m->pgStatSock);
* safe, and if we're moving it backwards, well, the data is at risk
* already since a VACUUM could have just finished calling GetOldestXmin.)
*/
- MyProc->xmin = reply.xmin;
+ MyPgXact->xmin = reply.xmin;
}
/* Main loop of walsender process */
XLOGShmemInit();
CLOGShmemInit();
SUBTRANSShmemInit();
- TwoPhaseShmemInit();
MultiXactShmemInit();
InitBufferPool();
InitProcGlobal();
CreateSharedProcArray();
CreateSharedBackendStatus();
+ TwoPhaseShmemInit();
/*
* Set up shared-inval messaging
TransactionId lastOverflowedXid;
/*
- * We declare procs[] as 1 entry because C wants a fixed-size array, but
+ * We declare pgprocnos[] as 1 entry because C wants a fixed-size array, but
* actually it is maxProcs entries long.
*/
- PGPROC *procs[1]; /* VARIABLE LENGTH ARRAY */
+ int pgprocnos[1]; /* VARIABLE LENGTH ARRAY */
} ProcArrayStruct;
static ProcArrayStruct *procArray;
+static PGPROC *allProcs;
+static PGXACT *allPgXact;
+
/*
* Bookkeeping for tracking emulated transactions in recovery
*/
/* Size of the ProcArray structure itself */
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
- size = offsetof(ProcArrayStruct, procs);
- size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));
+ size = offsetof(ProcArrayStruct, pgprocnos);
+ size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
/*
* During Hot Standby processing we have a data structure called
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
ShmemInitStruct("Proc Array",
- add_size(offsetof(ProcArrayStruct, procs),
- mul_size(sizeof(PGPROC *),
+ add_size(offsetof(ProcArrayStruct, pgprocnos),
+ mul_size(sizeof(int),
PROCARRAY_MAXPROCS)),
&found);
procArray->lastOverflowedXid = InvalidTransactionId;
}
+ allProcs = ProcGlobal->allProcs;
+ allPgXact = ProcGlobal->allPgXact;
+
/* Create or attach to the KnownAssignedXids arrays too, if needed */
if (EnableHotStandby)
{
ProcArrayAdd(PGPROC *proc)
{
ProcArrayStruct *arrayP = procArray;
+ int index;
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
errmsg("sorry, too many clients already")));
}
- arrayP->procs[arrayP->numProcs] = proc;
+ /*
+ * Keep the procs array sorted by (PGPROC *) so that we can utilize
+ * locality of references much better. This is useful while traversing the
+ * ProcArray because there is a increased likelyhood of finding the next
+ * PGPROC structure in the cache.
+ *
+ * Since the occurance of adding/removing a proc is much lower than the
+ * access to the ProcArray itself, the overhead should be marginal
+ */
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ /*
+ * If we are the first PGPROC or if we have found our right position in
+ * the array, break
+ */
+ if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
+ break;
+ }
+
+ memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
+ (arrayP->numProcs - index) * sizeof (int));
+ arrayP->pgprocnos[index] = proc->pgprocno;
arrayP->numProcs++;
LWLockRelease(ProcArrayLock);
if (TransactionIdIsValid(latestXid))
{
- Assert(TransactionIdIsValid(proc->xid));
+ Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
/* Advance global latestCompletedXid while holding the lock */
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
else
{
/* Shouldn't be trying to remove a live transaction here */
- Assert(!TransactionIdIsValid(proc->xid));
+ Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
}
for (index = 0; index < arrayP->numProcs; index++)
{
- if (arrayP->procs[index] == proc)
+ if (arrayP->pgprocnos[index] == proc->pgprocno)
{
- arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1];
- arrayP->procs[arrayP->numProcs - 1] = NULL; /* for debugging */
+ /* Keep the PGPROC array sorted. See notes above */
+ memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
+ (arrayP->numProcs - index - 1) * sizeof (int));
+ arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
arrayP->numProcs--;
LWLockRelease(ProcArrayLock);
return;
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
+ PGXACT *pgxact = &allPgXact[proc->pgprocno];
+
if (TransactionIdIsValid(latestXid))
{
/*
- * We must lock ProcArrayLock while clearing proc->xid, so that we do
- * not exit the set of "running" transactions while someone else is
- * taking a snapshot. See discussion in
+ * We must lock ProcArrayLock while clearing our advertised XID, so
+ * that we do not exit the set of "running" transactions while someone
+ * else is taking a snapshot. See discussion in
* src/backend/access/transam/README.
*/
- Assert(TransactionIdIsValid(proc->xid));
+ Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- proc->xid = InvalidTransactionId;
+ pgxact->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
- proc->xmin = InvalidTransactionId;
+ pgxact->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
- proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- proc->inCommit = false; /* be sure this is cleared in abort */
+ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ pgxact->inCommit = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
/* Clear the subtransaction-XID cache too while holding the lock */
- proc->subxids.nxids = 0;
- proc->subxids.overflowed = false;
+ pgxact->nxids = 0;
+ pgxact->overflowed = false;
/* Also advance global latestCompletedXid while holding the lock */
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
* anyone else's calculation of a snapshot. We might change their
* estimate of global xmin, but that's OK.
*/
- Assert(!TransactionIdIsValid(proc->xid));
+ Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
proc->lxid = InvalidLocalTransactionId;
- proc->xmin = InvalidTransactionId;
+ pgxact->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
- proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- proc->inCommit = false; /* be sure this is cleared in abort */
+ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ pgxact->inCommit = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
- Assert(proc->subxids.nxids == 0);
- Assert(proc->subxids.overflowed == false);
+ Assert(pgxact->nxids == 0);
+ Assert(pgxact->overflowed == false);
}
}
void
ProcArrayClearTransaction(PGPROC *proc)
{
+ PGXACT *pgxact = &allPgXact[proc->pgprocno];
+
/*
* We can skip locking ProcArrayLock here, because this action does not
* actually change anyone's view of the set of running XIDs: our entry is
* duplicate with the gxact that has already been inserted into the
* ProcArray.
*/
- proc->xid = InvalidTransactionId;
+ pgxact->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
- proc->xmin = InvalidTransactionId;
+ pgxact->xmin = InvalidTransactionId;
proc->recoveryConflictPending = false;
/* redundant, but just in case */
- proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- proc->inCommit = false;
+ pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ pgxact->inCommit = false;
/* Clear the subtransaction-XID cache too */
- proc->subxids.nxids = 0;
- proc->subxids.overflowed = false;
+ pgxact->nxids = 0;
+ pgxact->overflowed = false;
}
/*
/* No shortcuts, gotta grovel through the array */
for (i = 0; i < arrayP->numProcs; i++)
{
- volatile PGPROC *proc = arrayP->procs[i];
- TransactionId pxid;
+ int pgprocno = arrayP->pgprocnos[i];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ TransactionId pxid;
/* Ignore my own proc --- dealt with it above */
if (proc == MyProc)
continue;
/* Fetch xid just once - see GetNewTransactionId */
- pxid = proc->xid;
+ pxid = pgxact->xid;
if (!TransactionIdIsValid(pxid))
continue;
/*
* Step 2: check the cached child-Xids arrays
*/
- for (j = proc->subxids.nxids - 1; j >= 0; j--)
+ for (j = pgxact->nxids - 1; j >= 0; j--)
{
/* Fetch xid just once - see GetNewTransactionId */
TransactionId cxid = proc->subxids.xids[j];
* we hold ProcArrayLock. So we can't miss an Xid that we need to
* worry about.)
*/
- if (proc->subxids.overflowed)
+ if (pgxact->overflowed)
xids[nxids++] = pxid;
}
for (i = 0; i < arrayP->numProcs; i++)
{
- volatile PGPROC *proc = arrayP->procs[i];
+ int pgprocno = arrayP->pgprocnos[i];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ TransactionId pxid;
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId pxid = proc->xid;
+ pxid = pgxact->xid;
if (!TransactionIdIsValid(pxid))
continue;
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
- if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
+ if (ignoreVacuum && (pgxact->vacuumFlags & PROC_IN_VACUUM))
continue;
if (allDbs ||
proc->databaseId == 0) /* always include WalSender */
{
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId xid = proc->xid;
+ TransactionId xid = pgxact->xid;
/* First consider the transaction's own Xid, if any */
if (TransactionIdIsNormal(xid) &&
* have an Xmin but not (yet) an Xid; conversely, if it has an
* Xid, that could determine some not-yet-set Xmin.
*/
- xid = proc->xmin; /* Fetch just once */
+ xid = pgxact->xmin; /* Fetch just once */
if (TransactionIdIsNormal(xid) &&
TransactionIdPrecedes(xid, result))
result = xid;
if (!snapshot->takenDuringRecovery)
{
+ int *pgprocnos = arrayP->pgprocnos;
+ int numProcs;
+
/*
* Spin over procArray checking xid, xmin, and subxids. The goal is
* to gather all active xids, find the lowest xmin, and try to record
- * subxids. During recovery no xids will be assigned, so all normal
- * backends can be ignored, nor are there any VACUUMs running. All
- * prepared transaction xids are held in KnownAssignedXids, so these
- * will be seen without needing to loop through procs here.
+ * subxids.
*/
- for (index = 0; index < arrayP->numProcs; index++)
+ numProcs = arrayP->numProcs;
+ for (index = 0; index < numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
- TransactionId xid;
+ int pgprocno = pgprocnos[index];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ TransactionId xid;
/* Ignore procs running LAZY VACUUM */
- if (proc->vacuumFlags & PROC_IN_VACUUM)
+ if (pgxact->vacuumFlags & PROC_IN_VACUUM)
continue;
/* Update globalxmin to be the smallest valid xmin */
- xid = proc->xmin; /* fetch just once */
+ xid = pgxact->xmin; /* fetch just once */
if (TransactionIdIsNormal(xid) &&
TransactionIdPrecedes(xid, globalxmin))
- globalxmin = xid;
+ globalxmin = xid;
/* Fetch xid just once - see GetNewTransactionId */
- xid = proc->xid;
+ xid = pgxact->xid;
/*
* If the transaction has been assigned an xid < xmax we add it to
{
if (TransactionIdFollowsOrEquals(xid, xmax))
continue;
- if (proc != MyProc)
+ if (pgxact != MyPgXact)
snapshot->xip[count++] = xid;
if (TransactionIdPrecedes(xid, xmin))
xmin = xid;
*
* Again, our own XIDs are not included in the snapshot.
*/
- if (!suboverflowed && proc != MyProc)
+ if (!suboverflowed && pgxact != MyPgXact)
{
- if (proc->subxids.overflowed)
+ if (pgxact->overflowed)
suboverflowed = true;
else
{
- int nxids = proc->subxids.nxids;
+ int nxids = pgxact->nxids;
if (nxids > 0)
{
+ volatile PGPROC *proc = &allProcs[pgprocno];
memcpy(snapshot->subxip + subcount,
(void *) proc->subxids.xids,
nxids * sizeof(TransactionId));
suboverflowed = true;
}
- if (!TransactionIdIsValid(MyProc->xmin))
- MyProc->xmin = TransactionXmin = xmin;
-
+ if (!TransactionIdIsValid(MyPgXact->xmin))
+ MyPgXact->xmin = TransactionXmin = xmin;
LWLockRelease(ProcArrayLock);
/*
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
- TransactionId xid;
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ TransactionId xid;
/* Ignore procs running LAZY VACUUM */
- if (proc->vacuumFlags & PROC_IN_VACUUM)
+ if (pgxact->vacuumFlags & PROC_IN_VACUUM)
continue;
- xid = proc->xid; /* fetch just once */
+ xid = pgxact->xid; /* fetch just once */
if (xid != sourcexid)
continue;
/*
* Likewise, let's just make real sure its xmin does cover us.
*/
- xid = proc->xmin; /* fetch just once */
+ xid = pgxact->xmin; /* fetch just once */
if (!TransactionIdIsNormal(xid) ||
!TransactionIdPrecedesOrEquals(xid, xmin))
continue;
* GetSnapshotData first, we'll be overwriting a valid xmin here,
* so we don't check that.)
*/
- MyProc->xmin = TransactionXmin = xmin;
+ MyPgXact->xmin = TransactionXmin = xmin;
result = true;
break;
*/
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
TransactionId xid;
int nxids;
/* Fetch xid just once - see GetNewTransactionId */
- xid = proc->xid;
+ xid = pgxact->xid;
/*
* We don't need to store transactions that don't have a TransactionId
* Save subtransaction XIDs. Other backends can't add or remove
* entries while we're holding XidGenLock.
*/
- nxids = proc->subxids.nxids;
+ nxids = pgxact->nxids;
if (nxids > 0)
{
memcpy(&xids[count], (void *) proc->subxids.xids,
count += nxids;
subcount += nxids;
- if (proc->subxids.overflowed)
+ if (pgxact->overflowed)
suboverflowed = true;
/*
*/
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
TransactionId xid;
/* Fetch xid just once - see GetNewTransactionId */
- xid = proc->xid;
+ xid = pgxact->xid;
if (!TransactionIdIsNormal(xid))
continue;
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ TransactionId pxid;
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId pxid = proc->xid;
+ pxid = pgxact->xid;
- if (proc->inCommit && TransactionIdIsValid(pxid))
+ if (pgxact->inCommit && TransactionIdIsValid(pxid))
xids[nxids++] = pxid;
}
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ TransactionId pxid;
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId pxid = proc->xid;
+ pxid = pgxact->xid;
- if (proc->inCommit && TransactionIdIsValid(pxid))
+ if (pgxact->inCommit && TransactionIdIsValid(pxid))
{
int i;
for (index = 0; index < arrayP->numProcs; index++)
{
- PGPROC *proc = arrayP->procs[index];
+ PGPROC *proc = &allProcs[arrayP->pgprocnos[index]];
if (proc->pid == pid)
{
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
- if (proc->xid == xid)
+ if (pgxact->xid == xid)
{
result = proc->pid;
break;
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
if (proc == MyProc)
continue;
- if (excludeVacuum & proc->vacuumFlags)
+ if (excludeVacuum & pgxact->vacuumFlags)
continue;
if (allDbs || proc->databaseId == MyDatabaseId)
{
/* Fetch xmin just once - might change on us */
- TransactionId pxmin = proc->xmin;
+ TransactionId pxmin = pgxact->xmin;
if (excludeXmin0 && !TransactionIdIsValid(pxmin))
continue;
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
/* Exclude prepared transactions */
if (proc->pid == 0)
proc->databaseId == dbOid)
{
/* Fetch xmin just once - can't change on us, but good coding */
- TransactionId pxmin = proc->xmin;
+ TransactionId pxmin = pgxact->xmin;
/*
* We ignore an invalid pxmin because this means that backend has
for (index = 0; index < arrayP->numProcs; index++)
{
- VirtualTransactionId procvxid;
- PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ VirtualTransactionId procvxid;
GET_VXID_FROM_PGPROC(procvxid, *proc);
*/
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
/*
* Since we're not holding a lock, need to check that the pointer is
if (proc == MyProc)
continue; /* do not count myself */
+ if (pgxact->xid == InvalidTransactionId)
+ continue; /* do not count if no XID assigned */
if (proc->pid == 0)
continue; /* do not count prepared xacts */
- if (proc->xid == InvalidTransactionId)
- continue; /* do not count if no XID assigned */
if (proc->waitLock != NULL)
continue; /* do not count if blocked on a lock */
count++;
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
if (proc->pid == 0)
continue; /* do not count prepared xacts */
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
if (databaseid == InvalidOid || proc->databaseId == databaseid)
{
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
if (proc->pid == 0)
continue; /* do not count prepared xacts */
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
if (proc->databaseId != databaseId)
continue;
else
{
(*nbackends)++;
- if ((proc->vacuumFlags & PROC_IS_AUTOVACUUM) &&
+ if ((pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
nautovacs < MAXAUTOVACPIDS)
autovac_pids[nautovacs++] = proc->pid;
}
#define XidCacheRemove(i) \
do { \
- MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \
- MyProc->subxids.nxids--; \
+ MyProc->subxids.xids[i] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
+ MyPgXact->nxids--; \
} while (0)
/*
{
TransactionId anxid = xids[i];
- for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
+ for (j = MyPgXact->nxids - 1; j >= 0; j--)
{
if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
{
* error during AbortSubTransaction. So instead of Assert, emit a
* debug warning.
*/
- if (j < 0 && !MyProc->subxids.overflowed)
+ if (j < 0 && !MyPgXact->overflowed)
elog(WARNING, "did not find subXID %u in MyProc", anxid);
}
- for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
+ for (j = MyPgXact->nxids - 1; j >= 0; j--)
{
if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
{
}
}
/* Ordinarily we should have found it, unless the cache has overflowed */
- if (j < 0 && !MyProc->subxids.overflowed)
+ if (j < 0 && !MyPgXact->overflowed)
elog(WARNING, "did not find subXID %u in MyProc", xid);
/* Also advance global latestCompletedXid while holding the lock */
int *nSoftEdges) /* output argument */
{
PGPROC *proc;
+ PGXACT *pgxact;
LOCK *lock;
PROCLOCK *proclock;
SHM_QUEUE *procLocks;
while (proclock)
{
proc = proclock->tag.myProc;
+ pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
/* A proc never blocks itself */
if (proc != checkProc)
* vacuumFlag bit), but we don't do that here to avoid
* grabbing ProcArrayLock.
*/
- if (proc->vacuumFlags & PROC_IS_AUTOVACUUM)
+ if (pgxact->vacuumFlags & PROC_IS_AUTOVACUUM)
blocking_autovacuum_proc = proc;
/* This proc hard-blocks checkProc */
proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
{
PGPROC *proc = proclock->tag.myProc;
+ PGXACT *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
LOCK *lock = proclock->tag.myLock;
- accessExclusiveLocks[index].xid = proc->xid;
+ accessExclusiveLocks[index].xid = pgxact->xid;
accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
#include <sys/time.h>
#include "access/transam.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
/* Pointer to this process's PGPROC struct, if any */
PGPROC *MyProc = NULL;
+PGXACT *MyPgXact = NULL;
/*
* This spinlock protects the freelist of recycled PGPROC structures.
/* Pointers to shared-memory structures */
PROC_HDR *ProcGlobal = NULL;
NON_EXEC_STATIC PGPROC *AuxiliaryProcs = NULL;
+PGPROC *PreparedXactProcs = NULL;
/* If we are waiting for a lock, this points to the associated LOCALLOCK */
static LOCALLOCK *lockAwaited = NULL;
/* ProcGlobal */
size = add_size(size, sizeof(PROC_HDR));
- /* AuxiliaryProcs */
- size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
/* MyProcs, including autovacuum workers and launcher */
size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
+ /* AuxiliaryProcs */
+ size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
+ /* Prepared xacts */
+ size = add_size(size, mul_size(max_prepared_xacts, sizeof(PGPROC)));
/* ProcStructLock */
size = add_size(size, sizeof(slock_t));
+ size = add_size(size, mul_size(MaxBackends, sizeof(PGXACT)));
+ size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGXACT)));
+ size = add_size(size, mul_size(max_prepared_xacts, sizeof(PGXACT)));
+
return size;
}
InitProcGlobal(void)
{
PGPROC *procs;
+ PGXACT *pgxacts;
int i,
j;
bool found;
- uint32 TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS;
+ uint32 TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;
/* Create the ProcGlobal shared structure */
ProcGlobal = (PROC_HDR *)
* those used for 2PC, which are embedded within a GlobalTransactionData
* struct).
*
- * There are three separate consumers of PGPROC structures: (1) normal
- * backends, (2) autovacuum workers and the autovacuum launcher, and (3)
- * auxiliary processes. Each PGPROC structure is dedicated to exactly
- * one of these purposes, and they do not move between groups.
+ * There are four separate consumers of PGPROC structures: (1) normal
+ * backends, (2) autovacuum workers and the autovacuum launcher, (3)
+ * auxiliary processes, and (4) prepared transactions. Each PGPROC
+ * structure is dedicated to exactly one of these purposes, and they do
+ * not move between groups.
*/
procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
ProcGlobal->allProcs = procs;
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
MemSet(procs, 0, TotalProcs * sizeof(PGPROC));
+
+ /*
+ * Also allocate a separate array of PGXACT structures. This is separate
+ * from the main PGPROC array so that the most heavily accessed data is
+ * stored contiguously in memory in as few cache lines as possible. This
+ * provides significant performance benefits, especially on a
+ * multiprocessor system. Thereis one PGXACT structure for every PGPROC
+ * structure.
+ */
+ pgxacts = (PGXACT *) ShmemAlloc(TotalProcs * sizeof(PGXACT));
+ MemSet(pgxacts, 0, TotalProcs * sizeof(PGXACT));
+ ProcGlobal->allPgXact = pgxacts;
+
for (i = 0; i < TotalProcs; i++)
{
/* Common initialization for all PGPROCs, regardless of type. */
- /* Set up per-PGPROC semaphore, latch, and backendLock */
- PGSemaphoreCreate(&(procs[i].sem));
- InitSharedLatch(&(procs[i].procLatch));
- procs[i].backendLock = LWLockAssign();
+ /*
+ * Set up per-PGPROC semaphore, latch, and backendLock. Prepared
+ * xact dummy PGPROCs don't need these though - they're never
+ * associated with a real process
+ */
+ if (i < MaxBackends + NUM_AUXILIARY_PROCS)
+ {
+ PGSemaphoreCreate(&(procs[i].sem));
+ InitSharedLatch(&(procs[i].procLatch));
+ procs[i].backendLock = LWLockAssign();
+ }
+ procs[i].pgprocno = i;
/*
* Newly created PGPROCs for normal backends or for autovacuum must
* be queued up on the appropriate free list. Because there can only
* ever be a small, fixed number of auxiliary processes, no free
* list is used in that case; InitAuxiliaryProcess() instead uses a
- * linear search.
+ * linear search. PGPROCs for prepared transactions are added to a
+ * free list by TwoPhaseShmemInit().
*/
if (i < MaxConnections)
{
}
/*
- * Save a pointer to the block of PGPROC structures reserved for
- * auxiliary proceses.
+ * Save pointers to the blocks of PGPROC structures reserved for
+ * auxiliary processes and prepared transactions.
*/
AuxiliaryProcs = &procs[MaxBackends];
+ PreparedXactProcs = &procs[MaxBackends + NUM_AUXILIARY_PROCS];
/* Create ProcStructLock spinlock, too */
ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
}
+ MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];
/*
* Now that we have a PGPROC, mark ourselves as an active postmaster
SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId;
- MyProc->xid = InvalidTransactionId;
- MyProc->xmin = InvalidTransactionId;
+ MyPgXact->xid = InvalidTransactionId;
+ MyPgXact->xmin = InvalidTransactionId;
MyProc->pid = MyProcPid;
/* backendId, databaseId and roleId will be filled in later */
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
- MyProc->inCommit = false;
- MyProc->vacuumFlags = 0;
+ MyPgXact->inCommit = false;
+ MyPgXact->vacuumFlags = 0;
/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
if (IsAutoVacuumWorkerProcess())
- MyProc->vacuumFlags |= PROC_IS_AUTOVACUUM;
+ MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitLink = NULL;
((volatile PGPROC *) auxproc)->pid = MyProcPid;
MyProc = auxproc;
+ MyPgXact = &ProcGlobal->allPgXact[auxproc->pgprocno];
SpinLockRelease(ProcStructLock);
SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId;
- MyProc->xid = InvalidTransactionId;
- MyProc->xmin = InvalidTransactionId;
+ MyPgXact->xid = InvalidTransactionId;
+ MyPgXact->xmin = InvalidTransactionId;
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
- MyProc->inCommit = false;
- MyProc->vacuumFlags = 0;
+ MyPgXact->inCommit = false;
+ MyPgXact->vacuumFlags = 0;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitLink = NULL;
if (deadlock_state == DS_BLOCKED_BY_AUTOVACUUM && allow_autovacuum_cancel)
{
PGPROC *autovac = GetBlockingAutoVacuumPgproc();
+ PGXACT *autovac_pgxact = &ProcGlobal->allPgXact[autovac->pgprocno];
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
* wraparound.
*/
if ((autovac != NULL) &&
- (autovac->vacuumFlags & PROC_IS_AUTOVACUUM) &&
- !(autovac->vacuumFlags & PROC_VACUUM_FOR_WRAPAROUND))
+ (autovac_pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
+ !(autovac_pgxact->vacuumFlags & PROC_VACUUM_FOR_WRAPAROUND))
{
int pid = autovac->pid;
SnapshotResetXmin(void)
{
if (RegisteredSnapshots == 0 && ActiveSnapshot == NULL)
- MyProc->xmin = InvalidTransactionId;
+ MyPgXact->xmin = InvalidTransactionId;
}
/*
struct XidCache
{
- bool overflowed;
- int nxids;
TransactionId xids[PGPROC_MAX_CACHED_SUBXIDS];
};
LocalTransactionId lxid; /* local id of top-level transaction currently
* being executed by this proc, if running;
* else InvalidLocalTransactionId */
-
- TransactionId xid; /* id of top-level transaction currently being
- * executed by this proc, if running and XID
- * is assigned; else InvalidTransactionId */
-
- TransactionId xmin; /* minimal running XID as it was when we were
- * starting our xact, excluding LAZY VACUUM:
- * vacuum must not remove tuples deleted by
- * xid >= xmin ! */
-
int pid; /* Backend's process ID; 0 if prepared xact */
+ int pgprocno;
/* These fields are zero while a backend is still starting up: */
BackendId backendId; /* This backend's backend ID (if assigned) */
Oid databaseId; /* OID of database this backend is using */
Oid roleId; /* OID of role using this backend */
- bool inCommit; /* true if within commit critical section */
-
- uint8 vacuumFlags; /* vacuum-related flags, see above */
-
/*
* While in hot standby mode, shows that a conflict signal has been sent
* for the current transaction. Set/cleared while holding ProcArrayLock,
extern PGDLLIMPORT PGPROC *MyProc;
+extern PGDLLIMPORT struct PGXACT *MyPgXact;
+
+/*
+ * Prior to PostgreSQL 9.2, the fieds below were stored as part of the
+ * PGPROC. However, benchmarking revealed that packing these particular
+ * members into a separate array as tightly as possible sped up GetSnapshotData
+ * considerably on systems with many CPU cores, by reducing the number of
+ * cache lines needing to be fetched. Thus, think very carefully before adding
+ * anything else here.
+ */
+typedef struct PGXACT
+{
+ TransactionId xid; /* id of top-level transaction currently being
+ * executed by this proc, if running and XID
+ * is assigned; else InvalidTransactionId */
+
+ TransactionId xmin; /* minimal running XID as it was when we were
+ * starting our xact, excluding LAZY VACUUM:
+ * vacuum must not remove tuples deleted by
+ * xid >= xmin ! */
+
+ uint8 vacuumFlags; /* vacuum-related flags, see above */
+ bool overflowed;
+ bool inCommit; /* true if within commit critical section */
+ uint8 nxids;
+} PGXACT;
/*
* There is one ProcGlobal struct for the whole database cluster.
{
/* Array of PGPROC structures (not including dummies for prepared txns) */
PGPROC *allProcs;
+ /* Array of PGXACT structures (not including dummies for prepared txns */
+ PGXACT *allPgXact;
/* Length of allProcs array */
uint32 allProcCount;
/* Head of list of free PGPROC structures */
extern PROC_HDR *ProcGlobal;
+extern PGPROC *PreparedXactProcs;
+
/*
* We set aside some extra PGPROC structures for auxiliary processes,
* ie things that aren't full-fledged backends but need shmem access.