Improve behavior of concurrent ALTER <relation> .. SET SCHEMA.
authorRobert Haas <rhaas@postgresql.org>
Fri, 16 Dec 2011 00:02:58 +0000 (19:02 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 16 Dec 2011 00:02:58 +0000 (19:02 -0500)
If the referrent of a name changes while we're waiting for the lock,
we must recheck permissons.  We also now check the relkind before
locking, since it's easy to do that long the way.

Patch by me; review by Noah Misch.

src/backend/commands/alter.c
src/backend/commands/tablecmds.c

index 2954bb27dd9dcc6425c87bdbc781a302e2baf96a..8513837fa1033e3eb435fea3d493b1603261ee07 100644 (file)
@@ -191,7 +191,6 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
        case OBJECT_TABLE:
        case OBJECT_VIEW:
        case OBJECT_FOREIGN_TABLE:
-           CheckRelationOwnership(stmt->relation, true);
            AlterTableNamespace(stmt->relation, stmt->newschema,
                                stmt->objectType, AccessExclusiveLock);
            break;
index a036f823f95e0f07cfba6e1ce719f9642c268681..135736a8dc64c8404060a9d6e225e6e9f6463800 100644 (file)
@@ -9379,28 +9379,35 @@ ATExecGenericOptions(Relation rel, List *options)
    heap_freetuple(tuple);
 }
 
-
 /*
- * Execute ALTER TABLE SET SCHEMA
- *
- * Note: caller must have checked ownership of the relation already
+ * Perform permissions and integrity checks before acquiring a relation lock.
  */
-void
-AlterTableNamespace(RangeVar *relation, const char *newschema,
-                   ObjectType stmttype, LOCKMODE lockmode)
+static void
+RangeVarCallbackForAlterTableNamespace(const RangeVar *rv, Oid relid,
+                                      Oid oldrelid, void *arg)
 {
-   Relation    rel;
-   Oid         relid;
-   Oid         oldNspOid;
-   Oid         nspOid;
-   Relation    classRel;
+   HeapTuple       tuple;
+   Form_pg_class   form;
+   ObjectType      stmttype;
+
+   tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+   if (!HeapTupleIsValid(tuple))
+       return;                         /* concurrently dropped */
+   form = (Form_pg_class) GETSTRUCT(tuple);
 
-   rel = relation_openrv(relation, lockmode);
+   /* Must own table. */
+   if (!pg_class_ownercheck(relid, GetUserId()))
+       aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
 
-   relid = RelationGetRelid(rel);
-   oldNspOid = RelationGetNamespace(rel);
+   /* No system table modifications unless explicitly allowed. */
+   if (!allowSystemTableMods && IsSystemClass(form))
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                errmsg("permission denied: \"%s\" is a system catalog",
+                       rv->relname)));
 
    /* Check relation type against type specified in the ALTER command */
+   stmttype = * (ObjectType *) arg;
    switch (stmttype)
    {
        case OBJECT_TABLE:
@@ -9412,27 +9419,24 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
            break;
 
        case OBJECT_SEQUENCE:
-           if (rel->rd_rel->relkind != RELKIND_SEQUENCE)
+           if (form->relkind != RELKIND_SEQUENCE)
                ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                        errmsg("\"%s\" is not a sequence",
-                               RelationGetRelationName(rel))));
+                        errmsg("\"%s\" is not a sequence", rv->relname)));
            break;
 
        case OBJECT_VIEW:
-           if (rel->rd_rel->relkind != RELKIND_VIEW)
+           if (form->relkind != RELKIND_VIEW)
                ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                        errmsg("\"%s\" is not a view",
-                               RelationGetRelationName(rel))));
+                        errmsg("\"%s\" is not a view", rv->relname)));
            break;
 
        case OBJECT_FOREIGN_TABLE:
-           if (rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
+           if (form->relkind != RELKIND_FOREIGN_TABLE)
                ereport(ERROR,
                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                        errmsg("\"%s\" is not a foreign table",
-                               RelationGetRelationName(rel))));
+                        errmsg("\"%s\" is not a foreign table", rv->relname)));
            break;
 
        default:
@@ -9440,33 +9444,18 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
    }
 
    /* Can we change the schema of this tuple? */
-   switch (rel->rd_rel->relkind)
+   switch (form->relkind)
    {
        case RELKIND_RELATION:
        case RELKIND_VIEW:
+       case RELKIND_SEQUENCE:
        case RELKIND_FOREIGN_TABLE:
            /* ok to change schema */
            break;
-       case RELKIND_SEQUENCE:
-           {
-               /* if it's an owned sequence, disallow moving it by itself */
-               Oid         tableId;
-               int32       colId;
-
-               if (sequenceIsOwned(relid, &tableId, &colId))
-                   ereport(ERROR,
-                           (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                            errmsg("cannot move an owned sequence into another schema"),
-                     errdetail("Sequence \"%s\" is linked to table \"%s\".",
-                               RelationGetRelationName(rel),
-                               get_rel_name(tableId))));
-           }
-           break;
        case RELKIND_COMPOSITE_TYPE:
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                    errmsg("\"%s\" is a composite type",
-                           RelationGetRelationName(rel)),
+                    errmsg("\"%s\" is a composite type", rv->relname),
                     errhint("Use ALTER TYPE instead.")));
            break;
        case RELKIND_INDEX:
@@ -9476,7 +9465,45 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
            errmsg("\"%s\" is not a table, view, sequence, or foreign table",
-                  RelationGetRelationName(rel))));
+                  rv->relname)));
+   }
+
+   ReleaseSysCache(tuple);
+}
+
+/*
+ * Execute ALTER TABLE SET SCHEMA
+ */
+void
+AlterTableNamespace(RangeVar *relation, const char *newschema,
+                   ObjectType stmttype, LOCKMODE lockmode)
+{
+   Relation    rel;
+   Oid         relid;
+   Oid         oldNspOid;
+   Oid         nspOid;
+   Relation    classRel;
+
+   relid = RangeVarGetRelidExtended(relation, lockmode, false, false,
+                                    RangeVarCallbackForAlterTableNamespace,
+                                    (void *) &stmttype);
+   rel = relation_open(relid, NoLock);
+
+   oldNspOid = RelationGetNamespace(rel);
+
+   /* If it's an owned sequence, disallow moving it by itself. */
+   if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
+   {
+       Oid         tableId;
+       int32       colId;
+
+       if (sequenceIsOwned(relid, &tableId, &colId))
+           ereport(ERROR,
+                   (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                    errmsg("cannot move an owned sequence into another schema"),
+             errdetail("Sequence \"%s\" is linked to table \"%s\".",
+                       RelationGetRelationName(rel),
+                       get_rel_name(tableId))));
    }
 
    /* get schema OID and check its permissions */