diff options
| author | Amit Khandekar | 2012-02-01 10:19:18 +0000 |
|---|---|---|
| committer | Amit Khandekar | 2012-02-01 10:19:18 +0000 |
| commit | 5b00dd716cd6b47db2faee71dabc0fde91005ca6 (patch) | |
| tree | e6c151f034938b170bf8fee6c1daa118f01ca45a | |
| parent | 5c0b161a0ae728c8c8f62721165b4bcae05d5893 (diff) | |
Make the advisory locks cluster-aware, so that a user connected from
coordinator 1 will wait for an advisory lock held by a user connected
from coordinator 2.
This support is enabled only for session level advisory locks in this commit,
Transaction level advisory locks are not included.
| -rw-r--r-- | src/backend/storage/lmgr/lock.c | 50 | ||||
| -rw-r--r-- | src/backend/utils/adt/dbsize.c | 34 | ||||
| -rw-r--r-- | src/backend/utils/adt/lockfuncs.c | 199 | ||||
| -rw-r--r-- | src/include/pgxc/pgxcnode.h | 2 | ||||
| -rw-r--r-- | src/include/storage/lock.h | 1 |
5 files changed, 273 insertions, 13 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index e3ad3199c4..dc4f996092 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -173,6 +173,10 @@ static HTAB *LockMethodLocalHash; static LOCALLOCK *awaitedLock; static ResourceOwner awaitedOwner; +static LockAcquireResult LockAcquireExtendedXC(const LOCKTAG *locktag, +LOCKMODE lockmode, bool sessionLock, bool dontWait, bool reportMemoryError, +bool only_increment); + #ifdef LOCK_DEBUG @@ -470,6 +474,28 @@ LockAcquire(const LOCKTAG *locktag, return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true); } +#ifdef PGXC +/* + * LockIncrementIfExists - Special purpose case of LockAcquire(). + * This checks if there is already a reference to the lock. If yes, + * increments it, and returns true. If not, just returns back false. + * Effectively, it never creates a new lock. + */ +bool +LockIncrementIfExists(const LOCKTAG *locktag, + LOCKMODE lockmode) +{ + int ret; + + ret = LockAcquireExtendedXC(locktag, lockmode, + true, /* always session level lock */ + true, /* never wait */ + true, true); + + return (ret == LOCKACQUIRE_ALREADY_HELD); +} +#endif + /* * LockAcquireExtended - allows us to specify additional options * @@ -486,6 +512,22 @@ LockAcquireExtended(const LOCKTAG *locktag, bool dontWait, bool reportMemoryError) { + return LockAcquireExtendedXC(locktag, lockmode, sessionLock, dontWait, + reportMemoryError, false); +} + +/* + * LockAcquireExtendedXC - additional parameter only_increment. This is XC + * specific. Check comments for the function LockIncrementIfExists() + */ +static LockAcquireResult +LockAcquireExtendedXC(const LOCKTAG *locktag, + LOCKMODE lockmode, + bool sessionLock, + bool dontWait, + bool reportMemoryError, + bool only_increment) +{ LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; LockMethod lockMethodTable; LOCALLOCKTAG localtag; @@ -580,7 +622,13 @@ LockAcquireExtended(const LOCKTAG *locktag, GrantLockLocal(locallock, owner); return LOCKACQUIRE_ALREADY_HELD; } - +#ifdef PGXC + else if (only_increment) + { + /* User does not want to create new lock if it does not already exist */ + return LOCKACQUIRE_NOT_AVAIL; + } +#endif /* * Emit a WAL record if acquisition of this lock needs to be replayed in a * standby server. Only AccessExclusiveLocks can conflict with lock types diff --git a/src/backend/utils/adt/dbsize.c b/src/backend/utils/adt/dbsize.c index c326c4ac7a..9997f4b8f7 100644 --- a/src/backend/utils/adt/dbsize.c +++ b/src/backend/utils/adt/dbsize.c @@ -37,8 +37,7 @@ #include "utils/syscache.h" #ifdef PGXC -static int64 pgxc_database_size(Oid dbOid); -static int64 pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query); +static Datum pgxc_database_size(Oid dbOid); static int64 pgxc_exec_sizefunc(Oid relOid, char *funcname, char *extra_arg); /* @@ -165,7 +164,7 @@ pg_database_size_oid(PG_FUNCTION_ARGS) #ifdef PGXC if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) - PG_RETURN_INT64(pgxc_database_size(dbOid)); + PG_RETURN_DATUM(pgxc_database_size(dbOid)); #endif PG_RETURN_INT64(calculate_database_size(dbOid)); @@ -179,7 +178,7 @@ pg_database_size_name(PG_FUNCTION_ARGS) #ifdef PGXC if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) - PG_RETURN_INT64(pgxc_database_size(dbOid)); + PG_RETURN_DATUM(pgxc_database_size(dbOid)); #endif PG_RETURN_INT64(calculate_database_size(dbOid)); @@ -720,7 +719,7 @@ pg_relation_filepath(PG_FUNCTION_ARGS) * pgxc_database_size * Given a dboid, return sum of pg_database_size() executed on all the datanodes */ -static int64 +static Datum pgxc_database_size(Oid dbOid) { StringInfoData buf; @@ -744,11 +743,13 @@ pgxc_database_size(Oid dbOid) /* * pgxc_execute_on_nodes - * Execute 'query' on all the nodes in 'nodelist', and return - * sum of all the results. - * 'query' *must* be of the form: 'select pg_***_size()' + * Execute 'query' on all the nodes in 'nodelist', and returns int64 datum + * which has the sum of all the results. If multiples nodes are involved, it + * assumes that the query returns exactly one row with one attribute of type + * int64. If there is a single node, it just returns the datum as-is without + * checking the type of the returned value. */ -static int64 +Datum pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query) { StringInfoData buf; @@ -759,6 +760,7 @@ pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query) int64 size = 0; bool isnull; char *nodename; + Datum datum; /* * Connect to SPI manager @@ -793,14 +795,22 @@ pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query) */ Assert(SPI_processed == 1 && spi_tupdesc->natts == 1); - size = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], spi_tupdesc, 1, &isnull)); + datum = SPI_getbinval(SPI_tuptable->vals[0], spi_tupdesc, 1, &isnull); + /* For single node, don't assume the type of datum. It can be bool also. */ + if (numnodes == 1) + break; + + size = DatumGetInt64(datum); total_size += size; } SPI_finish(); - return total_size; + if (numnodes == 1) + PG_RETURN_DATUM(datum); + else + PG_RETURN_INT64(total_size); } @@ -845,7 +855,7 @@ pgxc_exec_sizefunc(Oid relOid, char *funcname, char *extra_arg) relation_close(rel, AccessShareLock); - return pgxc_execute_on_nodes(numnodes, nodelist, buf.data); + return DatumGetInt64(pgxc_execute_on_nodes(numnodes, nodelist, buf.data)); } #endif /* PGXC */ diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c index 6d7d4f4fb0..f39e932a04 100644 --- a/src/backend/utils/adt/lockfuncs.c +++ b/src/backend/utils/adt/lockfuncs.c @@ -15,6 +15,11 @@ #include "catalog/pg_type.h" #include "funcapi.h" #include "miscadmin.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#include "pgxc/pgxcnode.h" +#include "pgxc/nodemgr.h" +#endif #include "storage/predicate_internals.h" #include "storage/proc.h" #include "utils/builtins.h" @@ -49,6 +54,13 @@ typedef struct int predLockIdx; /* current index for pred lock */ } PG_Lock_Status; +#ifdef PGXC +static bool +pgxc_advisory_lock(int64 key64, int32 key1, int32 key2, bool iskeybig, + LOCKMODE lockmode, + bool sessionLock, + bool dontWait); +#endif /* * VXIDGetDatum - Construct a text representation of a VXID @@ -408,6 +420,141 @@ pg_lock_status(PG_FUNCTION_ARGS) #define SET_LOCKTAG_INT32(tag, key1, key2) \ SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2) +#ifdef PGXC + +#define MAXINT8LEN 25 + +/* + * pgxc_advisory_lock - Core function that implements the algorithm needed to + * propogate the advisory lock function calls to all coordinators. + * The idea is to make the advisory locks cluster-aware, so that a user having + * a lock from coordinator 1 will make the user from coordinator 2 to wait for + * the same lock. + * + * Return true if all locks are returned successfully. False otherwise. + * Effectively this function returns false only if dontWait is true. Otherwise + * it either returns true, or waits on a resource, or throws an exception + * returned by the lock function calls in case of unexpected or fatal errors. + * + * Currently used only for session level locks; not used for transaction level + * locks. + */ +static bool +pgxc_advisory_lock(int64 key64, int32 key1, int32 key2, bool iskeybig, + LOCKMODE lockmode, + bool sessionLock, + bool dontWait) +{ + LOCKTAG locktag; + Oid *coOids, *dnOids; + int numdnodes, numcoords; + StringInfoData lock_cmd, unlock_cmd, lock_funcname, unlock_funcname, args; + char str_key[MAXINT8LEN + 1]; + int i, prev; + bool abort_locking = false; + Datum lock_status; + + if (iskeybig) + SET_LOCKTAG_INT64(locktag, key64); + else + SET_LOCKTAG_INT32(locktag, key1, key2); + + PgxcNodeListAndCount(&coOids, &dnOids, &numcoords, &numdnodes); + + /* Skip everything XC specific if there's only one coordinator running */ + if (numcoords <= 1) + { + (void) LockAcquire(&locktag, lockmode, sessionLock, dontWait); + return true; + } + + /* + * If there is already a lock held by us, just increment and return; we + * already did all necessary steps when we locked for the first time. + */ + if (LockIncrementIfExists(&locktag, lockmode) == true) + return true; + + initStringInfo(&lock_funcname); + appendStringInfo(&lock_funcname, "pg_%sadvisory_%slock%s", + (dontWait ? "try_" : ""), + (sessionLock ? "" : "xact_"), + (lockmode == ShareLock ? "_shared": "")); + + initStringInfo(&unlock_funcname); + appendStringInfo(&unlock_funcname, "pg_advisory_unlock%s", + (lockmode == ShareLock ? "_shared": "")); + + initStringInfo(&args); + + if (iskeybig) + { + pg_lltoa(key64, str_key); + appendStringInfo(&args, "%s", str_key); + } + else + { + pg_ltoa(key1, str_key); + appendStringInfo(&args, "%s, ", str_key); + pg_ltoa(key2, str_key); + appendStringInfo(&args, "%s", str_key); + } + + initStringInfo(&lock_cmd); + appendStringInfo(&lock_cmd, "SELECT pg_catalog.%s(%s)", lock_funcname.data, args.data); + initStringInfo(&unlock_cmd); + appendStringInfo(&unlock_cmd, "SELECT pg_catalog.%s(%s)", unlock_funcname.data, args.data); + + /* + * Go on locking on each coordinator. Keep on unlocking the previous one + * after a lock is held on next coordinator. Don't unlock the local + * coordinator. After finishing all coordinators, ultimately only the local + * coordinator would be locked, but still we will have scanned all + * coordinators to make sure no one else has already grabbed the lock. The + * reason for unlocking all remote locks is because the session level locks + * don't get unlocked until explicitly unlocked or the session quits. After + * the user session quits without explicitly unlocking, the coord-to-coord + * pooler connection stays and so does the remote coordinator lock. + */ + prev = -1; + for (i = 0; i <= numcoords && !abort_locking; i++, prev++) + { + if (i < numcoords) + { + /* If this coordinator is myself, execute native lock calls */ + if (i == PGXCNodeId - 1) + lock_status = LockAcquire(&locktag, lockmode, sessionLock, dontWait); + else + lock_status = pgxc_execute_on_nodes(1, &coOids[i], lock_cmd.data); + + if (dontWait == true && DatumGetBool(lock_status) == false) + { + abort_locking = true; + /* + * If we have gone past the local coordinator node, it implies + * that we have obtained a local lock. But now that we are + * aborting, we need to release the local lock first. + */ + if (i > PGXCNodeId - 1) + (void) LockRelease(&locktag, lockmode, sessionLock); + } + } + + /* + * Unlock the previous lock, but only if it is a remote coordinator. If + * it is a local one, we want to keep that lock. Remember, the final + * status should be that there is only *one* lock held, and that is the + * local lock. + */ + if (prev >= 0 && prev != PGXCNodeId - 1) + pgxc_execute_on_nodes(1, &coOids[prev], unlock_cmd.data); + } + + return (!abort_locking); +} + +#endif /* PGXC */ + /* * pg_advisory_lock(int8) - acquire exclusive lock on an int8 key */ @@ -417,6 +564,14 @@ pg_advisory_lock_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + pgxc_advisory_lock(key, 0, 0, true, ExclusiveLock, true, false); + PG_RETURN_VOID(); + } +#endif + SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ExclusiveLock, true, false); @@ -450,6 +605,14 @@ pg_advisory_lock_shared_int8(PG_FUNCTION_ARGS) int64 key = PG_GETARG_INT64(0); LOCKTAG tag; +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + pgxc_advisory_lock(key, 0, 0, true, ShareLock, true, false); + PG_RETURN_VOID(); + } +#endif + SET_LOCKTAG_INT64(tag, key); (void) LockAcquire(&tag, ShareLock, true, false); @@ -486,6 +649,11 @@ pg_try_advisory_lock_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + PG_RETURN_BOOL(pgxc_advisory_lock(key, 0, 0, true, ExclusiveLock, true, true)); +#endif + SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ExclusiveLock, true, true); @@ -525,6 +693,11 @@ pg_try_advisory_lock_shared_int8(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + PG_RETURN_BOOL(pgxc_advisory_lock(key, 0, 0, true, ShareLock, true, true)); +#endif + SET_LOCKTAG_INT64(tag, key); res = LockAcquire(&tag, ShareLock, true, true); @@ -600,6 +773,14 @@ pg_advisory_lock_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + pgxc_advisory_lock(0, key1, key2, false, ExclusiveLock, true, false); + PG_RETURN_VOID(); + } +#endif + SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ExclusiveLock, true, false); @@ -635,6 +816,14 @@ pg_advisory_lock_shared_int4(PG_FUNCTION_ARGS) int32 key2 = PG_GETARG_INT32(1); LOCKTAG tag; +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + pgxc_advisory_lock(0, key1, key2, false, ShareLock, true, false); + PG_RETURN_VOID(); + } +#endif + SET_LOCKTAG_INT32(tag, key1, key2); (void) LockAcquire(&tag, ShareLock, true, false); @@ -673,6 +862,11 @@ pg_try_advisory_lock_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + PG_RETURN_BOOL(pgxc_advisory_lock(0, key1, key2, false, ExclusiveLock, true, true)); +#endif + SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ExclusiveLock, true, true); @@ -714,6 +908,11 @@ pg_try_advisory_lock_shared_int4(PG_FUNCTION_ARGS) LOCKTAG tag; LockAcquireResult res; +#ifdef PGXC + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + PG_RETURN_BOOL(pgxc_advisory_lock(0, key1, key2, false, ShareLock, true, true)); +#endif + SET_LOCKTAG_INT32(tag, key1, key2); res = LockAcquire(&tag, ShareLock, true, true); diff --git a/src/include/pgxc/pgxcnode.h b/src/include/pgxc/pgxcnode.h index c51bc01693..743c9ea5be 100644 --- a/src/include/pgxc/pgxcnode.h +++ b/src/include/pgxc/pgxcnode.h @@ -164,4 +164,6 @@ extern char get_message(PGXCNodeHandle *conn, int *len, char **msg); extern void add_error_message(PGXCNodeHandle * handle, const char *message); +extern Datum pgxc_execute_on_nodes(int numnodes, Oid *nodelist, char *query); + #endif diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 7ec961f443..17afbc15f3 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -477,6 +477,7 @@ extern LockAcquireResult LockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait); +extern bool LockIncrementIfExists(const LOCKTAG *locktag, LOCKMODE lockmode); extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, |
