summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/access/transam/xact.c 10
-rw-r--r-- src/backend/access/transam/xlog.c 7
-rw-r--r-- src/backend/parser/gram.y 13
-rw-r--r-- src/backend/storage/ipc/procarray.c 97
-rw-r--r-- src/backend/storage/lmgr/predicate.c 76
-rw-r--r-- src/backend/utils/misc/guc.c 20
-rw-r--r-- src/backend/utils/time/snapmgr.c 605
-rw-r--r-- src/bin/initdb/initdb.c 1
-rw-r--r-- src/include/catalog/catversion.h 2
-rw-r--r-- src/include/catalog/pg_proc.h 3
-rw-r--r-- src/include/parser/kwlist.h 1
-rw-r--r-- src/include/storage/predicate.h 2
-rw-r--r-- src/include/storage/procarray.h 8
-rw-r--r-- src/include/utils/snapmgr.h 5
14 files changed, 832 insertions, 18 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3dab45c2da6..c151d3be191 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2067,6 +2067,16 @@ PrepareTransaction(void)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
+ /*
+ * Likewise, don't allow PREPARE after pg_export_snapshot. This could be
+ * supported if we added cleanup logic to twophase.c, but for now it
+ * doesn't seem worth the trouble.
+ */
+ if (XactHasExportedSnapshots())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot PREPARE a transaction that has exported snapshots")));
+
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1c17348472e..5fec88691a3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -58,6 +58,7 @@
#include "utils/guc.h"
#include "utils/ps_status.h"
#include "utils/relmapper.h"
+#include "utils/snapmgr.h"
#include "utils/timestamp.h"
#include "pg_trace.h"
@@ -6382,6 +6383,12 @@ StartupXLOG(void)
ResetUnloggedRelations(UNLOGGED_RELATION_CLEANUP);
/*
+ * Likewise, delete any saved transaction snapshot files that got
+ * left behind by crashed backends.
+ */
+ DeleteAllExportedSnapshotFiles();
+
+ /*
* Initialize for Hot Standby, if enabled. We won't let backends in
* yet, not until we've reached the min recovery point specified in
* control file and we've established a recovery snapshot from a
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index e9f3896badb..e2edcde024e 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -553,8 +553,8 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES
SERIALIZABLE SERVER SESSION SESSION_USER SET SETOF SHARE
- SHOW SIMILAR SIMPLE SMALLINT SOME STABLE STANDALONE_P START STATEMENT
- STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING
+ SHOW SIMILAR SIMPLE SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START
+ STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING
SYMMETRIC SYSID SYSTEM_P
TABLE TABLES TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN TIME TIMESTAMP
@@ -1352,6 +1352,15 @@ set_rest: /* Generic SET syntaxes: */
n->args = list_make1(makeStringConst($3 == XMLOPTION_DOCUMENT ? "DOCUMENT" : "CONTENT", @3));
$$ = n;
}
+ /* Special syntaxes invented by PostgreSQL: */
+ | TRANSACTION SNAPSHOT Sconst
+ {
+ VariableSetStmt *n = makeNode(VariableSetStmt);
+ n->kind = VAR_SET_MULTI;
+ n->name = "TRANSACTION SNAPSHOT";
+ n->args = list_make1(makeStringConst($3, @3));
+ $$ = n;
+ }
;
var_name: ColId { $$ = $1; }
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 7d44a34d025..a8ff54037c4 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1123,6 +1123,28 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
}
/*
+ * GetMaxSnapshotXidCount -- report the largest number of entries a
+ * snapshot's top-level XID array may need to hold
+ *
+ * The answer is procArray->maxProcs.  Exported for use by snapmgr.c.
+ */
+int
+GetMaxSnapshotXidCount(void)
+{
+	int			maxXids = procArray->maxProcs;
+
+	return maxXids;
+}
+
+/*
+ * GetMaxSnapshotSubxidCount -- report the largest number of entries a
+ * snapshot's sub-XID array may need to hold
+ *
+ * The answer is TOTAL_MAX_CACHED_SUBXIDS.  Exported for use by snapmgr.c.
+ */
+int
+GetMaxSnapshotSubxidCount(void)
+{
+	int			maxSubxids = TOTAL_MAX_CACHED_SUBXIDS;
+
+	return maxSubxids;
+}
+
+/*
* GetSnapshotData -- returns information about running transactions.
*
* The returned snapshot includes xmin (lowest still-running xact ID),
@@ -1187,14 +1209,14 @@ GetSnapshotData(Snapshot snapshot)
* we are in recovery, see later comments.
*/
snapshot->xip = (TransactionId *)
- malloc(arrayP->maxProcs * sizeof(TransactionId));
+ malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
if (snapshot->xip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
Assert(snapshot->subxip == NULL);
snapshot->subxip = (TransactionId *)
- malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
if (snapshot->subxip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -1377,6 +1399,77 @@ GetSnapshotData(Snapshot snapshot)
}
/*
+ * ProcArrayInstallImportedXmin -- install imported xmin into MyProc->xmin
+ *
+ * This is called when installing a snapshot imported from another
+ * transaction. To ensure that OldestXmin doesn't go backwards, we must
+ * check that the source transaction is still running, and we'd better do
+ * that atomically with installing the new xmin.
+ *
+ * xmin is the xmin of the imported snapshot and must be a normal XID.
+ * sourcexid is the XID of the transaction the snapshot was taken from.
+ *
+ * Returns TRUE if successful. Returns FALSE if sourcexid is not a normal
+ * XID, or if no proc array entry (other than ones running lazy vacuum) has
+ * that XID, belongs to our database, and advertises a normal xmin that
+ * precedes or equals the requested xmin. Callers report FALSE as "the
+ * source transaction is not running anymore".
+ */
+bool
+ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
+{
+ bool result = false;
+ ProcArrayStruct *arrayP = procArray;
+ int index;
+
+ Assert(TransactionIdIsNormal(xmin));
+ /* Reject a special or invalid source XID up front, without locking */
+ if (!TransactionIdIsNormal(sourcexid))
+ return false;
+
+ /* Get lock so source xact can't end while we're doing this */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ /* Scan the proc array for an entry that is running sourcexid */
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ volatile PGPROC *proc = arrayP->procs[index];
+ TransactionId xid;
+
+ /* Ignore procs running LAZY VACUUM */
+ if (proc->vacuumFlags & PROC_IN_VACUUM)
+ continue;
+
+ xid = proc->xid; /* fetch just once */
+ if (xid != sourcexid)
+ continue;
+
+ /*
+ * We check the transaction's database ID for paranoia's sake: if
+ * it's in another DB then its xmin does not cover us. Caller should
+ * have detected this already, so we just treat any funny cases as
+ * "transaction not found".
+ */
+ if (proc->databaseId != MyDatabaseId)
+ continue;
+
+ /*
+ * Likewise, let's just make real sure its xmin does cover us.
+ */
+ xid = proc->xmin; /* fetch just once */
+ if (!TransactionIdIsNormal(xid) ||
+ !TransactionIdPrecedesOrEquals(xid, xmin))
+ continue;
+
+ /*
+ * We're good. Install the new xmin. As in GetSnapshotData, set
+ * TransactionXmin too. (Note that because snapmgr.c called
+ * GetSnapshotData first, we'll be overwriting a valid xmin here,
+ * so we don't check that.)
+ */
+ MyProc->xmin = TransactionXmin = xmin;
+
+ result = true;
+ /* xmin is installed; stop scanning */
+ break;
+ }
+
+ LWLockRelease(ProcArrayLock);
+
+ /* Still false here if no entry passed every check in the loop */
+ return result;
+}
+
+/*
* GetRunningTransactionData -- returns information about running transactions.
*
* Similar to GetSnapshotData but returns more information. We include
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index d39f8975f88..345f6f56a69 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -147,6 +147,8 @@
*
* predicate lock maintenance
* GetSerializableTransactionSnapshot(Snapshot snapshot)
+ * SetSerializableTransactionSnapshot(Snapshot snapshot,
+ * TransactionId sourcexid)
* RegisterPredicateLockingXid(void)
* PredicateLockRelation(Relation relation, Snapshot snapshot)
* PredicateLockPage(Relation relation, BlockNumber blkno,
@@ -417,7 +419,8 @@ static void OldSerXidSetActiveSerXmin(TransactionId xid);
static uint32 predicatelock_hash(const void *key, Size keysize);
static void SummarizeOldestCommittedSxact(void);
static Snapshot GetSafeSnapshot(Snapshot snapshot);
-static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot);
+static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
+ TransactionId sourcexid);
static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
PREDICATELOCKTARGETTAG *parent);
@@ -1505,7 +1508,8 @@ GetSafeSnapshot(Snapshot origSnapshot)
* our caller passed to us. The pointer returned is actually the same
* one passed to it, but we avoid assuming that here.
*/
- snapshot = GetSerializableTransactionSnapshotInt(origSnapshot);
+ snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
+ InvalidTransactionId);
if (MySerializableXact == InvalidSerializableXact)
return snapshot; /* no concurrent r/w xacts; it's safe */
@@ -1574,11 +1578,52 @@ GetSerializableTransactionSnapshot(Snapshot snapshot)
if (XactReadOnly && XactDeferrable)
return GetSafeSnapshot(snapshot);
- return GetSerializableTransactionSnapshotInt(snapshot);
+ return GetSerializableTransactionSnapshotInt(snapshot,
+ InvalidTransactionId);
}
+/*
+ * Adopt an imported snapshot as the current serializable transaction's
+ * snapshot.
+ *
+ * This mirrors GetSerializableTransactionSnapshot, except that the snapshot
+ * contents are supplied by the caller instead of being freshly taken.
+ *
+ * Caller's responsibilities: the snapshot must originate from a
+ * serializable transaction, and a read-write importer must not be handed
+ * a snapshot from a read-only source transaction.
+ */
+void
+SetSerializableTransactionSnapshot(Snapshot snapshot,
+								   TransactionId sourcexid)
+{
+	bool		readOnlyDeferrable;
+
+	Assert(IsolationIsSerializable());
+
+	/*
+	 * SERIALIZABLE READ ONLY DEFERRABLE transactions are refused: waiting
+	 * for a safe snapshot is not possible when the snapshot to use is
+	 * dictated to us.  (XXX an alternative to raising an error would be to
+	 * simply disregard the XactDeferrable flag.)
+	 */
+	readOnlyDeferrable = (XactReadOnly && XactDeferrable);
+	if (readOnlyDeferrable)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
+
+	/* The Snapshot pointer handed back is of no interest here */
+	(void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
+}
+
+/*
+ * Guts of GetSerializableTransactionSnapshot
+ *
+ * If sourcexid is valid, this is actually an import operation and we should
+ * skip calling GetSnapshotData, because the snapshot contents are already
+ * loaded up. HOWEVER: to avoid race conditions, we must check that the
+ * source xact is still running after we acquire SerializableXactHashLock.
+ * We do that by calling ProcArrayInstallImportedXmin.
+ */
static Snapshot
-GetSerializableTransactionSnapshotInt(Snapshot snapshot)
+GetSerializableTransactionSnapshotInt(Snapshot snapshot,
+ TransactionId sourcexid)
{
PGPROC *proc;
VirtualTransactionId vxid;
@@ -1598,6 +1643,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot)
/*
* First we get the sxact structure, which may involve looping and access
* to the "finished" list to free a structure for use.
+ *
+ * We must hold SerializableXactHashLock when taking/checking the snapshot
+ * to avoid race conditions, for much the same reasons that
+ * GetSnapshotData takes the ProcArrayLock. Since we might have to release
+ * SerializableXactHashLock to call SummarizeOldestCommittedSxact, this
+ * means we have to create the sxact first, which is a bit annoying (in
+ * particular, an elog(ERROR) in procarray.c would cause us to leak the
+ * sxact). Consider refactoring to avoid this.
*/
#ifdef TEST_OLDSERXID
SummarizeOldestCommittedSxact();
@@ -1615,8 +1668,19 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot)
}
} while (!sxact);
- /* Get the snapshot */
- snapshot = GetSnapshotData(snapshot);
+ /* Get the snapshot, or check that it's safe to use */
+ if (!TransactionIdIsValid(sourcexid))
+ snapshot = GetSnapshotData(snapshot);
+ else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
+ {
+ ReleasePredXact(sxact);
+ LWLockRelease(SerializableXactHashLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not import the requested snapshot"),
+ errdetail("The source transaction %u is not running anymore.",
+ sourcexid)));
+ }
/*
* If there are no serializable transactions which are not read-only, we
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f1d35a9a112..73e60010711 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -72,6 +72,7 @@
#include "utils/plancache.h"
#include "utils/portal.h"
#include "utils/ps_status.h"
+#include "utils/snapmgr.h"
#include "utils/tzparser.h"
#include "utils/xml.h"
@@ -6093,8 +6094,11 @@ ExecSetVariableStmt(VariableSetStmt *stmt)
case VAR_SET_MULTI:
/*
- * Special case for special SQL syntax that effectively sets more
- * than one variable per statement.
+ * Special-case SQL syntaxes. The TRANSACTION and SESSION
+ * CHARACTERISTICS cases effectively set more than one variable
+ * per statement. TRANSACTION SNAPSHOT only takes one argument,
+ * but we put it here anyway since it's a special case and not
+ * related to any GUC variable.
*/
if (strcmp(stmt->name, "TRANSACTION") == 0)
{
@@ -6140,6 +6144,18 @@ ExecSetVariableStmt(VariableSetStmt *stmt)
item->defname);
}
}
+ else if (strcmp(stmt->name, "TRANSACTION SNAPSHOT") == 0)
+ {
+ A_Const *con = (A_Const *) linitial(stmt->args);
+
+ if (stmt->is_local)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("SET LOCAL TRANSACTION SNAPSHOT is not implemented")));
+ Assert(IsA(con, A_Const));
+ Assert(nodeTag(&con->val) == T_String);
+ ImportSnapshot(strVal(&con->val));
+ }
else
elog(ERROR, "unexpected SET MULTI element: %s",
stmt->name);
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 518aaf1af0c..50fb78057d8 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -15,12 +15,16 @@
* handle this reference as an internally-tracked registration, so that this
* module is entirely lower-level than ResourceOwners.
*
+ * Likewise, any snapshots that have been exported by pg_export_snapshot
+ * have regd_count = 1 and are counted in RegisteredSnapshots, but are not
+ * tracked by any resource owner.
+ *
* These arrangements let us reset MyProc->xmin when there are no snapshots
* referenced by this transaction. (One possible improvement would be to be
* able to advance Xmin when the snapshot with the earliest Xmin is no longer
* referenced. That's a bit harder though, it requires more locking, and
- * anyway it should be rather uncommon to keep snapshots referenced for too
- * long.)
+ * anyway it should be rather uncommon to keep temporary snapshots referenced
+ * for too long.)
*
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
@@ -33,12 +37,16 @@
*/
#include "postgres.h"
+#include <sys/stat.h>
+#include <unistd.h>
+
#include "access/transam.h"
#include "access/xact.h"
+#include "miscadmin.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
-#include "utils/memutils.h"
+#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
@@ -111,6 +119,15 @@ bool FirstSnapshotSet = false;
*/
static Snapshot FirstXactSnapshot = NULL;
+/* Define pathname of exported-snapshot files */
+#define SNAPSHOT_EXPORT_DIR "pg_snapshots"
+#define XactExportFilePath(path, xid, num, suffix) \
+ snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \
+ xid, num, suffix)
+
+/* Current xact's exported snapshots (a list of Snapshot structs) */
+static List *exportedSnapshots = NIL;
+
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
@@ -139,7 +156,8 @@ GetTransactionSnapshot(void)
* In transaction-snapshot mode, the first snapshot must live until
* end of xact regardless of what the caller does with it, so we must
* make a copy of it rather than returning CurrentSnapshotData
- * directly.
+ * directly. Furthermore, if we're running in serializable mode,
+ * predicate.c needs to wrap the snapshot fetch in its own processing.
*/
if (IsolationUsesXactSnapshot())
{
@@ -204,6 +222,88 @@ SnapshotSetCommandId(CommandId curcid)
}
/*
+ * SetTransactionSnapshot
+ *		Set the transaction's snapshot from an imported MVCC snapshot.
+ *
+ * sourcesnap supplies the xmin, xmax, xip[], subxip[], suboverflowed and
+ * takenDuringRecovery values to adopt; its curcid is deliberately ignored.
+ * sourcexid is the XID of the transaction the snapshot came from, and is
+ * used to verify that that transaction is still running.
+ *
+ * Raises an ERROR if the source transaction is found not to be running.
+ *
+ * Note that this is very closely tied to GetTransactionSnapshot --- it
+ * must take care of all the same considerations as the first-snapshot case
+ * in GetTransactionSnapshot.
+ */
+static void
+SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
+{
+	/* Caller should have checked this already */
+	Assert(!FirstSnapshotSet);
+
+	Assert(RegisteredSnapshots == 0);
+	Assert(FirstXactSnapshot == NULL);
+
+	/*
+	 * Even though we are not going to use the snapshot it computes, we must
+	 * call GetSnapshotData, for two reasons: (1) to be sure that
+	 * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
+	 * RecentXmin and RecentGlobalXmin.  (We could alternatively include those
+	 * two variables in exported snapshot files, but it seems better to have
+	 * snapshot importers compute reasonably up-to-date values for them.)
+	 */
+	CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+
+	/*
+	 * Now copy appropriate fields from the source snapshot.
+	 *
+	 * The array copies are skipped when the count is zero.  ImportSnapshot
+	 * builds a sub-XID-overflowed snapshot with subxip == NULL and
+	 * subxcnt == 0, and handing a null pointer to memcpy is undefined
+	 * behavior in C even when the length is zero.
+	 */
+	CurrentSnapshot->xmin = sourcesnap->xmin;
+	CurrentSnapshot->xmax = sourcesnap->xmax;
+	CurrentSnapshot->xcnt = sourcesnap->xcnt;
+	Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
+	if (sourcesnap->xcnt > 0)
+		memcpy(CurrentSnapshot->xip, sourcesnap->xip,
+			   sourcesnap->xcnt * sizeof(TransactionId));
+	CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
+	Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
+	if (sourcesnap->subxcnt > 0)
+		memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
+			   sourcesnap->subxcnt * sizeof(TransactionId));
+	CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
+	CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
+	/* NB: curcid should NOT be copied, it's a local matter */
+
+	/*
+	 * Now we have to fix what GetSnapshotData did with MyProc->xmin and
+	 * TransactionXmin.  There is a race condition: to make sure we are not
+	 * causing the global xmin to go backwards, we have to test that the
+	 * source transaction is still running, and that has to be done atomically.
+	 * So let procarray.c do it.
+	 *
+	 * Note: in serializable mode, predicate.c will do this a second time.
+	 * It doesn't seem worth contorting the logic here to avoid two calls,
+	 * especially since it's not clear that predicate.c *must* do this.
+	 */
+	if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not import the requested snapshot"),
+				 errdetail("The source transaction %u is not running anymore.",
+						   sourcexid)));
+
+	/*
+	 * In transaction-snapshot mode, the first snapshot must live until end of
+	 * xact, so we must make a copy of it.  Furthermore, if we're running in
+	 * serializable mode, predicate.c needs to do its own processing.
+	 */
+	if (IsolationUsesXactSnapshot())
+	{
+		if (IsolationIsSerializable())
+			SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid);
+		/* Make a saved copy */
+		CurrentSnapshot = CopySnapshot(CurrentSnapshot);
+		FirstXactSnapshot = CurrentSnapshot;
+		/* Mark it as "registered" in FirstXactSnapshot */
+		FirstXactSnapshot->regd_count++;
+		RegisteredSnapshots++;
+	}
+
+	FirstSnapshotSet = true;
+}
+
+/*
* CopySnapshot
* Copy the given snapshot.
*
@@ -558,6 +658,42 @@ AtEOXact_Snapshot(bool isCommit)
}
FirstXactSnapshot = NULL;
+ /*
+ * If we exported any snapshots, clean them up.
+ */
+ if (exportedSnapshots != NIL)
+ {
+ TransactionId myxid = GetTopTransactionId();
+ int i;
+ char buf[MAXPGPATH];
+
+ /*
+ * Get rid of the files. Unlink failure is only a WARNING because
+ * (1) it's too late to abort the transaction, and (2) leaving a
+ * leaked file around has little real consequence anyway.
+ */
+ for (i = 1; i <= list_length(exportedSnapshots); i++)
+ {
+ XactExportFilePath(buf, myxid, i, "");
+ if (unlink(buf))
+ elog(WARNING, "could not unlink file \"%s\": %m", buf);
+ }
+
+ /*
+ * As with the FirstXactSnapshot, we needn't spend any effort on
+ * cleaning up the per-snapshot data structures, but we do need to
+ * adjust the RegisteredSnapshots count to prevent a warning below.
+ *
+ * Note: you might be thinking "why do we have the exportedSnapshots
+ * list at all? All we need is a counter!". You're right, but we do
+ * it this way in case we ever feel like improving xmin management.
+ */
+ Assert(RegisteredSnapshots >= list_length(exportedSnapshots));
+ RegisteredSnapshots -= list_length(exportedSnapshots);
+
+ exportedSnapshots = NIL;
+ }
+
/* On commit, complain about leftover snapshots */
if (isCommit)
{
@@ -586,3 +722,464 @@ AtEOXact_Snapshot(bool isCommit)
SnapshotResetXmin();
}
+
+
+/*
+ * ExportSnapshot
+ * Export the snapshot to a file so that other backends can import it.
+ * Returns the token (the file name) that can be used to import this
+ * snapshot.
+ *
+ * The file is created under SNAPSHOT_EXPORT_DIR and is named from the
+ * top-level XID (8 hex digits), a hyphen, and a per-transaction sequence
+ * number. The returned token is a pstrdup'd copy of that base name.
+ *
+ * Raises an ERROR when called inside a subtransaction, or when the file
+ * cannot be created, written, closed, or renamed.
+ */
+static char *
+ExportSnapshot(Snapshot snapshot)
+{
+ TransactionId topXid;
+ TransactionId *children;
+ int nchildren;
+ int addTopXid;
+ StringInfoData buf;
+ FILE *f;
+ int i;
+ MemoryContext oldcxt;
+ char path[MAXPGPATH];
+ char pathtmp[MAXPGPATH];
+
+ /*
+ * It's tempting to call RequireTransactionChain here, since it's not
+ * very useful to export a snapshot that will disappear immediately
+ * afterwards. However, we haven't got enough information to do that,
+ * since we don't know if we're at top level or not. For example, we
+ * could be inside a plpgsql function that is going to fire off other
+ * transactions via dblink. Rather than disallow perfectly legitimate
+ * usages, don't make a check.
+ *
+ * Also note that we don't make any restriction on the transaction's
+ * isolation level; however, importers must check the level if they
+ * are serializable.
+ */
+
+ /*
+ * This will assign a transaction ID if we do not yet have one.
+ */
+ topXid = GetTopTransactionId();
+
+ /*
+ * We cannot export a snapshot from a subtransaction because there's no
+ * easy way for importers to verify that the same subtransaction is still
+ * running.
+ */
+ if (IsSubTransaction())
+ ereport(ERROR,
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("cannot export a snapshot from a subtransaction")));
+
+ /*
+ * We do however allow previous committed subtransactions to exist.
+ * Importers of the snapshot must see them as still running, so get their
+ * XIDs to add them to the snapshot.
+ */
+ nchildren = xactGetCommittedChildren(&children);
+
+ /*
+ * Copy the snapshot into TopTransactionContext, add it to the
+ * exportedSnapshots list, and mark it pseudo-registered. We do this to
+ * ensure that the snapshot's xmin is honored for the rest of the
+ * transaction. (Right now, because SnapshotResetXmin is so stupid, this
+ * is overkill; but later we might make that routine smarter.)
+ */
+ snapshot = CopySnapshot(snapshot);
+
+ oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+ exportedSnapshots = lappend(exportedSnapshots, snapshot);
+ MemoryContextSwitchTo(oldcxt);
+
+ snapshot->regd_count++;
+ RegisteredSnapshots++;
+
+ /*
+ * Fill buf with a text serialization of the snapshot, plus identification
+ * data about this transaction. The format expected by ImportSnapshot
+ * is pretty rigid: each line must be fieldname:value.
+ */
+ initStringInfo(&buf);
+
+ appendStringInfo(&buf, "xid:%u\n", topXid);
+ appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
+ appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
+ appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
+
+ appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
+ appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
+
+ /*
+ * We must include our own top transaction ID in the top-xid data, since
+ * by definition we will still be running when the importing transaction
+ * adopts the snapshot, but GetSnapshotData never includes our own XID in
+ * the snapshot. (There must, therefore, be enough room to add it.)
+ *
+ * However, it could be that our topXid is after the xmax, in which case
+ * we shouldn't include it because xip[] members are expected to be before
+ * xmax. (We need not make the same check for subxip[] members, see
+ * snapshot.h.)
+ */
+ /* addTopXid is 1 when topXid is written as an extra "xip" line, else 0 */
+ addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0;
+ appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
+ for (i = 0; i < snapshot->xcnt; i++)
+ appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
+ if (addTopXid)
+ appendStringInfo(&buf, "xip:%u\n", topXid);
+
+ /*
+ * Similarly, we add our subcommitted child XIDs to the subxid data.
+ * Here, we have to cope with possible overflow.
+ */
+ if (snapshot->suboverflowed ||
+ snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
+ /* Overflowed: no "sxcnt" or "sxp" lines are written in this case */
+ appendStringInfoString(&buf, "sof:1\n");
+ else
+ {
+ appendStringInfoString(&buf, "sof:0\n");
+ appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
+ for (i = 0; i < snapshot->subxcnt; i++)
+ appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
+ for (i = 0; i < nchildren; i++)
+ appendStringInfo(&buf, "sxp:%u\n", children[i]);
+ }
+ appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
+
+ /*
+ * Now write the text representation into a file. We first write to a
+ * ".tmp" filename, and rename to final filename if no error. This
+ * ensures that no other backend can read an incomplete file
+ * (ImportSnapshot won't allow it because of its valid-characters check).
+ *
+ * list_length() already counts the snapshot appended above, so the
+ * number in the file name is this snapshot's 1-based position in
+ * exportedSnapshots.
+ */
+ XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp");
+ if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", pathtmp)));
+
+ /* The whole buffer is written as a single item, so success returns 1 */
+ if (fwrite(buf.data, buf.len, 1, f) != 1)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", pathtmp)));
+
+ /* no fsync() since file need not survive a system crash */
+
+ /* A nonzero result from FreeFile is reported as a write failure too */
+ if (FreeFile(f))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", pathtmp)));
+
+ /*
+ * Now that we have written everything into a .tmp file, rename the file
+ * to remove the .tmp suffix.
+ */
+ XactExportFilePath(path, topXid, list_length(exportedSnapshots), "");
+
+ if (rename(pathtmp, path) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ pathtmp, path)));
+
+ /*
+ * The basename of the file is what we return from pg_export_snapshot().
+ * It's already in path in a textual format and we know that the path
+ * starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and the slash
+ * and pstrdup it so as not to return the address of a local variable.
+ */
+ return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
+}
+
+/*
+ * pg_export_snapshot
+ *		SQL-callable entry point: export the active snapshot via
+ *		ExportSnapshot and return the resulting identifier as text.
+ */
+Datum
+pg_export_snapshot(PG_FUNCTION_ARGS)
+{
+	/* ExportSnapshot returns the identifier as a C string */
+	PG_RETURN_TEXT_P(cstring_to_text(ExportSnapshot(GetActiveSnapshot())));
+}
+
+
+/*
+ * Parsing subroutines for ImportSnapshot.  Each expects the text at *s to
+ * start with the given prefix followed by a value, returns that value, and
+ * moves *s to the beginning of the next line.  The filename is used only
+ * for error messages.
+ */
+static int
+parseIntFromText(const char *prefix, char **s, const char *filename)
+{
+	char	   *line = *s;
+	char	   *value = NULL;
+	char	   *eol = NULL;
+	int			prefixlen = strlen(prefix);
+	int			result = 0;
+	bool		ok;
+
+	/* The line has to open with the expected field name ... */
+	ok = (strncmp(line, prefix, prefixlen) == 0);
+
+	/* ... followed by something sscanf accepts as a signed integer ... */
+	if (ok)
+	{
+		value = line + prefixlen;
+		ok = (sscanf(value, "%d", &result) == 1);
+	}
+
+	/* ... and a newline has to appear somewhere after the prefix. */
+	if (ok)
+	{
+		eol = strchr(value, '\n');
+		ok = (eol != NULL);
+	}
+
+	/* Every kind of malformation is reported the same way */
+	if (!ok)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+				 errmsg("invalid snapshot data in file \"%s\"", filename)));
+
+	*s = eol + 1;
+	return result;
+}
+
+/*
+ * Like parseIntFromText, but the value is scanned as an unsigned quantity
+ * and returned as a TransactionId.
+ */
+static TransactionId
+parseXidFromText(const char *prefix, char **s, const char *filename)
+{
+	char	   *line = *s;
+	char	   *value = NULL;
+	char	   *eol = NULL;
+	int			prefixlen = strlen(prefix);
+	TransactionId result = 0;
+	bool		ok;
+
+	/* The line has to open with the expected field name ... */
+	ok = (strncmp(line, prefix, prefixlen) == 0);
+
+	/* ... followed by something sscanf accepts as an unsigned integer ... */
+	if (ok)
+	{
+		value = line + prefixlen;
+		ok = (sscanf(value, "%u", &result) == 1);
+	}
+
+	/* ... and a newline has to appear somewhere after the prefix. */
+	if (ok)
+	{
+		eol = strchr(value, '\n');
+		ok = (eol != NULL);
+	}
+
+	/* Every kind of malformation is reported the same way */
+	if (!ok)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+				 errmsg("invalid snapshot data in file \"%s\"", filename)));
+
+	*s = eol + 1;
+	return result;
+}
+
+/*
+ * ImportSnapshot
+ * Import a previously exported snapshot. The argument should be a
+ * filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file.
+ * This is called by "SET TRANSACTION SNAPSHOT 'foo'".
+ */
+void
+ImportSnapshot(const char *idstr)
+{
+ char path[MAXPGPATH];
+ FILE *f;
+ struct stat stat_buf;
+ char *filebuf;
+ int xcnt;
+ int i;
+ TransactionId src_xid;
+ Oid src_dbid;
+ int src_isolevel;
+ bool src_readonly;
+ SnapshotData snapshot;
+
+ /*
+ * Must be at top level of a fresh transaction. Note in particular that
+ * we check we haven't acquired an XID --- if we have, it's conceivable
+ * that the snapshot would show it as not running, making for very
+ * screwy behavior.
+ */
+ if (FirstSnapshotSet ||
+ GetTopTransactionIdIfAny() != InvalidTransactionId ||
+ IsSubTransaction())
+ ereport(ERROR,
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
+
+ /*
+ * If we are in read committed mode then the next query would execute
+ * with a new snapshot thus making this function call quite useless.
+ */
+ if (!IsolationUsesXactSnapshot())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
+
+ /*
+ * Verify the identifier: only 0-9, A-F and hyphens are allowed. We do
+ * this mainly to prevent reading arbitrary files.
+ */
+ if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid snapshot identifier \"%s\"", idstr)));
+
+ /* OK, read the file */
+ snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
+
+ f = AllocateFile(path, PG_BINARY_R);
+ if (!f)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid snapshot identifier \"%s\"", idstr)));
+
+ /* get the size of the file so that we know how much memory we need */
+ if (fstat(fileno(f), &stat_buf))
+ elog(ERROR, "could not stat file \"%s\": %m", path);
+
+ /* and read the file into a palloc'd string */
+ filebuf = (char *) palloc(stat_buf.st_size + 1);
+ if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
+ elog(ERROR, "could not read file \"%s\": %m", path);
+
+ filebuf[stat_buf.st_size] = '\0';
+
+ FreeFile(f);
+
+ /*
+ * Construct a snapshot struct by parsing the file content.
+ */
+ memset(&snapshot, 0, sizeof(snapshot));
+
+ src_xid = parseXidFromText("xid:", &filebuf, path);
+ /* we abuse parseXidFromText a bit here ... */
+ src_dbid = parseXidFromText("dbid:", &filebuf, path);
+ src_isolevel = parseIntFromText("iso:", &filebuf, path);
+ src_readonly = parseIntFromText("ro:", &filebuf, path);
+
+ snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
+ snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
+
+ snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
+
+ /* sanity-check the xid count before palloc */
+ if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", path)));
+
+ snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
+ for (i = 0; i < xcnt; i++)
+ snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
+
+ snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
+
+ if (!snapshot.suboverflowed)
+ {
+ snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
+
+ /* sanity-check the xid count before palloc */
+ if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", path)));
+
+ snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
+ for (i = 0; i < xcnt; i++)
+ snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
+ }
+ else
+ {
+ snapshot.subxcnt = 0;
+ snapshot.subxip = NULL;
+ }
+
+ snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
+
+ /*
+ * Do some additional sanity checking, just to protect ourselves. We
+ * don't trouble to check the array elements, just the most critical
+ * fields.
+ */
+ if (!TransactionIdIsNormal(src_xid) ||
+ !OidIsValid(src_dbid) ||
+ !TransactionIdIsNormal(snapshot.xmin) ||
+ !TransactionIdIsNormal(snapshot.xmax))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", path)));
+
+ /*
+ * If we're serializable, the source transaction must be too, otherwise
+ * predicate.c has problems (SxactGlobalXmin could go backwards). Also,
+ * a non-read-only transaction can't adopt a snapshot from a read-only
+ * transaction, as predicate.c handles the cases very differently.
+ */
+ if (IsolationIsSerializable())
+ {
+ if (src_isolevel != XACT_SERIALIZABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
+ if (src_readonly && !XactReadOnly)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
+ }
+
+ /*
+ * We cannot import a snapshot that was taken in a different database,
+ * because vacuum calculates OldestXmin on a per-database basis; so the
+ * source transaction's xmin doesn't protect us from data loss. This
+ * restriction could be removed if the source transaction were to mark
+ * its xmin as being globally applicable. But that would require some
+ * additional syntax, since that has to be known when the snapshot is
+ * initially taken. (See pgsql-hackers discussion of 2011-10-21.)
+ */
+ if (src_dbid != MyDatabaseId)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot import a snapshot from a different database")));
+
+ /* OK, install the snapshot */
+ SetTransactionSnapshot(&snapshot, src_xid);
+}
+
+/*
+ * XactHasExportedSnapshots
+ * Test whether current transaction has exported any snapshots.
+ *
+ * Takes no arguments and modifies nothing: the result is simply whether
+ * exportedSnapshots is something other than NIL (true) or NIL (false).
+ *
+ * PrepareTransaction() in xact.c calls this so that it can refuse to
+ * PREPARE a transaction that has exported snapshots.
+ */
+bool
+XactHasExportedSnapshots(void)
+{
+ return (exportedSnapshots != NIL); /* NIL: presumably nothing exported yet */
+}
+
+/*
+ * DeleteAllExportedSnapshotFiles
+ * Clean up any files that have been left behind by a crashed backend
+ * that had exported snapshots before it died.
+ *
+ * This should be called during database startup or crash recovery.
+ *
+ * Every entry found in SNAPSHOT_EXPORT_DIR, other than the . and ..
+ * entries, is unlinked; nothing here distinguishes stale files from ones
+ * still in use, so presumably this must only run while no backend can
+ * hold a live exported snapshot (TODO confirm against the caller).
+ *
+ * Takes no arguments and returns nothing. Failure to open the directory
+ * or to unlink an individual entry is reported at LOG level and is
+ * otherwise ignored; an unlink failure does not stop the scan.
+ */
+void
+DeleteAllExportedSnapshotFiles(void)
+{
+ char buf[MAXPGPATH]; /* holds the path of the entry being removed */
+ DIR *s_dir;
+ struct dirent *s_de;
+
+ if (!(s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR)))
+ {
+ /*
+ * We really should have that directory in a sane cluster setup. But
+ * then again if we don't, it's not fatal enough to make it FATAL.
+ * Since we're running in the postmaster, LOG is our best bet.
+ *
+ * NOTE(review): this patch appears to add the call site in
+ * StartupXLOG() (xlog.c), so the mention of the postmaster may be
+ * imprecise -- confirm which process actually runs this.
+ */
+ elog(LOG, "could not open directory \"%s\": %m", SNAPSHOT_EXPORT_DIR);
+ return; /* nothing to clean up if the directory cannot be opened */
+ }
+
+ while ((s_de = ReadDir(s_dir, SNAPSHOT_EXPORT_DIR)) != NULL) /* NOTE(review): confirm ReadDir's own failure reporting cannot exceed LOG */
+ {
+ if (strcmp(s_de->d_name, ".") == 0 ||
+ strcmp(s_de->d_name, "..") == 0)
+ continue; /* skip the self and parent directory entries */
+
+ snprintf(buf, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name); /* directory-relative path of this entry */
+ /* Again, unlink failure is not worthy of FATAL */
+ if (unlink(buf))
+ elog(LOG, "could not unlink file \"%s\": %m", buf);
+ }
+
+ FreeDir(s_dir);
+}
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index e535fdad1e9..29000095cba 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -2555,6 +2555,7 @@ main(int argc, char *argv[])
"pg_clog",
"pg_notify",
"pg_serial",
+ "pg_snapshots",
"pg_subtrans",
"pg_twophase",
"pg_multixact/members",
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 8097545faaa..c6273c12671 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201110161
+#define CATALOG_VERSION_NO 201110221
#endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 96f43fe0b1f..64b7a6a314d 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2870,6 +2870,9 @@ DESCR("xlog filename and byte offset, given an xlog location");
DATA(insert OID = 2851 ( pg_xlogfile_name PGNSP PGUID 12 1 0 0 0 f f f t f i 1 0 25 "25" _null_ _null_ _null_ _null_ pg_xlogfile_name _null_ _null_ _null_ ));
DESCR("xlog filename, given an xlog location");
+DATA(insert OID = 3809 ( pg_export_snapshot PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_export_snapshot _null_ _null_ _null_ ));
+DESCR("export a snapshot");
+
DATA(insert OID = 3810 ( pg_is_in_recovery PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_recovery _null_ _null_ _null_ ));
DESCR("true if server is in recovery");
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 12c2faf3de8..3d170bc3679 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -337,6 +337,7 @@ PG_KEYWORD("show", SHOW, UNRESERVED_KEYWORD)
PG_KEYWORD("similar", SIMILAR, TYPE_FUNC_NAME_KEYWORD)
PG_KEYWORD("simple", SIMPLE, UNRESERVED_KEYWORD)
PG_KEYWORD("smallint", SMALLINT, COL_NAME_KEYWORD)
+PG_KEYWORD("snapshot", SNAPSHOT, UNRESERVED_KEYWORD)
PG_KEYWORD("some", SOME, RESERVED_KEYWORD)
PG_KEYWORD("stable", STABLE, UNRESERVED_KEYWORD)
PG_KEYWORD("standalone", STANDALONE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 9603b10ad40..7f9d5fc51c1 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -43,6 +43,8 @@ extern bool PageIsPredicateLocked(Relation relation, BlockNumber blkno);
/* predicate lock maintenance */
extern Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot);
+extern void SetSerializableTransactionSnapshot(Snapshot snapshot,
+ TransactionId sourcexid);
extern void RegisterPredicateLockingXid(TransactionId xid);
extern void PredicateLockRelation(Relation relation, Snapshot snapshot);
extern void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index a11d4385b7d..71c82437cdf 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -37,10 +37,16 @@ extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid,
extern void ExpireAllKnownAssignedTransactionIds(void);
extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid);
-extern RunningTransactions GetRunningTransactionData(void);
+extern int GetMaxSnapshotXidCount(void);
+extern int GetMaxSnapshotSubxidCount(void);
extern Snapshot GetSnapshotData(Snapshot snapshot);
+extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
+ TransactionId sourcexid);
+
+extern RunningTransactions GetRunningTransactionData(void);
+
extern bool TransactionIdIsInProgress(TransactionId xid);
extern bool TransactionIdIsActive(TransactionId xid);
extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum);
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index e665a28aff8..1e5cb866b17 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -42,4 +42,9 @@ extern void AtSubCommit_Snapshot(int level);
extern void AtSubAbort_Snapshot(int level);
extern void AtEOXact_Snapshot(bool isCommit);
+extern Datum pg_export_snapshot(PG_FUNCTION_ARGS);
+extern void ImportSnapshot(const char *idstr);
+extern bool XactHasExportedSnapshots(void);
+extern void DeleteAllExportedSnapshotFiles(void);
+
#endif /* SNAPMGR_H */