/*-------------------------------------------------------------------------
 *
 * gtm_snap.c
 *	  Snapshot handling on GTM
 *
 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 * Portions Copyright (c) 2010-2012 Postgres-XC Development Group
 *
 *
 * IDENTIFICATION
 *	  src/gtm/main/gtm_snap.c
 *
 *
 * Functions in this source file manage transaction snapshots for global
 * transactions, i.e. a list of transactions (GXIDs) a particular transaction
 * sees as "running." The concept is mostly the same as in PostgreSQL (see
 * GetSnapshotData in src/backend/storage/ipc/procarray.c for details), except
 * that the snapshots are not computed on nodes but on GTM.
 *
 * The API is very simple as it consists of a single function:
 *
 *	  GTM_GetTransactionSnapshot
 *
 * that builds a snapshot and stores it in the main GTMTransactions array
 * tracking all in-progress transactions. When multiple snapshot requests get
 * grouped by GTM Proxy, we compute the snapshot only once and then use it
 * for all transactions in the request. As computing the snapshot is quite
 * expensive, this is a major benefit.
 *
 * There are also two functions handling the communication with GTM clients
 * (either nodes or GTM proxies):
 *
 *	  ProcessGetSnapshotCommand
 *	  ProcessGetSnapshotCommandMulti
 *
 * These functions are responsible for parsing network messages, and then
 * simply call GTM_GetTransactionSnapshot with the proper arguments.
 *
 *
 * Memory management (GTM_SnapshotData)
 * ------------------------------------
 * The snapshots are allocated in TopMostMemoryContext - we can't allocate
 * them in per-thread contexts as the transactions may outlive the client
 * connection (e.g. with prepared transactions).
 *
 * When building the snapshot, we write it directly into the snapshot of the
 * first valid transaction in the request (allocating its sn_xip buffer if it
 * was not allocated yet), or - when the request contains no valid
 * transaction - into a thread-local snapshot (multiple threads may be
 * building snapshots at the same time).
 *
 * Once the snapshot is built, we copy it into the GTMTransactions array
 * for each of the transactions passed to GTM_GetTransactionSnapshot.
 *
 * Snapshot for each transaction is stored in gti_current_snapshot in the
 * global GTMTransactions array. We only need to allocate the sn_xip field,
 * tracking the GXIDs of running transactions, and we allocate it just once:
 * the first time a snapshot is stored in that slot. The buffer is then kept
 * for all future transactions in the slot (until GTM shuts down).
 *
 * The sn_xip buffer is always allocated with space for the maximum number
 * of GXIDs, otherwise we might need to reallocate it. That means we need
 * space for up to GTM_MAX_GLOBAL_TRANSACTIONS, which is currently 16384.
 * Each GXID is 32 bits, so this means 64kB per GTMTransactions slot. There
 * are GTM_MAX_GLOBAL_TRANSACTIONS slots, so this means 16k sn_xip buffers
 * or 1GB of memory in total.
 *
 * XXX We may as well allocate all the memory at the very beginning - we
 * will eventually allocate the same memory anyway (it only takes a few
 * loops over GTMTransactions, so a few thousand transactions), and it
 * would completely eliminate the palloc/pfree calls.
 *
 * XXX There may be a memory leak in thr_snapshot - if its sn_xip is
 * allocated in the global memory context, we never pfree it when the thread
 * exits. (Verify: thr_snapshot's sn_xip may instead be allocated as part of
 * GTM_ThreadInfo.) We can either allocate it in a per-thread context or
 * statically (as a variable in the function).
 *
 * XXX While this eliminates the palloc/pfree calls, we still need to do
 * a number of memcpy() calls when copying the built snapshot data into
 * GTMTransactions for each of the transactions. Perhaps we should invent
 * snapshots directly "shared" by multiple transactions. That is, keep the
 * snapshots in a separate data structure (list?), track the number of
 * transactions using it (refcount). And reuse it when refcount gets to 0
 * (after all the transactions either close or get a new snapshot). Not
 * sure it's worth the additional complexity, though.
* *------------------------------------------------------------------------- */ #include "gtm/assert.h" #include "gtm/elog.h" #include "gtm/gtm.h" #include "gtm/gtm_client.h" #include "gtm/gtm_standby.h" #include "gtm/stringinfo.h" #include "gtm/libpq.h" #include "gtm/libpq-int.h" #include "gtm/pqformat.h" /* * GTM_GetTransactionSnapshot * Compute and store snapshot(s) for specified transactions. * * Get snapshot for the given transactions. If this is the first call in the * transaction, a fresh snapshot is taken and returned back. For a serializable * transaction, repeated calls to the function will return the same snapshot. * For a read-committed transaction, fresh snapshot is taken every time and * returned to the caller. * * The returned snapshot includes xmin (lowest still-running xact ID), * xmax (highest completed xact ID + 1), and a list of running xact IDs * in the range xmin <= xid < xmax. It is used as follows: * All xact IDs < xmin are considered finished. * All xact IDs >= xmax are considered still running. * For an xact ID xmin <= xid < xmax, consult list to see whether * it is considered running or not. * This ensures that the set of transactions seen as "running" by the * current xact will not change after it takes the snapshot. * * All running top-level XIDs are included in the snapshot. * * We also update the following global variables: * RecentGlobalXmin: the global xmin (oldest TransactionXmin across all * running transactions * * Note: this function should probably not be called with an argument that's * not statically allocated (see xip allocation below). */ static GTM_Snapshot GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *status) { GlobalTransactionId xmin; GlobalTransactionId xmax; GlobalTransactionId globalxmin; int count = 0; gtm_ListCell *elem = NULL; int ii; /* * Instead of allocating memory for a snapshot, we use the snapshot of the * first transaction in the given array. 
The same snapshot will later be * copied to other transaction info structures. */ GTM_TransactionInfo *mygtm_txninfo = NULL; GTM_Snapshot snapshot = NULL; memset(status, 0, sizeof (int) * txn_count); for (ii = 0; ii < txn_count; ii++) { /* * Even if the request does not contain a valid GXID, we still send * down a snapshot, but mark the status field acoordingly */ if (handle[ii] != InvalidTransactionHandle) mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]); else status[ii] = STATUS_NOT_FOUND; /* * If the transaction does not exist, just mark the status field with * a STATUS_ERROR code * * FIXME This comment seems to be misplaced/stale - we're not checking * if a transaction exists (we've already done that above and set the * status to STATUS_NOT_FOUND). */ if ((mygtm_txninfo != NULL) && (snapshot == NULL)) snapshot = &mygtm_txninfo->gti_current_snapshot; } /* * If no valid transaction exists in the array, we record the snapshot in a * thread-specific structure. This allows us to avoid repeated * allocation/freeing of the structure. * * Note that we must use a thread-specific variable and not a global * variable because a concurrent thread might compute a new snapshot and * overwrite the snapshot information while we are still sending this copy * to the client. Using a thread-specific storage avoids that problem. */ if (snapshot == NULL) snapshot = &GetMyThreadInfo->thr_snapshot; Assert(snapshot != NULL); /* * This can only happen when using a snapshot from GTMTransactions, as the * thread-specific sn_xip array is allocated statically as part of GTM_ThreadInfo. */ if (snapshot->sn_xip == NULL) { /* * First call for this snapshot */ snapshot->sn_xip = (GlobalTransactionId *) palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId)); if (snapshot->sn_xip == NULL) ereport(ERROR, (ENOMEM, errmsg("out of memory"))); } /* * It is sufficient to get shared lock on ProcArrayLock, even if we are * going to set MyProc->xmin. 
*/ GTM_RWLockAcquire(>MTransactions.gt_TransArrayLock, GTM_LOCKMODE_READ); /* xmax is always latestCompletedXid + 1 */ xmax = GTMTransactions.gt_latestCompletedXid; Assert(GlobalTransactionIdIsNormal(xmax)); GlobalTransactionIdAdvance(xmax); /* initialize xmin calculation with xmax */ globalxmin = xmin = xmax; /* * Spin over transaction list checking xid, xmin, and subxids. The goal is to * gather all active xids and find the lowest xmin */ gtm_foreach(elem, GTMTransactions.gt_open_transactions) { volatile GTM_TransactionInfo *gtm_txninfo = (GTM_TransactionInfo *)gtm_lfirst(elem); GlobalTransactionId xid; /* Don't take into account LAZY VACUUMs */ if (gtm_txninfo->gti_vacuum) continue; /* Update globalxmin to be the smallest valid xmin */ xid = gtm_txninfo->gti_xmin; /* fetch just once */ if (GlobalTransactionIdIsNormal(xid) && GlobalTransactionIdPrecedes(xid, globalxmin)) globalxmin = xid; /* Fetch xid just once - see GetNewTransactionId */ xid = gtm_txninfo->gti_gxid; /* * If the transaction has been assigned an xid < xmax we add it to the * snapshot, and update xmin if necessary. There's no need to store * XIDs >= xmax, since we'll treat them as running anyway. We don't * bother to examine their subxids either. * * We don't include our own XID (if any) in the snapshot, but we must * include it into xmin. */ if (GlobalTransactionIdIsNormal(xid)) { /* * Unlike Postgres, we include the GXID of the current transaction * as well in the snapshot. This is necessary because the same * snapshot is shared by multiple backends through GTM proxy and * the GXID will vary for each backend. * * XXX We should confirm that this does not have any adverse effect * on the MVCC visibility and check if any changes are related to * the MVCC checks because of the change */ if (GlobalTransactionIdFollowsOrEquals(xid, xmax)) continue; if (GlobalTransactionIdPrecedes(xid, xmin)) xmin = xid; snapshot->sn_xip[count++] = xid; } } /* * Update globalxmin to include actual process xids. 
This is a slightly * different way of computing it than GetOldestXmin uses, but should give * the same result. */ if (GlobalTransactionIdPrecedes(xmin, globalxmin)) globalxmin = xmin; snapshot->sn_xmin = xmin; snapshot->sn_xmax = xmax; snapshot->sn_xcnt = count; /* * Now, before the proc array lock is released, set the xmin in the txninfo * structures of all the transactions. */ for (ii = 0; ii < txn_count; ii++) { GTM_Snapshot mysnap = NULL; /* * We have already gone through all the transaction handles above and * marked the invalid handles with STATUS_ERROR */ if ((status[ii] == STATUS_ERROR) || (status[ii] == STATUS_NOT_FOUND)) continue; mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]); mysnap = &mygtm_txninfo->gti_current_snapshot; if (GTM_IsTransSerializable(mygtm_txninfo)) { if ((mygtm_txninfo->gti_snapshot_set) && (txn_count > 1)) { GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); elog(ERROR, "Grouped snapshot can only include first snapshot in Serializable transaction"); } if (!mygtm_txninfo->gti_snapshot_set) { /* * For the first transaction in the array, the snapshot is * already set. 
*/ if (snapshot != mysnap) { if (mysnap->sn_xip == NULL) { /* * First call for this snapshot */ mysnap->sn_xip = (GlobalTransactionId *) palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId)); if (mysnap->sn_xip == NULL) ereport(ERROR, (ENOMEM, errmsg("out of memory"))); } mysnap->sn_xmin = snapshot->sn_xmin; mysnap->sn_xmax = snapshot->sn_xmax; mysnap->sn_xcnt = snapshot->sn_xcnt; memcpy(mysnap->sn_xip, snapshot->sn_xip, sizeof (GlobalTransactionId) * snapshot->sn_xcnt); } mygtm_txninfo->gti_snapshot_set = true; } } else if (snapshot != mysnap) { if (mysnap->sn_xip == NULL) { /* * First call for this snapshot */ mysnap->sn_xip = (GlobalTransactionId *) palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId)); if (mysnap->sn_xip == NULL) { GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); ereport(ERROR, (ENOMEM, errmsg("out of memory"))); } } mysnap->sn_xmin = snapshot->sn_xmin; mysnap->sn_xmax = snapshot->sn_xmax; mysnap->sn_xcnt = snapshot->sn_xcnt; memcpy(mysnap->sn_xip, snapshot->sn_xip, sizeof (GlobalTransactionId) * snapshot->sn_xcnt); } if ((mygtm_txninfo != NULL) && (!GlobalTransactionIdIsValid(mygtm_txninfo->gti_xmin))) mygtm_txninfo->gti_xmin = xmin; } GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); elog(DEBUG1, "GTM_GetTransactionSnapshot: (%u:%u:%u)", snapshot->sn_xmin, snapshot->sn_xmax, snapshot->sn_xcnt); return snapshot; } /* * Process MSG_SNAPSHOT_GET command */ void ProcessGetSnapshotCommand(Port *myport, StringInfo message, bool get_gxid) { StringInfoData buf; GTM_TransactionHandle txn; GlobalTransactionId gxid; GTM_Snapshot snapshot; MemoryContext oldContext; int status; int txn_count; const char *data = NULL; txn_count = pq_getmsgint(message, sizeof (int)); Assert(txn_count == 1); data = pq_getmsgbytes(message, sizeof (gxid)); if (data == NULL) ereport(ERROR, (EPROTO, errmsg("Message does not contain valid GXID"))); memcpy(&gxid, data, sizeof(gxid)); elog(INFO, "Received transaction ID %d for snapshot obtention", 
gxid); txn = GTM_GXIDToHandle(gxid); pq_getmsgend(message); if (get_gxid) { gxid = GTM_GetGlobalTransactionId(txn); if (gxid == InvalidGlobalTransactionId) ereport(ERROR, (EINVAL, errmsg("Failed to get a new transaction id"))); } oldContext = MemoryContextSwitchTo(TopMostMemoryContext); /* * Get a fresh snapshot */ if ((snapshot = GTM_GetTransactionSnapshot(&txn, 1, &status)) == NULL) ereport(ERROR, (EINVAL, errmsg("Failed to get a snapshot"))); MemoryContextSwitchTo(oldContext); pq_beginmessage(&buf, 'S'); pq_sendint(&buf, get_gxid ? SNAPSHOT_GXID_GET_RESULT : SNAPSHOT_GET_RESULT, 4); if (myport->remote_type == GTM_NODE_GTM_PROXY) { GTM_ProxyMsgHeader proxyhdr; proxyhdr.ph_conid = myport->conn_id; pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); } pq_sendbytes(&buf, (char *)&gxid, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); pq_sendbytes(&buf, (char *)&status, sizeof(int) * txn_count); pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId)); pq_sendint(&buf, snapshot->sn_xcnt, sizeof (int)); pq_sendbytes(&buf, (char *)snapshot->sn_xip, sizeof(GlobalTransactionId) * snapshot->sn_xcnt); pq_endmessage(myport, &buf); if (myport->remote_type != GTM_NODE_GTM_PROXY) pq_flush(myport); return; } /* * Process MSG_SNAPSHOT_GET_MULTI command */ void ProcessGetSnapshotCommandMulti(Port *myport, StringInfo message) { StringInfoData buf; GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS]; GlobalTransactionId gxid[GTM_MAX_GLOBAL_TRANSACTIONS]; GTM_Snapshot snapshot; MemoryContext oldContext; int txn_count; int ii; int status[GTM_MAX_GLOBAL_TRANSACTIONS]; txn_count = pq_getmsgint(message, sizeof (int)); for (ii = 0; ii < txn_count; ii++) { const char *data = pq_getmsgbytes(message, sizeof (gxid[ii])); if (data == NULL) ereport(ERROR, (EPROTO, errmsg("Message does not contain valid GXID"))); memcpy(&gxid[ii], data, 
sizeof (gxid[ii])); txn[ii] = GTM_GXIDToHandle(gxid[ii]); } pq_getmsgend(message); oldContext = MemoryContextSwitchTo(TopMostMemoryContext); /* * Get a fresh snapshot */ if ((snapshot = GTM_GetTransactionSnapshot(txn, txn_count, status)) == NULL) ereport(ERROR, (EINVAL, errmsg("Failed to get a snapshot"))); MemoryContextSwitchTo(oldContext); pq_beginmessage(&buf, 'S'); pq_sendint(&buf, SNAPSHOT_GET_MULTI_RESULT, 4); if (myport->remote_type == GTM_NODE_GTM_PROXY) { GTM_ProxyMsgHeader proxyhdr; proxyhdr.ph_conid = myport->conn_id; pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); } pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count); pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId)); pq_sendint(&buf, snapshot->sn_xcnt, sizeof (int)); pq_sendbytes(&buf, (char *)snapshot->sn_xip, sizeof(GlobalTransactionId) * snapshot->sn_xcnt); pq_endmessage(myport, &buf); if (myport->remote_type != GTM_NODE_GTM_PROXY) pq_flush(myport); #if 0 /* Do not need this portion because this command does not change * internal status. */ if (GetMyThreadInfo->thr_conn->standby) { int _rc; int txn_count_out; int status_out[GTM_MAX_GLOBAL_TRANSACTIONS]; GlobalTransactionId xmin_out; GlobalTransactionId xmax_out; GlobalTransactionId recent_global_xmin_out; int32 xcnt_out; GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; int count = 0; retry: elog(DEBUG1, "calling snapshot_get_multi() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); _rc = snapshot_get_multi(GetMyThreadInfo->thr_conn->standby, txn_count, gxid, &txn_count_out, status_out, &xmin_out, &xmax_out, &recent_global_xmin_out, &xcnt_out); if (gtm_standby_check_communication_error(&count, oldconn)) goto retry; elog(DEBUG1, "snapshot_get_multi() rc=%d done.", _rc); } #endif return; }