Proper object locking for GRANT/REVOKE
authorPeter Eisentraut <peter@eisentraut.org>
Fri, 15 Nov 2024 09:53:54 +0000 (10:53 +0100)
committerPeter Eisentraut <peter@eisentraut.org>
Fri, 15 Nov 2024 10:03:48 +0000 (11:03 +0100)
Refactor objectNamesToOids() to use get_object_address() internally if
possible.  Not only does this save a lot of code, it also allows us to
use the object locking provided by get_object_address() for
GRANT/REVOKE.  There was previously a code comment that complained
about the lack of locking in objectNamesToOids(), which is now fixed.

The check in ExecGrant_Type_check() is obsolete because
get_object_address_type() already does the same check.

Reviewed-by: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/bf72b82c-124d-4efa-a484-bb928e9494e4@eisentraut.org

src/backend/catalog/aclchk.c
src/test/isolation/expected/intra-grant-inplace.out

index 95eb0b12277b11655cefecc24a928cd9f1a43c95..aaf96a965e42af821ee945bc355f7a4fe0e58c3a 100644 (file)
@@ -659,147 +659,77 @@ ExecGrantStmt_oids(InternalGrant *istmt)
  * objectNamesToOids
  *
  * Turn a list of object names of a given type into an Oid list.
- *
- * XXX: This function doesn't take any sort of locks on the objects whose
- * names it looks up.  In the face of concurrent DDL, we might easily latch
- * onto an old version of an object, causing the GRANT or REVOKE statement
- * to fail.
  */
 static List *
 objectNamesToOids(ObjectType objtype, List *objnames, bool is_grant)
 {
        List       *objects = NIL;
        ListCell   *cell;
+       const LOCKMODE lockmode = AccessShareLock;
 
        Assert(objnames != NIL);
 
        switch (objtype)
        {
-               case OBJECT_TABLE:
-               case OBJECT_SEQUENCE:
-                       foreach(cell, objnames)
-                       {
-                               RangeVar   *relvar = (RangeVar *) lfirst(cell);
-                               Oid                     relOid;
-
-                               relOid = RangeVarGetRelid(relvar, NoLock, false);
-                               objects = lappend_oid(objects, relOid);
-                       }
-                       break;
-               case OBJECT_DATABASE:
-                       foreach(cell, objnames)
-                       {
-                               char       *dbname = strVal(lfirst(cell));
-                               Oid                     dbid;
-
-                               dbid = get_database_oid(dbname, false);
-                               objects = lappend_oid(objects, dbid);
-                       }
-                       break;
-               case OBJECT_DOMAIN:
-               case OBJECT_TYPE:
-                       foreach(cell, objnames)
-                       {
-                               List       *typname = (List *) lfirst(cell);
-                               Oid                     oid;
-
-                               oid = typenameTypeId(NULL, makeTypeNameFromNameList(typname));
-                               objects = lappend_oid(objects, oid);
-                       }
-                       break;
-               case OBJECT_FUNCTION:
-                       foreach(cell, objnames)
-                       {
-                               ObjectWithArgs *func = (ObjectWithArgs *) lfirst(cell);
-                               Oid                     funcid;
+               default:
 
-                               funcid = LookupFuncWithArgs(OBJECT_FUNCTION, func, false);
-                               objects = lappend_oid(objects, funcid);
-                       }
-                       break;
-               case OBJECT_LANGUAGE:
+                       /*
+                        * For most object types, we use get_object_address() directly.
+                        */
                        foreach(cell, objnames)
                        {
-                               char       *langname = strVal(lfirst(cell));
-                               Oid                     oid;
+                               ObjectAddress address;
 
-                               oid = get_language_oid(langname, false);
-                               objects = lappend_oid(objects, oid);
+                               address = get_object_address(objtype, lfirst(cell), NULL, lockmode, false);
+                               objects = lappend_oid(objects, address.objectId);
                        }
                        break;
-               case OBJECT_LARGEOBJECT:
-                       foreach(cell, objnames)
-                       {
-                               Oid                     lobjOid = oidparse(lfirst(cell));
 
-                               if (!LargeObjectExists(lobjOid))
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_UNDEFINED_OBJECT),
-                                                        errmsg("large object %u does not exist",
-                                                                       lobjOid)));
-
-                               objects = lappend_oid(objects, lobjOid);
-                       }
-                       break;
-               case OBJECT_SCHEMA:
-                       foreach(cell, objnames)
-                       {
-                               char       *nspname = strVal(lfirst(cell));
-                               Oid                     oid;
+               case OBJECT_TABLE:
+               case OBJECT_SEQUENCE:
 
-                               oid = get_namespace_oid(nspname, false);
-                               objects = lappend_oid(objects, oid);
-                       }
-                       break;
-               case OBJECT_PROCEDURE:
+                       /*
+                        * Here, we don't use get_object_address().  It requires that the
+                        * specified object type match the actual type of the object, but
+                        * in GRANT/REVOKE, all table-like things are addressed as TABLE.
+                        */
                        foreach(cell, objnames)
                        {
-                               ObjectWithArgs *func = (ObjectWithArgs *) lfirst(cell);
-                               Oid                     procid;
+                               RangeVar   *relvar = (RangeVar *) lfirst(cell);
+                               Oid                     relOid;
 
-                               procid = LookupFuncWithArgs(OBJECT_PROCEDURE, func, false);
-                               objects = lappend_oid(objects, procid);
+                               relOid = RangeVarGetRelid(relvar, lockmode, false);
+                               objects = lappend_oid(objects, relOid);
                        }
                        break;
-               case OBJECT_ROUTINE:
-                       foreach(cell, objnames)
-                       {
-                               ObjectWithArgs *func = (ObjectWithArgs *) lfirst(cell);
-                               Oid                     routid;
 
-                               routid = LookupFuncWithArgs(OBJECT_ROUTINE, func, false);
-                               objects = lappend_oid(objects, routid);
-                       }
-                       break;
-               case OBJECT_TABLESPACE:
-                       foreach(cell, objnames)
-                       {
-                               char       *spcname = strVal(lfirst(cell));
-                               Oid                     spcoid;
+               case OBJECT_DOMAIN:
+               case OBJECT_TYPE:
 
-                               spcoid = get_tablespace_oid(spcname, false);
-                               objects = lappend_oid(objects, spcoid);
-                       }
-                       break;
-               case OBJECT_FDW:
+                       /*
+                        * The parse representation of types and domains in privilege
+                        * targets is different from that expected by get_object_address()
+                        * (for parse conflict reasons), so we have to do a bit of
+                        * conversion here.
+                        */
                        foreach(cell, objnames)
                        {
-                               char       *fdwname = strVal(lfirst(cell));
-                               Oid                     fdwid = get_foreign_data_wrapper_oid(fdwname, false);
+                               List       *typname = (List *) lfirst(cell);
+                               TypeName   *tn = makeTypeNameFromNameList(typname);
+                               ObjectAddress address;
+                               Relation        relation;
 
-                               objects = lappend_oid(objects, fdwid);
+                               address = get_object_address(objtype, (Node *) tn, &relation, lockmode, false);
+                               Assert(relation == NULL);
+                               objects = lappend_oid(objects, address.objectId);
                        }
                        break;
-               case OBJECT_FOREIGN_SERVER:
-                       foreach(cell, objnames)
-                       {
-                               char       *srvname = strVal(lfirst(cell));
-                               Oid                     srvid = get_foreign_server_oid(srvname, false);
 
-                               objects = lappend_oid(objects, srvid);
-                       }
-                       break;
                case OBJECT_PARAMETER_ACL:
+
+                       /*
+                        * Parameters are handled completely differently.
+                        */
                        foreach(cell, objnames)
                        {
                                /*
@@ -830,9 +760,6 @@ objectNamesToOids(ObjectType objtype, List *objnames, bool is_grant)
                                        objects = lappend_oid(objects, parameterId);
                        }
                        break;
-               default:
-                       elog(ERROR, "unrecognized GrantStmt.objtype: %d",
-                                (int) objtype);
        }
 
        return objects;
@@ -2458,14 +2385,6 @@ ExecGrant_Type_check(InternalGrant *istmt, HeapTuple tuple)
                                (errcode(ERRCODE_INVALID_GRANT_OPERATION),
                                 errmsg("cannot set privileges of multirange types"),
                                 errhint("Set the privileges of the range type instead.")));
-
-       /* Used GRANT DOMAIN on a non-domain? */
-       if (istmt->objtype == OBJECT_DOMAIN &&
-               pg_type_tuple->typtype != TYPTYPE_DOMAIN)
-               ereport(ERROR,
-                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                errmsg("\"%s\" is not a domain",
-                                               NameStr(pg_type_tuple->typname))));
 }
 
 static void
index 4e9695a02146c0a15adf1e94a6093e6fe9ab70c6..1aa9da622da05a0628a1ea2eff36b42168dfbe3f 100644 (file)
@@ -248,6 +248,6 @@ relhasindex
 -----------
 (0 rows)
 
-s4: WARNING:  got: cache lookup failed for relation REDACTED
+s4: WARNING:  got: relation "intra_grant_inplace" does not exist
 step revoke4: <... completed>
 step r3: ROLLBACK;