summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Khandekar2012-02-01 10:19:18 +0000
committerAmit Khandekar2012-02-01 10:19:18 +0000
commit5b00dd716cd6b47db2faee71dabc0fde91005ca6 (patch)
treee6c151f034938b170bf8fee6c1daa118f01ca45a
parent5c0b161a0ae728c8c8f62721165b4bcae05d5893 (diff)
Make the advisory locks cluster-aware, so that a user connected from
coordinator 1 will wait for an advisory lock held by a user connected from coordinator 2. In this commit, this support is enabled only for session-level advisory locks; transaction-level advisory locks are not included.
-rw-r--r--src/backend/storage/lmgr/lock.c50
-rw-r--r--src/backend/utils/adt/dbsize.c34
-rw-r--r--src/backend/utils/adt/lockfuncs.c199
-rw-r--r--src/include/pgxc/pgxcnode.h2
-rw-r--r--src/include/storage/lock.h1
5 files changed, 273 insertions, 13 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index e3ad3199c4..dc4f996092 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -173,6 +173,10 @@ static HTAB *LockMethodLocalHash;
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;
+static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag,
+LOCKMODE lockmode, bool sessionLock, bool dontWait, bool reportMemoryError,
+bool only_increment);
+
#ifdef LOCK_DEBUG
@@ -470,6 +474,28 @@ LockAcquire(const LOCKTAG *locktag,
return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
}
+#ifdef PGXC
+/*
+ * LockIncrementIfExists - special-purpose variant of LockAcquire() that
+ * never creates a new lock.
+ *
+ * If there is already a reference to the lock described by locktag and
+ * lockmode, that reference is incremented and true is returned. If there
+ * is none, nothing is acquired and false is returned.
+ *
+ * The request is always issued as a session-level, no-wait request with
+ * the only_increment flag of LockAcquireExtendedXC() set.
+ */
+bool
+LockIncrementIfExists(const LOCKTAG *locktag,
+					  LOCKMODE lockmode)
+{
+	/*
+	 * Only LOCKACQUIRE_ALREADY_HELD means an existing reference was found
+	 * and incremented; every other result is reported as "did not exist".
+	 */
+	return LockAcquireExtendedXC(locktag,
+								 lockmode,
+								 true,	/* sessionLock: always session level */
+								 true,	/* dontWait: never wait */
+								 true,	/* reportMemoryError */
+								 true)	/* only_increment */
+		== LOCKACQUIRE_ALREADY_HELD;
+}
+#endif
+
/*
* LockAcquireExtended - allows us to specify additional options
*
@@ -486,6 +512,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
bool dontWait,
bool reportMemoryError)
{
+ return LockAcquireExtendedXC(locktag, lockmode, sessionLock, dontWait,
+ reportMemoryError, false);
+}
+
+/*
+ * LockAcquireExtendedXC - same as LockAcquireExtended(), with the additional
+ * XC-specific parameter only_increment. See the comments for LockIncrementIfExists().
+ */
+static LockAcquireResult
+LockAcquireExtendedXC(const LOCKTAG *locktag,
+ LOCKMODE lockmode,
+ bool sessionLock,
+ bool dontWait,
+ bool reportMemoryError,
+ bool only_increment)
+{
LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
LockMethod lockMethodTable;
LOCALLOCKTAG localtag;
@@ -580,7 +622,13 @@ LockAcquireExtended(const LOCKTAG *locktag,
GrantLockLocal(locallock, owner);
return LOCKACQUIRE_ALREADY_HELD;
}
-
+#ifdef PGXC
+ else if (only_increment)
+ {
+ /* User does not want to create new lock if it does not already exist */
+ return LOCKACQUIRE_NOT_AVAIL;
+ }
+#endif
/*
* Emit a WAL record if acquisition of this lock needs to be replayed in a
* standby server. Only AccessExclusiveLocks can conflict with lock types
diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c
index c326c4ac7a..9997f4b8f7 100644
--- a/src/backend/utils/adt/dbsize.c
+++ b/src/backend/utils/adt/dbsize.c
@@ -37,8 +37,7 @@
#include "utils/syscache.h"
#ifdef PGXC
-static int64 pgxc_database_size(Oid dbOid);
-static int64 pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query);
+static Datum pgxc_database_size(Oid dbOid);
static int64 pgxc_exec_sizefunc(Oid relOid, char *funcname, char *extra_arg);
/*
@@ -165,7 +164,7 @@ pg_database_size_oid(PG_FUNCTION_ARGS)
#ifdef PGXC
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
- PG_RETURN_INT64(pgxc_database_size(dbOid));
+ PG_RETURN_DATUM(pgxc_database_size(dbOid));
#endif
PG_RETURN_INT64(calculate_database_size(dbOid));
@@ -179,7 +178,7 @@ pg_database_size_name(PG_FUNCTION_ARGS)
#ifdef PGXC
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
- PG_RETURN_INT64(pgxc_database_size(dbOid));
+ PG_RETURN_DATUM(pgxc_database_size(dbOid));
#endif
PG_RETURN_INT64(calculate_database_size(dbOid));
@@ -720,7 +719,7 @@ pg_relation_filepath(PG_FUNCTION_ARGS)
* pgxc_database_size
* Given a dboid, return sum of pg_database_size() executed on all the datanodes
*/
-static int64
+static Datum
pgxc_database_size(Oid dbOid)
{
StringInfoData buf;
@@ -744,11 +743,13 @@ pgxc_database_size(Oid dbOid)
/*
* pgxc_execute_on_nodes
- * Execute 'query' on all the nodes in 'nodelist', and return
- * sum of all the results.
- * 'query' *must* be of the form: 'select pg_***_size()'
+ * Execute 'query' on all the nodes in 'nodelist', and return an int64 datum
+ * which has the sum of all the results. If multiple nodes are involved, it
+ * assumes that the query returns exactly one row with one attribute of type
+ * int64. If there is a single node, it just returns the datum as-is without
+ * checking the type of the returned value.
*/
-static int64
+Datum
pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query)
{
StringInfoData buf;
@@ -759,6 +760,7 @@ pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query)
int64 size = 0;
bool isnull;
char *nodename;
+ Datum datum;
/*
* Connect to SPI manager
@@ -793,14 +795,22 @@ pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query)
*/
Assert(SPI_processed == 1 && spi_tupdesc->natts == 1);
- size = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], spi_tupdesc, 1, &isnull));
+ datum = SPI_getbinval(SPI_tuptable->vals[0], spi_tupdesc, 1, &isnull);
+ /* For single node, don't assume the type of datum. It can be bool also. */
+ if (numnodes == 1)
+ break;
+
+ size = DatumGetInt64(datum);
total_size += size;
}
SPI_finish();
- return total_size;
+ if (numnodes == 1)
+ PG_RETURN_DATUM(datum);
+ else
+ PG_RETURN_INT64(total_size);
}
@@ -845,7 +855,7 @@ pgxc_exec_sizefunc(Oid relOid, char *funcname, char *extra_arg)
relation_close(rel, AccessShareLock);
- return pgxc_execute_on_nodes(numnodes, nodelist, buf.data);
+ return DatumGetInt64(pgxc_execute_on_nodes(numnodes, nodelist, buf.data));
}
#endif /* PGXC */
diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c
index 6d7d4f4fb0..f39e932a04 100644
--- a/src/backend/utils/adt/lockfuncs.c
+++ b/src/backend/utils/adt/lockfuncs.c
@@ -15,6 +15,11 @@
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#include "pgxc/pgxcnode.h"
+#include "pgxc/nodemgr.h"
+#endif
#include "storage/predicate_internals.h"
#include "storage/proc.h"
#include "utils/builtins.h"
@@ -49,6 +54,13 @@ typedef struct
int predLockIdx; /* current index for pred lock */
} PG_Lock_Status;
+#ifdef PGXC
+static bool
+pgxc_advisory_lock(int64 key64, int32 key1, int32 key2, bool iskeybig,
+ LOCKMODE lockmode,
+ bool sessionLock,
+ bool dontWait);
+#endif
/*
* VXIDGetDatum - Construct a text representation of a VXID
@@ -408,6 +420,141 @@ pg_lock_status(PG_FUNCTION_ARGS)
#define SET_LOCKTAG_INT32(tag, key1, key2) \
SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2)
+#ifdef PGXC
+
+#define MAXINT8LEN 25
+
+/*
+ * pgxc_advisory_lock - Core function that implements the algorithm needed to
+ * propagate the advisory lock function calls to all coordinators.
+ * The idea is to make the advisory locks cluster-aware, so that a lock held
+ * by a user on coordinator 1 makes a user on coordinator 2 wait for the
+ * same lock.
+ *
+ * Parameters:
+ *	key64		- lock key, used when iskeybig is true
+ *	key1, key2	- lock key pair, used when iskeybig is false
+ *	iskeybig	- selects between the int8 key and the (int4, int4) key
+ *	lockmode	- passed to LockAcquire(); ShareLock selects the "_shared"
+ *				  variants of the SQL lock/unlock functions run remotely
+ *	sessionLock	- passed to LockAcquire(); false selects the "xact_"
+ *				  variant of the SQL lock function run remotely
+ *	dontWait	- passed to LockAcquire(); true selects the "try_" variant
+ *				  of the SQL lock function run remotely
+ *
+ * Returns true if the lock was obtained, false otherwise. False is returned
+ * only if dontWait is true and the lock was not available on some
+ * coordinator. Otherwise the function either returns true, or waits on a
+ * resource, or throws an exception returned by the lock function calls in
+ * case of unexpected or fatal errors.
+ *
+ * Currently used only for session level locks; not used for transaction level
+ * locks.
+ */
+static bool
+pgxc_advisory_lock(int64 key64, int32 key1, int32 key2, bool iskeybig,
+				   LOCKMODE lockmode,
+				   bool sessionLock,
+				   bool dontWait)
+{
+	LOCKTAG		locktag;
+	Oid		   *coOids, *dnOids;
+	int			numdnodes, numcoords;
+	StringInfoData lock_cmd, unlock_cmd, lock_funcname, unlock_funcname, args;
+	char		str_key[MAXINT8LEN + 1];
+	int			i, prev;
+	bool		abort_locking = false;
+	Datum		lock_status;
+
+	if (iskeybig)
+		SET_LOCKTAG_INT64(locktag, key64);
+	else
+		SET_LOCKTAG_INT32(locktag, key1, key2);
+
+	PgxcNodeListAndCount(&coOids, &dnOids, &numcoords, &numdnodes);
+
+	/*
+	 * Skip everything XC specific if there's only one coordinator running.
+	 * The result of LockAcquire() must still be honoured: with dontWait set
+	 * it can come back as LOCKACQUIRE_NOT_AVAIL, and the try-lock callers
+	 * rely on our return value to report that to the user.
+	 */
+	if (numcoords <= 1)
+	{
+		LockAcquireResult res;
+
+		res = LockAcquire(&locktag, lockmode, sessionLock, dontWait);
+		return (res != LOCKACQUIRE_NOT_AVAIL);
+	}
+
+	/*
+	 * If there is already a lock held by us, just increment and return; we
+	 * already did all necessary steps when we locked for the first time.
+	 */
+	if (LockIncrementIfExists(&locktag, lockmode) == true)
+		return true;
+
+	/* Name of the SQL function that takes the lock on a remote coordinator */
+	initStringInfo(&lock_funcname);
+	appendStringInfo(&lock_funcname, "pg_%sadvisory_%slock%s",
+					 (dontWait ? "try_" : ""),
+					 (sessionLock ? "" : "xact_"),
+					 (lockmode == ShareLock ? "_shared": ""));
+
+	/* Name of the SQL function that releases it again */
+	initStringInfo(&unlock_funcname);
+	appendStringInfo(&unlock_funcname, "pg_advisory_unlock%s",
+					 (lockmode == ShareLock ? "_shared": ""));
+
+	/* Textual argument list: either "key64" or "key1, key2" */
+	initStringInfo(&args);
+
+	if (iskeybig)
+	{
+		pg_lltoa(key64, str_key);
+		appendStringInfo(&args, "%s", str_key);
+	}
+	else
+	{
+		pg_ltoa(key1, str_key);
+		appendStringInfo(&args, "%s, ", str_key);
+		pg_ltoa(key2, str_key);
+		appendStringInfo(&args, "%s", str_key);
+	}
+
+	initStringInfo(&lock_cmd);
+	appendStringInfo(&lock_cmd, "SELECT pg_catalog.%s(%s)", lock_funcname.data, args.data);
+	initStringInfo(&unlock_cmd);
+	appendStringInfo(&unlock_cmd, "SELECT pg_catalog.%s(%s)", unlock_funcname.data, args.data);
+
+	/*
+	 * Go on locking on each coordinator. Keep on unlocking the previous one
+	 * after a lock is held on next coordinator. Don't unlock the local
+	 * coordinator. After finishing all coordinators, ultimately only the local
+	 * coordinator would be locked, but still we will have scanned all
+	 * coordinators to make sure no one else has already grabbed the lock. The
+	 * reason for unlocking all remote locks is because the session level locks
+	 * don't get unlocked until explicitly unlocked or the session quits. After
+	 * the user session quits without explicitly unlocking, the coord-to-coord
+	 * pooler connection stays and so does the remote coordinator lock.
+	 *
+	 * The loop runs one step past the last coordinator (i == numcoords) so
+	 * that the lock taken on the last coordinator gets its unlock step too.
+	 */
+	prev = -1;
+	for (i = 0; i <= numcoords && !abort_locking; i++, prev++)
+	{
+		if (i < numcoords)
+		{
+			/* If this coordinator is myself, execute native lock calls */
+			if (i == PGXCNodeId - 1)
+			{
+				LockAcquireResult res;
+
+				/*
+				 * Convert the enum result to a proper boolean datum so that
+				 * it can be tested the same way as the remote result below.
+				 */
+				res = LockAcquire(&locktag, lockmode, sessionLock, dontWait);
+				lock_status = BoolGetDatum(res != LOCKACQUIRE_NOT_AVAIL);
+			}
+			else
+				lock_status = pgxc_execute_on_nodes(1, &coOids[i], lock_cmd.data);
+
+			/* Only the no-wait (try) variants report failure as a boolean */
+			if (dontWait == true && DatumGetBool(lock_status) == false)
+			{
+				abort_locking = true;
+				/*
+				 * If we have gone past the local coordinator node, it implies
+				 * that we have obtained a local lock. But now that we are
+				 * aborting, we need to release the local lock first.
+				 */
+				if (i > PGXCNodeId - 1)
+					(void) LockRelease(&locktag, lockmode, sessionLock);
+			}
+		}
+
+		/*
+		 * Unlock the previous lock, but only if it is a remote coordinator. If
+		 * it is a local one, we want to keep that lock. Remember, the final
+		 * status should be that there is only *one* lock held, and that is the
+		 * local lock.
+		 */
+		if (prev >= 0 && prev != PGXCNodeId - 1)
+			pgxc_execute_on_nodes(1, &coOids[prev], unlock_cmd.data);
+	}
+
+	return (!abort_locking);
+}
+
+#endif /* PGXC */
+
/*
* pg_advisory_lock(int8) - acquire exclusive lock on an int8 key
*/
@@ -417,6 +564,14 @@ pg_advisory_lock_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ pgxc_advisory_lock(key, 0, 0, true, ExclusiveLock, true, false);
+ PG_RETURN_VOID();
+ }
+#endif
+
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ExclusiveLock, true, false);
@@ -450,6 +605,14 @@ pg_advisory_lock_shared_int8(PG_FUNCTION_ARGS)
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ pgxc_advisory_lock(key, 0, 0, true, ShareLock, true, false);
+ PG_RETURN_VOID();
+ }
+#endif
+
SET_LOCKTAG_INT64(tag, key);
(void) LockAcquire(&tag, ShareLock, true, false);
@@ -486,6 +649,11 @@ pg_try_advisory_lock_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ PG_RETURN_BOOL(pgxc_advisory_lock(key, 0, 0, true, ExclusiveLock, true, true));
+#endif
+
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ExclusiveLock, true, true);
@@ -525,6 +693,11 @@ pg_try_advisory_lock_shared_int8(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ PG_RETURN_BOOL(pgxc_advisory_lock(key, 0, 0, true, ShareLock, true, true));
+#endif
+
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ShareLock, true, true);
@@ -600,6 +773,14 @@ pg_advisory_lock_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ pgxc_advisory_lock(0, key1, key2, false, ExclusiveLock, true, false);
+ PG_RETURN_VOID();
+ }
+#endif
+
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ExclusiveLock, true, false);
@@ -635,6 +816,14 @@ pg_advisory_lock_shared_int4(PG_FUNCTION_ARGS)
int32 key2 = PG_GETARG_INT32(1);
LOCKTAG tag;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ pgxc_advisory_lock(0, key1, key2, false, ShareLock, true, false);
+ PG_RETURN_VOID();
+ }
+#endif
+
SET_LOCKTAG_INT32(tag, key1, key2);
(void) LockAcquire(&tag, ShareLock, true, false);
@@ -673,6 +862,11 @@ pg_try_advisory_lock_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ PG_RETURN_BOOL(pgxc_advisory_lock(0, key1, key2, false, ExclusiveLock, true, true));
+#endif
+
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ExclusiveLock, true, true);
@@ -714,6 +908,11 @@ pg_try_advisory_lock_shared_int4(PG_FUNCTION_ARGS)
LOCKTAG tag;
LockAcquireResult res;
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ PG_RETURN_BOOL(pgxc_advisory_lock(0, key1, key2, false, ShareLock, true, true));
+#endif
+
SET_LOCKTAG_INT32(tag, key1, key2);
res = LockAcquire(&tag, ShareLock, true, true);
diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h
index c51bc01693..743c9ea5be 100644
--- a/src/include/pgxc/pgxcnode.h
+++ b/src/include/pgxc/pgxcnode.h
@@ -164,4 +164,6 @@ extern char get_message(PGXCNodeHandle *conn, int *len, char **msg);
extern void add_error_message(PGXCNodeHandle * handle, const char *message);
+extern Datum pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query);
+
#endif
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 7ec961f443..17afbc15f3 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -477,6 +477,7 @@ extern LockAcquireResult LockAcquire(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait);
+extern bool LockIncrementIfExists(const LOCKTAG *locktag, LOCKMODE lockmode);
extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,