Refactor broken CREATE TABLE IF NOT EXISTS support.
authorRobert Haas <rhaas@postgresql.org>
Mon, 25 Apr 2011 20:55:11 +0000 (16:55 -0400)
committerRobert Haas <rhaas@postgresql.org>
Mon, 25 Apr 2011 20:55:11 +0000 (16:55 -0400)
Per bug #5988, reported by Marko Tiikkaja, and further analyzed by Tom
Lane, the previous coding was broken in several respects: even if the
target table already existed, a subsequent CREATE TABLE IF NOT EXISTS
might try to add additional constraints or sequences-for-serial
specified in the new CREATE TABLE statement.

In passing, this also fixes a minor information leak: it's no longer
possible to figure out whether a schema to which you don't have CREATE
access contains a sequence named like "x_y_seq" by attempting to create a
table in that schema called "x" with a serial column called "y".

Some more refactoring of this code in the future might be warranted,
but that will need to wait for a later major release.

src/backend/bootstrap/bootparse.y
src/backend/catalog/heap.c
src/backend/catalog/namespace.c
src/backend/catalog/toasting.c
src/backend/commands/cluster.c
src/backend/commands/tablecmds.c
src/backend/executor/execMain.c
src/backend/parser/parse_utilcmd.c
src/backend/tcop/utility.c
src/include/catalog/heap.h
src/include/catalog/namespace.h

index 75a137bbd7732692d40afc9e8c411c3508d47e12..a9d242869897d1d23fb5c349219466dfd4b422d4 100644 (file)
@@ -247,8 +247,7 @@ Boot_CreateStmt:
                                                      ONCOMMIT_NOOP,
                                                      (Datum) 0,
                                                      false,
-                                                     true,
-                                                     false);
+                                                     true);
                        elog(DEBUG4, "relation created with oid %u", id);
                    }
                    do_end();
index 4c089677030d391f5a0e8e8ede24566e558359de..71c99318343c4c8f80b03ad237b7943b872769a8 100644 (file)
@@ -973,8 +973,7 @@ heap_create_with_catalog(const char *relname,
                         OnCommitAction oncommit,
                         Datum reloptions,
                         bool use_user_acl,
-                        bool allow_system_table_mods,
-                        bool if_not_exists)
+                        bool allow_system_table_mods)
 {
    Relation    pg_class_desc;
    Relation    new_rel_desc;
@@ -994,26 +993,14 @@ heap_create_with_catalog(const char *relname,
    CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
 
    /*
-    * If the relation already exists, it's an error, unless the user
-    * specifies "IF NOT EXISTS".  In that case, we just print a notice and do
-    * nothing further.
+    * This would fail later on anyway, if the relation already exists.  But
+    * by catching it here we can emit a nicer error message.
     */
    existing_relid = get_relname_relid(relname, relnamespace);
    if (existing_relid != InvalidOid)
-   {
-       if (if_not_exists)
-       {
-           ereport(NOTICE,
-                   (errcode(ERRCODE_DUPLICATE_TABLE),
-                    errmsg("relation \"%s\" already exists, skipping",
-                           relname)));
-           heap_close(pg_class_desc, RowExclusiveLock);
-           return InvalidOid;
-       }
        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_TABLE),
                 errmsg("relation \"%s\" already exists", relname)));
-   }
 
    /*
     * Since we are going to create a rowtype as well, also check for
index f8fd8276936a687e97655147845e4d4f9fa2f495..d803d28a0679194197cc3f8267b12559f7ec5612 100644 (file)
@@ -360,6 +360,35 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation)
    return namespaceId;
 }
 
+/*
+ * RangeVarGetAndCheckCreationNamespace
+ *      As RangeVarGetCreationNamespace, but with a permissions check.
+ */
+Oid
+RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation)
+{
+   Oid         namespaceId;
+
+   namespaceId = RangeVarGetCreationNamespace(newRelation);
+
+   /*
+    * Check we have permission to create there. Skip check if bootstrapping,
+    * since permissions machinery may not be working yet.
+    */
+   if (!IsBootstrapProcessingMode())
+   {
+       AclResult   aclresult;
+
+       aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
+                                         ACL_CREATE);
+       if (aclresult != ACLCHECK_OK)
+           aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+                          get_namespace_name(namespaceId));
+   }
+
+   return namespaceId;
+}
+
 /*
  * RelnameGetRelid
  *     Try to resolve an unqualified relation name.
index 362d26d9d1fe73f5d2519c151963053a75c4e9fa..a8cf0dbe2f25896ad4f1663ed1bd99997f3de68e 100644 (file)
@@ -227,8 +227,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio
                                           ONCOMMIT_NOOP,
                                           reloptions,
                                           false,
-                                          true,
-                                          false);
+                                          true);
    Assert(toast_relid != InvalidOid);
 
    /* make the toast relation visible, else heap_open will fail */
index ff228b7d5310b958dd16e4d173a7fc6712280eaf..191ef543cd20f3dcf1633d67a3ba4425bb889438 100644 (file)
@@ -646,8 +646,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
                                          ONCOMMIT_NOOP,
                                          reloptions,
                                          false,
-                                         true,
-                                         false);
+                                         true);
    Assert(OIDNewHeap != InvalidOid);
 
    ReleaseSysCache(tuple);
index 7660114ec2ce41b050c7cbe2aa44fc0c99abfa1d..60eecb14976db56508b9d3c850d1879b47487d8f 100644 (file)
@@ -439,22 +439,10 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
                 errmsg("cannot create temporary table within security-restricted operation")));
 
    /*
-    * Look up the namespace in which we are supposed to create the relation.
-    * Check we have permission to create there. Skip check if bootstrapping,
-    * since permissions machinery may not be working yet.
+    * Look up the namespace in which we are supposed to create the relation,
+    * and check we have permission to create there.
     */
-   namespaceId = RangeVarGetCreationNamespace(stmt->relation);
-
-   if (!IsBootstrapProcessingMode())
-   {
-       AclResult   aclresult;
-
-       aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
-                                         ACL_CREATE);
-       if (aclresult != ACLCHECK_OK)
-           aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
-                          get_namespace_name(namespaceId));
-   }
+   namespaceId = RangeVarGetAndCheckCreationNamespace(stmt->relation);
 
    /*
     * Select tablespace to use.  If not specified, use default tablespace
@@ -602,16 +590,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
                                          stmt->oncommit,
                                          reloptions,
                                          true,
-                                         allowSystemTableMods,
-                                         stmt->if_not_exists);
-
-   /*
-    * If heap_create_with_catalog returns InvalidOid, it means that the user
-    * specified "IF NOT EXISTS" and the relation already exists.  In that
-    * case we do nothing further.
-    */
-   if (relationId == InvalidOid)
-       return InvalidOid;
+                                         allowSystemTableMods);
 
    /* Store inheritance information for new rel. */
    StoreCatalogInheritance(relationId, inheritOids);
index 86ec9870198af543e8d45b90e92357c908d2f22c..620efda838421a94a9835a7e8db1d8fa4b50e1ea 100644 (file)
@@ -2341,7 +2341,6 @@ OpenIntoRel(QueryDesc *queryDesc)
    Oid         namespaceId;
    Oid         tablespaceId;
    Datum       reloptions;
-   AclResult   aclresult;
    Oid         intoRelationId;
    TupleDesc   tupdesc;
    DR_intorel *myState;
@@ -2378,13 +2377,7 @@ OpenIntoRel(QueryDesc *queryDesc)
     * Find namespace to create in, check its permissions
     */
    intoName = into->rel->relname;
-   namespaceId = RangeVarGetCreationNamespace(into->rel);
-
-   aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
-                                     ACL_CREATE);
-   if (aclresult != ACLCHECK_OK)
-       aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
-                      get_namespace_name(namespaceId));
+   namespaceId = RangeVarGetAndCheckCreationNamespace(into->rel);
 
    /*
     * Select tablespace to use.  If not specified, use default tablespace
@@ -2444,8 +2437,7 @@ OpenIntoRel(QueryDesc *queryDesc)
                                              into->onCommit,
                                              reloptions,
                                              true,
-                                             allowSystemTableMods,
-                                             false);
+                                             allowSystemTableMods);
    Assert(intoRelationId != InvalidOid);
 
    FreeTupleDesc(tupdesc);
index 0078814905d12ed8638af2d39d18274dba3edbba..5588cfac0bd54a8a8d45ec49569060fa5570a359 100644 (file)
@@ -148,6 +148,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
    List       *result;
    List       *save_alist;
    ListCell   *elements;
+   Oid         namespaceid;
 
    /*
     * We must not scribble on the passed-in CreateStmt, so copy it.  (This is
@@ -155,6 +156,33 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
     */
    stmt = (CreateStmt *) copyObject(stmt);
 
+   /*
+    * Look up the creation namespace.  This also checks permissions on the
+    * target namespace, so that we throw any permissions error as early as
+    * possible.
+    */
+   namespaceid = RangeVarGetAndCheckCreationNamespace(stmt->relation);
+
+   /*
+    * If the relation already exists and the user specified "IF NOT EXISTS",
+    * bail out with a NOTICE.
+    */
+   if (stmt->if_not_exists)
+   {
+       Oid     existing_relid;
+
+       existing_relid = get_relname_relid(stmt->relation->relname,
+                                          namespaceid);
+       if (existing_relid != InvalidOid)
+       {
+           ereport(NOTICE,
+                   (errcode(ERRCODE_DUPLICATE_TABLE),
+                    errmsg("relation \"%s\" already exists, skipping",
+                        stmt->relation->relname)));
+           return NIL;
+       }
+   }
+
    /*
     * If the target relation name isn't schema-qualified, make it so.  This
     * prevents some corner cases in which added-on rewritten commands might
@@ -164,11 +192,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
     */
    if (stmt->relation->schemaname == NULL
        && stmt->relation->relpersistence != RELPERSISTENCE_TEMP)
-   {
-       Oid         namespaceid = RangeVarGetCreationNamespace(stmt->relation);
-
        stmt->relation->schemaname = get_namespace_name(namespaceid);
-   }
 
    /* Set up pstate and CreateStmtContext */
    pstate = make_parsestate(NULL);
index b1fd5ec04fe6a6506cb3632b3e01e1686e8d6dc1..0559998c71f78fcb763fd6f55ecf9d9a61d7d913 100644 (file)
@@ -529,13 +529,6 @@ standard_ProcessUtility(Node *parsetree,
                                                RELKIND_RELATION,
                                                InvalidOid);
 
-                       /*
-                        * If "IF NOT EXISTS" was specified and the relation
-                        * already exists, do nothing further.
-                        */
-                       if (relOid == InvalidOid)
-                           continue;
-
                        /*
                         * Let AlterTableCreateToastTable decide if this one
                         * needs a secondary relation too.
@@ -559,15 +552,8 @@ standard_ProcessUtility(Node *parsetree,
                        relOid = DefineRelation((CreateStmt *) stmt,
                                                RELKIND_FOREIGN_TABLE,
                                                InvalidOid);
-
-                       /*
-                        * Unless "IF NOT EXISTS" was specified and the
-                        * relation already exists, create the
-                        * pg_foreign_table entry.
-                        */
-                       if (relOid != InvalidOid)
-                           CreateForeignTable((CreateForeignTableStmt *) stmt,
-                                              relOid);
+                       CreateForeignTable((CreateForeignTableStmt *) stmt,
+                                          relOid);
                    }
                    else
                    {
index 463aff0358fcc7366bf34c7222c2ec9b164f9c97..c95e91303b8873633a9460e9b85ce24df21b45b1 100644 (file)
@@ -63,8 +63,7 @@ extern Oid heap_create_with_catalog(const char *relname,
                         OnCommitAction oncommit,
                         Datum reloptions,
                         bool use_user_acl,
-                        bool allow_system_table_mods,
-                        bool if_not_exists);
+                        bool allow_system_table_mods);
 
 extern void heap_drop_with_catalog(Oid relid);
 
index f59beee80dd77d46df81a5dfe3d20e01cd0a24b0..53600969ad7e29ef5875ff3c1feb3edd59e2c822 100644 (file)
@@ -49,6 +49,7 @@ typedef struct OverrideSearchPath
 
 extern Oid RangeVarGetRelid(const RangeVar *relation, bool failOK);
 extern Oid RangeVarGetCreationNamespace(const RangeVar *newRelation);
+extern Oid RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation);
 extern Oid RelnameGetRelid(const char *relname);
 extern bool RelationIsVisible(Oid relid);