Code review for CLUSTER ALL patch. Fix bogus locking, incorrect transaction
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 30 Dec 2002 18:42:17 +0000 (18:42 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 30 Dec 2002 18:42:17 +0000 (18:42 +0000)
stop/start nesting, other infelicities.

doc/src/sgml/ref/cluster.sgml
src/backend/commands/cluster.c
src/backend/commands/tablecmds.c
src/backend/tcop/utility.c
src/include/commands/cluster.h
src/test/regress/expected/cluster.out

index e72a8f61d1f1129c403666012ce4a9df9301694a..95daa8bf176797fd11b616aaceaf3e9847c3adee 100644 (file)
@@ -1,5 +1,5 @@
 <!--
-$Header: /cvsroot/pgsql/doc/src/sgml/ref/cluster.sgml,v 1.22 2002/11/18 17:12:06 momjian Exp $
+$Header: /cvsroot/pgsql/doc/src/sgml/ref/cluster.sgml,v 1.23 2002/12/30 18:42:12 tgl Exp $
 PostgreSQL documentation
 -->
 
@@ -108,14 +108,16 @@ CLUSTER
 
   <para>
    When a table is clustered, <productname>PostgreSQL</productname>
-   remembers on which index it was clustered.  In calls to
+   remembers on which index it was clustered.  The form
    <command>CLUSTER <replaceable class="parameter">tablename</replaceable></command>,
-   the table is clustered on the same index that it was clustered before.
+   re-clusters the table on the same index that it was clustered before.
   </para>
 
   <para>
-   A simple <command>CLUSTER</command> clusters all the tables in the database
-   that the calling user owns and uses the saved cluster information.  This
+   <command>CLUSTER</command> without any parameter re-clusters all the tables
+   in the
+   current database that the calling user owns, or all tables if called
+   by a superuser.  (Never-clustered tables are not touched.)  This
    form of <command>CLUSTER</command> cannot be called from inside a
    transaction or function.
   </para>
@@ -157,15 +159,15 @@ CLUSTER
    </para>
 
    <para>
-   <command>CLUSTER</command> preserves GRANT, inheritance, index, foreign
-   key, and other ancillary information about the table.
+    <command>CLUSTER</command> preserves GRANT, inheritance, index, foreign
+    key, and other ancillary information about the table.
    </para>
 
    <para>
-   Because <command>CLUSTER</command> remembers the clustering information,
-   one can cluster the tables one wants clustered manually the first time, and
-   setup a timed event similar to <command>VACUUM</command> so that the tables
-   are periodically and automatically clustered.
+    Because <command>CLUSTER</command> remembers the clustering information,
+    one can cluster the tables one wants clustered manually the first time, and
+    setup a timed event similar to <command>VACUUM</command> so that the tables
+    are periodically re-clustered.
    </para>
 
    <para>
index 05321d353beceec993bb0107939518a4ad2e33ab..0361ede00929e20d2576ca61e03bbec6cc53ded0 100644 (file)
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.102 2002/12/06 05:00:10 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.103 2002/12/30 18:42:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,6 +36,7 @@
 #include "utils/syscache.h"
 #include "utils/relcache.h"
 
+
 /*
  * We need one of these structs for each index in the relation to be
  * clustered.  It's basically the data needed by index_create() so
@@ -51,7 +52,8 @@ typedef struct
    bool        isclustered;
 } IndexAttrs;
 
-/* This struct is used to pass around the information on tables to be
+/*
+ * This struct is used to pass around the information on tables to be
  * clustered. We need this so we can make a list of them when invoked without
  * a specific table/index pair.
  */
@@ -59,21 +61,174 @@ typedef struct
 {
    Oid     tableOid;
    Oid     indexOid;
-   bool    isPrevious;
-} relToCluster;
+} RelToCluster;
 
+
+static void cluster_rel(RelToCluster *rv, bool recheck);
 static Oid make_new_heap(Oid OIDOldHeap, const char *NewName);
 static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
-static void recreate_indexattr(Oid OIDOldHeap, List *indexes);
+static List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
+static void rebuild_indexes(Oid OIDOldHeap, List *indexes);
 static void swap_relfilenodes(Oid r1, Oid r2);
-static void cluster_rel(relToCluster *rv);
-static bool check_cluster_ownership(Oid relOid);
-static List *get_tables_to_cluster(AclId owner);
+static bool check_cluster_permitted(Oid relOid);
+static List *get_tables_to_cluster(MemoryContext cluster_context);
+
+
+
+/*---------------------------------------------------------------------------
+ * This cluster code allows for clustering multiple tables at once.    Because
+ * of this, we cannot just run everything on a single transaction, or we
+ * would be forced to acquire exclusive locks on all the tables being
+ * clustered, simultaneously --- very likely leading to deadlock.
+ *
+ * To solve this we follow a similar strategy to VACUUM code,
+ * clustering each relation in a separate transaction. For this to work,
+ * we need to:
+ *  - provide a separate memory context so that we can pass information in
+ *    a way that survives across transactions
+ *  - start a new transaction every time a new relation is clustered
+ *  - check for validity of the information on to-be-clustered relations,
+ *    as someone might have deleted a relation behind our back, or
+ *    clustered one on a different index
+ *  - end the transaction
+ *
+ * The single-relation case does not have any such overhead.
+ *
+ * We also allow a relation being specified without index.  In that case,
+ * the indisclustered bit will be looked up, and an ERROR will be thrown
+ * if there is no index with the bit set.
+ *---------------------------------------------------------------------------
+ */
+void
+cluster(ClusterStmt *stmt)
+{
+   if (stmt->relation != NULL)
+   {
+       /* This is the single-relation case. */
+       Oid             tableOid,
+                       indexOid = InvalidOid;
+       Relation        rel;
+       RelToCluster    rvtc;
+
+       /* Find and lock the table */
+       tableOid = RangeVarGetRelid(stmt->relation, false);
+
+       rel = heap_open(tableOid, AccessExclusiveLock);
+
+       /* Check permissions */
+       if (!check_cluster_permitted(tableOid))
+           elog(ERROR, "CLUSTER: You do not own relation %s",
+                stmt->relation->relname);
+
+       if (stmt->indexname == NULL)
+       {
+           List       *index;
+
+           /* We need to find the index that has indisclustered set. */
+           foreach (index, RelationGetIndexList(rel))
+           {
+               HeapTuple       idxtuple;
+               Form_pg_index   indexForm;
+
+               indexOid = lfirsti(index);
+               idxtuple = SearchSysCache(INDEXRELID,
+                                         ObjectIdGetDatum(indexOid),
+                                         0, 0, 0);
+               if (!HeapTupleIsValid(idxtuple))
+                   elog(ERROR, "Cache lookup failed for index %u",
+                        indexOid);
+               indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
+               if (indexForm->indisclustered)
+               {
+                   ReleaseSysCache(idxtuple);
+                   break;
+               }
+               ReleaseSysCache(idxtuple);
+               indexOid = InvalidOid;
+           }
+
+           if (!OidIsValid(indexOid))
+               elog(ERROR, "CLUSTER: No previously clustered index found on table \"%s\"",
+                    stmt->relation->relname);
+       }
+       else
+       {
+           /* The index is expected to be in the same namespace as the relation. */
+           indexOid = get_relname_relid(stmt->indexname,
+                                        rel->rd_rel->relnamespace);
+           if (!OidIsValid(indexOid))
+               elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
+                    stmt->indexname, stmt->relation->relname);
+       }
+
+       rvtc.tableOid = tableOid;
+       rvtc.indexOid = indexOid;
+
+       /* close relation, keep lock till commit */
+       heap_close(rel, NoLock);
+
+       /* Do the job */
+       cluster_rel(&rvtc, false);
+   }
+   else
+   {
+       /*
+        * This is the "multi relation" case. We need to cluster all tables
+        * that have some index with indisclustered set.
+        */
+       MemoryContext   cluster_context;
+       List            *rv,
+                       *rvs;
+
+       /*
+        * We cannot run this form of CLUSTER inside a user transaction block;
+        * we'd be holding locks way too long.
+        */
+       PreventTransactionChain((void *) stmt, "CLUSTER");
+
+       /*
+        * Create special memory context for cross-transaction storage.
+        *
+        * Since it is a child of QueryContext, it will go away even in case
+        * of error.
+        */
+       cluster_context = AllocSetContextCreate(QueryContext,
+                                               "Cluster",
+                                               ALLOCSET_DEFAULT_MINSIZE,
+                                               ALLOCSET_DEFAULT_INITSIZE,
+                                               ALLOCSET_DEFAULT_MAXSIZE);
+
+       /*
+        * Build the list of relations to cluster.  Note that this lives in
+        * cluster_context.
+        */
+       rvs = get_tables_to_cluster(cluster_context);
+
+       /* Commit to get out of starting transaction */
+       CommitTransactionCommand(true);
+
+       /* Ok, now that we've got them all, cluster them one by one */
+       foreach (rv, rvs)
+       {
+           RelToCluster    *rvtc = (RelToCluster *) lfirst(rv);
+
+           /* Start a new transaction for each relation. */
+           StartTransactionCommand(true);
+           SetQuerySnapshot(); /* might be needed for functional index */
+           cluster_rel(rvtc, true);
+           CommitTransactionCommand(true);
+       }
+
+       /* Start a new transaction for the cleanup work. */
+       StartTransactionCommand(true);
 
-static MemoryContext cluster_context = NULL;
+       /* Clean up working storage */
+       MemoryContextDelete(cluster_context);
+   }
+}
 
 /*
- * cluster
+ * cluster_rel
  *
  * This clusters the table by creating a new, clustered table and
  * swapping the relfilenodes of the new table and the old table, so
@@ -85,45 +240,52 @@ static MemoryContext cluster_context = NULL;
  * same way we do for the relation.  Since we are effectively bulk-loading
  * the new table, it's better to create the indexes afterwards than to fill
  * them incrementally while we load the table.
- *
- * Since we may open a new transaction for each relation, we have to
- * check that the relation still is what we think it is.
  */
-void
-cluster_rel(relToCluster *rvtc)
+static void
+cluster_rel(RelToCluster *rvtc, bool recheck)
 {
    Relation    OldHeap,
                OldIndex;
-   List       *indexes;
 
    /* Check for user-requested abort. */
    CHECK_FOR_INTERRUPTS();
 
-   /* Check if the relation and index still exist before opening them
+   /*
+    * Since we may open a new transaction for each relation, we have to
+    * check that the relation still is what we think it is.
+    *
+    * If this is a single-transaction CLUSTER, we can skip these tests.
+    * We *must* skip the one on indisclustered since it would reject an
+    * attempt to cluster a not-previously-clustered index.
     */
-   if (!SearchSysCacheExists(RELOID,
-                             ObjectIdGetDatum(rvtc->tableOid),
-                             0, 0, 0) ||
+   if (recheck)
+   {
+       HeapTuple       tuple;
+       Form_pg_index   indexForm;
+
+       /*
+        * Check if the relation and index still exist before opening them
+        */
+       if (!SearchSysCacheExists(RELOID,
+                                 ObjectIdGetDatum(rvtc->tableOid),
+                                 0, 0, 0) ||
            !SearchSysCacheExists(RELOID,
                                  ObjectIdGetDatum(rvtc->indexOid),
                                  0, 0, 0))
-       return;
-
-   /* Check that the user still owns the relation */
-   if (!check_cluster_ownership(rvtc->tableOid))
-       return;
+           return;
 
-   /* Check that the index is still the one with indisclustered set.
-    * If this is a standalone cluster, skip this test.
-    */
-   if (rvtc->isPrevious)
-   {
-       HeapTuple       tuple;
-       Form_pg_index   indexForm;
+       /* Check that the user still owns the relation */
+       if (!check_cluster_permitted(rvtc->tableOid))
+           return;
 
+       /*
+        * Check that the index is still the one with indisclustered set.
+        */
        tuple = SearchSysCache(INDEXRELID,
                               ObjectIdGetDatum(rvtc->indexOid),
                               0, 0, 0);
+       if (!HeapTupleIsValid(tuple))
+           return;             /* could have gone away... */
        indexForm = (Form_pg_index) GETSTRUCT(tuple);
        if (!indexForm->indisclustered)
        {
@@ -135,7 +297,8 @@ cluster_rel(relToCluster *rvtc)
 
    /*
     * We grab exclusive access to the target rel and index for the
-    * duration of the transaction.
+    * duration of the transaction.  (This is redundant for the single-
+    * transaction case, since cluster() already did it.)
     */
    OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
 
@@ -162,29 +325,43 @@ cluster_rel(relToCluster *rvtc)
        elog(ERROR, "CLUSTER: cannot cluster system relation \"%s\"",
             RelationGetRelationName(OldHeap));
 
-   /* Save the information of all indexes on the relation. */
-   indexes = get_indexattr_list(OldHeap, rvtc->indexOid);
-
-   /* Drop relcache refcnts, but do NOT give up the locks */
+   /* Drop relcache refcnt on OldIndex, but keep lock */
    index_close(OldIndex);
-   heap_close(OldHeap, NoLock);
 
-   /* rebuild_rel does all the dirty work */
-   rebuild_rel(rvtc->tableOid, rvtc->indexOid, indexes, true);
+   /* rebuild_relation does all the dirty work */
+   rebuild_relation(OldHeap, rvtc->indexOid);
+
+   /* NB: rebuild_relation does heap_close() on OldHeap */
 }
 
+/*
+ * rebuild_relation: rebuild an existing relation
+ *
+ * This is shared code between CLUSTER and TRUNCATE.  In the TRUNCATE
+ * case, the new relation is built and left empty.  In the CLUSTER case,
+ * it is filled with data read from the old relation in the order specified
+ * by the index.
+ *
+ * OldHeap: table to rebuild --- must be opened and exclusive-locked!
+ * indexOid: index to cluster by, or InvalidOid in TRUNCATE case
+ *
+ * NB: this routine closes OldHeap at the right time; caller should not.
+ */
 void
-rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy)
+rebuild_relation(Relation OldHeap, Oid indexOid)
 {
+   Oid         tableOid = RelationGetRelid(OldHeap);
+   List       *indexes;
    Oid         OIDNewHeap;
    char        NewHeapName[NAMEDATALEN];
    ObjectAddress object;
 
-   /*
-    * If dataCopy is true, we assume that we will be basing the
-    * copy off an index for cluster operations.
-    */
-   Assert(!dataCopy || OidIsValid(indexOid));
+   /* Save the information about all indexes on the relation. */
+   indexes = get_indexattr_list(OldHeap, indexOid);
+
+   /* Close relcache entry, but keep lock until transaction commit */
+   heap_close(OldHeap, NoLock);
+
    /*
     * Create the new heap, using a temporary name in the same namespace
     * as the existing table.  NOTE: there is some risk of collision with
@@ -204,7 +381,7 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy)
    /*
     * Copy the heap data into the new table in the desired order.
     */
-   if (dataCopy)
+   if (OidIsValid(indexOid))
        copy_heap_data(OIDNewHeap, tableOid, indexOid);
 
    /* To make the new heap's data visible (probably not needed?). */
@@ -230,9 +407,9 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy)
 
    /*
     * Recreate each index on the relation.  We do not need
-    * CommandCounterIncrement() because recreate_indexattr does it.
+    * CommandCounterIncrement() because rebuild_indexes does it.
     */
-   recreate_indexattr(tableOid, indexes);
+   rebuild_indexes(tableOid, indexes);
 }
 
 /*
@@ -335,7 +512,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  * Get the necessary info about the indexes of the relation and
  * return a list of IndexAttrs structures.
  */
-List *
+static List *
 get_indexattr_list(Relation OldHeap, Oid OldIndex)
 {
    List       *indexes = NIL;
@@ -366,7 +543,8 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
            palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
        memcpy(attrs->classOID, indexForm->indclass,
               sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
-       attrs->isclustered = (OldIndex == indexOID);
+       /* We adjust the isclustered attribute to correct new state */
+       attrs->isclustered = (indexOID == OldIndex);
 
        /* Name and access method of each index come from pg_class */
        classTuple = SearchSysCache(RELOID,
@@ -397,7 +575,7 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
  * the new index (carrying the old index filenode along).
  */
 static void
-recreate_indexattr(Oid OIDOldHeap, List *indexes)
+rebuild_indexes(Oid OIDOldHeap, List *indexes)
 {
    List       *elem;
 
@@ -629,235 +807,69 @@ swap_relfilenodes(Oid r1, Oid r2)
    heap_close(relRelation, RowExclusiveLock);
 }
 
-/*---------------------------------------------------------------------------
- * This cluster code allows for clustering multiple tables at once.    Because
- * of this, we cannot just run everything on a single transaction, or we
- * would be forced to acquire exclusive locks on all the tables being
- * clustered.  To solve this we follow a similar strategy to VACUUM code,
- * clustering each relation in a separate transaction. For this to work,
- * we need to:
- *  - provide a separate memory context so that we can pass information in
- *    a way that trascends transactions
- *  - start a new transaction every time a new relation is clustered
- *  - check for validity of the information on to-be-clustered relations,
- *    as someone might have deleted a relation behind our back, or
- *    clustered one on a different index
- *  - end the transaction
- *
- * The single relation code does not have any overhead.
- *
- * We also allow a relation being specified without index.  In that case,
- * the indisclustered bit will be looked up, and an ERROR will be thrown
- * if there is no index with the bit set.
- *---------------------------------------------------------------------------
- */
-void
-cluster(ClusterStmt *stmt)
-{
-
-   /* This is the single relation case. */
-   if (stmt->relation != NULL)
-   {
-       Oid             indexOid = InvalidOid,
-                       tableOid;
-       relToCluster    rvtc;
-       HeapTuple       tuple;
-       Form_pg_class   classForm;
-
-       tableOid = RangeVarGetRelid(stmt->relation, false);
-       if (!check_cluster_ownership(tableOid))
-           elog(ERROR, "CLUSTER: You do not own relation %s",
-                stmt->relation->relname);
-
-       tuple = SearchSysCache(RELOID,
-                              ObjectIdGetDatum(tableOid),
-                              0, 0, 0);
-       if (!HeapTupleIsValid(tuple))
-           elog(ERROR, "Cache lookup failed for relation %u", tableOid);
-       classForm = (Form_pg_class) GETSTRUCT(tuple);
-
-       if (stmt->indexname == NULL)
-       {
-           List       *index;
-           Relation    rel = RelationIdGetRelation(tableOid);
-           HeapTuple   ituple = NULL,
-                       idxtuple = NULL;
-
-           /* We need to fetch the index that has indisclustered set. */
-           foreach (index, RelationGetIndexList(rel))
-           {
-               Form_pg_index   indexForm;
-
-               indexOid = lfirsti(index);
-               ituple = SearchSysCache(RELOID,
-                                      ObjectIdGetDatum(indexOid),
-                                      0, 0, 0);
-               if (!HeapTupleIsValid(ituple))
-                   elog(ERROR, "Cache lookup failed for relation %u", indexOid);
-               idxtuple = SearchSysCache(INDEXRELID,
-                                         ObjectIdGetDatum(HeapTupleGetOid(ituple)),
-                                         0, 0, 0);
-               if (!HeapTupleIsValid(idxtuple))
-                   elog(ERROR, "Cache lookup failed for index %u", HeapTupleGetOid(ituple));
-               indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
-               if (indexForm->indisclustered)
-                   break;
-               indexOid = InvalidOid;
-           }
-           if (indexOid == InvalidOid)
-               elog(ERROR, "CLUSTER: No previously clustered index found on table %s",
-                    stmt->relation->relname);
-           RelationClose(rel);
-           ReleaseSysCache(ituple);
-           ReleaseSysCache(idxtuple);
-       }
-       else
-       {
-           /* The index is expected to be in the same namespace as the relation. */
-           indexOid = get_relname_relid(stmt->indexname, classForm->relnamespace);
-       }
-       ReleaseSysCache(tuple);
-
-       /* XXX Maybe the namespace should be reported as well */
-       if (!OidIsValid(indexOid))
-           elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
-                stmt->indexname, stmt->relation->relname);
-       rvtc.tableOid = tableOid;
-       rvtc.indexOid = indexOid;
-       rvtc.isPrevious = false;
-
-       /* Do the job */
-       cluster_rel(&rvtc);
-   }
-   else
-   {
-       /*
-        * This is the "no relation" case. We need to cluster all tables
-        * that have some index with indisclustered set.
-        */
-
-       relToCluster    *rvtc;
-       List            *rv,
-                       *rvs;
-
-       /*
-        * We cannot run CLUSTER inside a user transaction block; if we were inside
-        * a transaction, then our commit- and start-transaction-command calls
-        * would not have the intended effect!
-        */
-       if (IsTransactionBlock())
-           elog(ERROR, "CLUSTER cannot run inside a BEGIN/END block");
-
-       /* Running CLUSTER from a function would free the function context */
-       if (!MemoryContextContains(QueryContext, stmt))
-           elog(ERROR, "CLUSTER cannot be called from a function");
-       /*
-        * Create special memory context for cross-transaction storage.
-        *
-        * Since it is a child of QueryContext, it will go away even in case
-        * of error.
-        */
-       cluster_context = AllocSetContextCreate(QueryContext,
-               "Cluster",
-               ALLOCSET_DEFAULT_MINSIZE,
-               ALLOCSET_DEFAULT_INITSIZE,
-               ALLOCSET_DEFAULT_MAXSIZE);
-
-       /*
-        * Build the list of relations to cluster.  Note that this lives in
-        * cluster_context.
-        */
-       rvs = get_tables_to_cluster(GetUserId());
-
-       /* Ok, now that we've got them all, cluster them one by one */
-       foreach (rv, rvs)
-       {
-           rvtc = (relToCluster *)lfirst(rv);
-
-           /* Start a new transaction for this relation. */
-           StartTransactionCommand(true);
-           cluster_rel(rvtc);
-           CommitTransactionCommand(true);
-       }
-   }
-
-   /* Start a new transaction for the cleanup work. */
-   StartTransactionCommand(true);
-
-   /* Clean up working storage */
-   if (stmt->relation == NULL)
-   {
-       MemoryContextDelete(cluster_context);
-       cluster_context = NULL;
-   }
-}
-
-/* Checks if the user owns the relation. Superusers
- * are allowed to cluster any table.
+/*
+ * Checks if the user is allowed to cluster (ie, owns) the relation.
+ * Superusers are allowed to cluster any table.
  */
-bool
-check_cluster_ownership(Oid relOid)
+static bool
+check_cluster_permitted(Oid relOid)
 {
    /* Superusers bypass this check */
    return pg_class_ownercheck(relOid, GetUserId());
 }
 
-/* Get a list of tables that the current user owns and
+/*
+ * Get a list of tables that the current user owns and
  * have indisclustered set.  Return the list in a List * of rvsToCluster
  * with the tableOid and the indexOid on which the table is already
  * clustered.
  */
-List *
-get_tables_to_cluster(AclId owner)
+static List *
+get_tables_to_cluster(MemoryContext cluster_context)
 {
    Relation        indRelation;
    HeapScanDesc    scan;
    ScanKeyData     entry;
    HeapTuple       indexTuple;
    Form_pg_index   index;
-   relToCluster   *rvtc;
+   MemoryContext   old_context;
+   RelToCluster   *rvtc;
    List           *rvs = NIL;
 
    /*
-    * Get all indexes that have indisclustered set. System
-    * relations or nailed-in relations cannot ever have
-    * indisclustered set, because CLUSTER will refuse to
-    * set it when called with one of them as argument.
+    * Get all indexes that have indisclustered set and are owned by
+    * appropriate user. System relations or nailed-in relations cannot ever
+    * have indisclustered set, because CLUSTER will refuse to set it when
+    * called with one of them as argument.
     */
-   indRelation = relation_openr(IndexRelationName, RowExclusiveLock);
-   ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered,
-                          F_BOOLEQ, true);
+   indRelation = relation_openr(IndexRelationName, AccessShareLock);
+   ScanKeyEntryInitialize(&entry, 0,
+                          Anum_pg_index_indisclustered,
+                          F_BOOLEQ,
+                          BoolGetDatum(true));
    scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
    while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
    {
-       MemoryContext   old_context = NULL;
-
        index = (Form_pg_index) GETSTRUCT(indexTuple);
-       if (!check_cluster_ownership(index->indrelid))
+       if (!check_cluster_permitted(index->indrelid))
            continue;
 
        /*
-        * We have to build the struct in a different memory context so
+        * We have to build the list in a different memory context so
         * it will survive the cross-transaction processing
         */
-
        old_context = MemoryContextSwitchTo(cluster_context);
 
-       rvtc = (relToCluster *)palloc(sizeof(relToCluster));
-       rvtc->indexOid = index->indexrelid;
+       rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
        rvtc->tableOid = index->indrelid;
-       rvtc->isPrevious = true;
-       rvs = lcons((void *)rvtc, rvs);
+       rvtc->indexOid = index->indexrelid;
+       rvs = lcons(rvtc, rvs);
 
        MemoryContextSwitchTo(old_context);
    }
    heap_endscan(scan);
 
-   /*
-    * Release the lock on pg_index. We will check the indexes
-    * later again.
-    *
-    */
-   relation_close(indRelation, RowExclusiveLock);
+   relation_close(indRelation, AccessShareLock);
+
    return rvs;
 }
index 2a1a2b4cf4933dbfeb488edbe8e8941d2bddb71b..29edb61638e507ac8a60a04a73ef66610475af5e 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.62 2002/12/16 18:39:22 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.63 2002/12/30 18:42:14 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -355,7 +355,7 @@ RemoveRelation(const RangeVar *relation, DropBehavior behavior)
  *     Removes all the rows from a relation.
  *
  * Note: This routine only does safety and permissions checks;
- * rebuild_rel in cluster.c does the actual work.
+ * rebuild_relation in cluster.c does the actual work.
  */
 void
 TruncateRelation(const RangeVar *relation)
@@ -366,7 +366,6 @@ TruncateRelation(const RangeVar *relation)
    Relation    fkeyRel;
    SysScanDesc fkeyScan;
    HeapTuple   tuple;
-   List       *indexes;
 
    /* Grab exclusive lock in preparation for truncate */
    rel = heap_openrv(relation, AccessExclusiveLock);
@@ -433,17 +432,13 @@ TruncateRelation(const RangeVar *relation)
    systable_endscan(fkeyScan);
    heap_close(fkeyRel, AccessShareLock);
 
-   /* Save the information of all indexes on the relation. */
-   indexes = get_indexattr_list(rel, InvalidOid);
-
-   /* Keep the lock until transaction commit */
-   heap_close(rel, NoLock);
-
    /*
     * Do the real work using the same technique as cluster, but
-    * without the code copy portion
+    * without the data-copying portion
     */
-   rebuild_rel(relid, InvalidOid, indexes, false);
+   rebuild_relation(rel, InvalidOid);
+
+   /* NB: rebuild_relation does heap_close() */
 }
 
 /*----------
index cc0f139b2dbb9d7c78c2a08cd96a77b7976868bb..962c5311500d791c0aac23e17be3fe26ff68eb5b 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.186 2002/12/30 15:31:48 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.187 2002/12/30 18:42:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -187,7 +187,6 @@ ProcessUtility(Node *parsetree,
               CommandDest dest,
               char *completionTag)
 {
-
    if (completionTag)
        completionTag[0] = '\0';
 
@@ -195,7 +194,6 @@ ProcessUtility(Node *parsetree,
    {
            /*
             * ******************************** transactions ********************************
-            *
             */
        case T_TransactionStmt:
            {
@@ -742,11 +740,7 @@ ProcessUtility(Node *parsetree,
            break;
 
        case T_ClusterStmt:
-           {
-               ClusterStmt *stmt = (ClusterStmt *) parsetree;
-
-               cluster(stmt);
-           }
+           cluster((ClusterStmt *) parsetree);
            break;
 
        case T_VacuumStmt:
@@ -874,7 +868,6 @@ ProcessUtility(Node *parsetree,
 
                switch (stmt->reindexType)
                {
-                   char    *relname;
                    case INDEX:
                        CheckOwnership(stmt->relation, false);
                        ReindexIndex(stmt->relation, stmt->force);
@@ -884,8 +877,7 @@ ProcessUtility(Node *parsetree,
                        ReindexTable(stmt->relation, stmt->force);
                        break;
                    case DATABASE:
-                       relname = (char *) stmt->name;
-                       ReindexDatabase(relname, stmt->force, false);
+                       ReindexDatabase(stmt->name, stmt->force, false);
                        break;
                }
                break;
index c83db667252907a52d774bb0894e9055c73d6c11..7eb11d60b06531f9b5bb5e150309d4d780243f8f 100644 (file)
@@ -6,22 +6,19 @@
  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994-5, Regents of the University of California
  *
- * $Id: cluster.h,v 1.17 2002/11/23 04:05:52 momjian Exp $
+ * $Id: cluster.h,v 1.18 2002/12/30 18:42:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef CLUSTER_H
 #define CLUSTER_H
 
-#include <nodes/parsenodes.h>
-/*
- * functions
- */
-extern void cluster(ClusterStmt *stmt);
+#include "nodes/parsenodes.h"
+#include "utils/rel.h"
 
-extern List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
-extern void rebuild_rel(Oid tableOid, Oid indexOid,
-                       List *indexes, bool dataCopy);
 
+extern void cluster(ClusterStmt *stmt);
+
+extern void rebuild_relation(Relation OldHeap, Oid indexOid);
 
 #endif   /* CLUSTER_H */
index 105b4c8ec21b3de55914c17e423bea6be6fe4e90..48daf0fc25d3ae55d8585abba347c131d02bded7 100644 (file)
@@ -304,7 +304,7 @@ INSERT INTO clstr_3 VALUES (2);
 INSERT INTO clstr_3 VALUES (1);
 -- "CLUSTER <tablename>" on a table that hasn't been clustered
 CLUSTER clstr_2;
-ERROR:  CLUSTER: No previously clustered index found on table clstr_2
+ERROR:  CLUSTER: No previously clustered index found on table "clstr_2"
 CLUSTER clstr_1_pkey ON clstr_1;
 CLUSTER clstr_2_pkey ON clstr_2;
 SELECT * FROM clstr_1 UNION ALL