Add support for REFRESH MATERIALIZED VIEW CONCURRENTLY.
authorKevin Grittner <kgrittn@postgresql.org>
Tue, 16 Jul 2013 17:55:44 +0000 (12:55 -0500)
committerKevin Grittner <kgrittn@postgresql.org>
Tue, 16 Jul 2013 17:55:44 +0000 (12:55 -0500)
This allows reads to continue without any blocking while a REFRESH
runs.  The new data appears atomically as part of transaction
commit.

Review questioned the Assert that a matview was not a system
relation.  This will be addressed separately.

Reviewed by Hitoshi Harada, Robert Haas, Andres Freund.
Merged after review with security patch f3ab5d4.

16 files changed:
doc/src/sgml/mvcc.sgml
doc/src/sgml/ref/refresh_materialized_view.sgml
src/backend/commands/cluster.c
src/backend/commands/matview.c
src/backend/commands/tablecmds.c
src/backend/executor/execMain.c
src/backend/executor/nodeModifyTable.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/parser/gram.y
src/bin/psql/tab-complete.c
src/include/commands/cluster.h
src/include/commands/matview.h
src/include/nodes/parsenodes.h
src/test/regress/expected/matview.out
src/test/regress/sql/matview.sql

index 316add70b705956a157fcd5d02fe471aaa5f829d..cefd3235a6b0c4c405a1c614b4eaba4690a15b86 100644 (file)
@@ -928,8 +928,7 @@ ERROR:  could not serialize access due to read/write dependencies among transact
         </para>
 
         <para>
-         This lock mode is not automatically acquired on tables by any
-         <productname>PostgreSQL</productname> command.
+         Acquired by <command>REFRESH MATERIALIZED VIEW CONCURRENTLY</command>.
         </para>
        </listitem>
       </varlistentry>
index 8f59bbf123108ae9f87be58aadfdf7124756a2b6..d2f8104aa7dff9964bda0d2e57f8795a73b6ec1d 100644 (file)
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-REFRESH MATERIALIZED VIEW <replaceable class="PARAMETER">name</replaceable>
+REFRESH MATERIALIZED VIEW [ CONCURRENTLY ] <replaceable class="PARAMETER">name</replaceable>
     [ WITH [ NO ] DATA ]
 </synopsis>
  </refsynopsisdiv>
@@ -38,12 +38,44 @@ REFRESH MATERIALIZED VIEW <replaceable class="PARAMETER">name</replaceable>
    data is generated and the materialized view is left in an unscannable
    state.
   </para>
+  <para>
+   <literal>CONCURRENTLY</literal> and <literal>WITH NO DATA</literal> may not
+   be specified together.
+  </para>
  </refsect1>
 
  <refsect1>
   <title>Parameters</title>
 
   <variablelist>
+   <varlistentry>
+    <term><literal>CONCURRENTLY</literal></term>
+    <listitem>
+     <para>
+      Refresh the materialized view without locking out concurrent selects on
+      the materialized view.  Without this option a refresh which affects a
+      lot of rows will tend to use fewer resources and complete more quickly,
+      but could block other connections which are trying to read from the
+      materialized view.  This option may be faster in cases where a small
+      number of rows are affected.
+     </para>
+     <para>
+      This option is only allowed if there is at least one
+      <literal>UNIQUE</literal> index on the materialized view which uses only
+      column names and includes all rows; that is, it must not index on any
+      expressions nor include a <literal>WHERE</literal> clause.
+     </para>
+     <para>
+      This option may not be used when the materialized view is not already
+      populated.
+     </para>
+     <para>
+      Even with this option only one <literal>REFRESH</literal> at a time may
+      run against any one materialized view.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><replaceable class="PARAMETER">name</replaceable></term>
     <listitem>
index 686770f881e4e5f28feb3a7b89765c2859f2dfdb..c20a6fb073e04009392722948de2f17eb8917a49 100644 (file)
@@ -589,7 +589,8 @@ rebuild_relation(Relation OldHeap, Oid indexOid,
    heap_close(OldHeap, NoLock);
 
    /* Create the transient table that will receive the re-ordered data */
-   OIDNewHeap = make_new_heap(tableOid, tableSpace);
+   OIDNewHeap = make_new_heap(tableOid, tableSpace, false,
+                              AccessExclusiveLock);
 
    /* Copy the heap data into the new table in the desired order */
    copy_heap_data(OIDNewHeap, tableOid, indexOid,
@@ -616,7 +617,8 @@ rebuild_relation(Relation OldHeap, Oid indexOid,
  * data, then call finish_heap_swap to complete the operation.
  */
 Oid
-make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
+make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, bool forcetemp,
+             LOCKMODE lockmode)
 {
    TupleDesc   OldHeapDesc;
    char        NewHeapName[NAMEDATALEN];
@@ -626,8 +628,10 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
    HeapTuple   tuple;
    Datum       reloptions;
    bool        isNull;
+   Oid         namespaceid;
+   char        relpersistence;
 
-   OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
+   OldHeap = heap_open(OIDOldHeap, lockmode);
    OldHeapDesc = RelationGetDescr(OldHeap);
 
    /*
@@ -648,6 +652,17 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
    if (isNull)
        reloptions = (Datum) 0;
 
+   if (forcetemp)
+   {
+       namespaceid = LookupCreationNamespace("pg_temp");
+       relpersistence = RELPERSISTENCE_TEMP;
+   }
+   else
+   {
+       namespaceid = RelationGetNamespace(OldHeap);
+       relpersistence = OldHeap->rd_rel->relpersistence;
+   }
+
    /*
     * Create the new heap, using a temporary name in the same namespace as
     * the existing table.  NOTE: there is some risk of collision with user
@@ -663,7 +678,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
    snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
 
    OIDNewHeap = heap_create_with_catalog(NewHeapName,
-                                         RelationGetNamespace(OldHeap),
+                                         namespaceid,
                                          NewTableSpace,
                                          InvalidOid,
                                          InvalidOid,
@@ -671,8 +686,8 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
                                          OldHeap->rd_rel->relowner,
                                          OldHeapDesc,
                                          NIL,
-                                         OldHeap->rd_rel->relkind,
-                                         OldHeap->rd_rel->relpersistence,
+                                         RELKIND_RELATION,
+                                         relpersistence,
                                          false,
                                          RelationIsMapped(OldHeap),
                                          true,
index 1c383baf68750808320ebc5e9925b122c802a639..edd34ff171642251003b253b6d99ac383f0455af 100644 (file)
 #include "catalog/catalog.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_operator.h"
 #include "commands/cluster.h"
 #include "commands/matview.h"
 #include "commands/tablecmds.h"
+#include "commands/tablespace.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "miscadmin.h"
+#include "parser/parse_relation.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#include "utils/typcache.h"
 
 
 typedef struct
@@ -44,12 +51,23 @@ typedef struct
    BulkInsertState bistate;    /* bulk insert state */
 } DR_transientrel;
 
+static int matview_maintenance_depth = 0;
+
 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
 static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
 static void transientrel_shutdown(DestReceiver *self);
 static void transientrel_destroy(DestReceiver *self);
 static void refresh_matview_datafill(DestReceiver *dest, Query *query,
-                        const char *queryString);
+                        const char *queryString, Oid relowner);
+
+static char *make_temptable_name_n(char *tempname, int n);
+static void mv_GenerateOper(StringInfo buf, Oid opoid);
+
+static void refresh_by_match_merge(Oid matviewOid, Oid tempOid);
+static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap);
+
+static void OpenMatViewIncrementalMaintenance(void);
+static void CloseMatViewIncrementalMaintenance(void);
 
 /*
  * SetMatViewPopulatedState
@@ -122,18 +140,21 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
    RewriteRule *rule;
    List       *actions;
    Query      *dataQuery;
-   Oid         save_userid;
-   int         save_sec_context;
-   int         save_nestlevel;
    Oid         tableSpace;
    Oid         OIDNewHeap;
    DestReceiver *dest;
+   bool        concurrent;
+   LOCKMODE    lockmode;
+
+   /* Determine strength of lock needed. */
+   concurrent = stmt->concurrent;
+   lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
 
    /*
     * Get a lock until end of transaction.
     */
    matviewOid = RangeVarGetRelidExtended(stmt->relation,
-                                         AccessExclusiveLock, false, false,
+                                         lockmode, false, false,
                                          RangeVarCallbackOwnsTable, NULL);
    matviewRel = heap_open(matviewOid, NoLock);
 
@@ -144,11 +165,22 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
                 errmsg("\"%s\" is not a materialized view",
                        RelationGetRelationName(matviewRel))));
 
-   /*
-    * We're not using materialized views in the system catalogs.
-    */
+   /* Check that CONCURRENTLY is not specified if not populated. */
+   if (concurrent && !RelationIsPopulated(matviewRel))
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
+
+   /* Check that conflicting options have not been specified. */
+   if (concurrent && stmt->skipData)
+       ereport(ERROR,
+               (errcode(ERRCODE_SYNTAX_ERROR),
+                errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
+
+   /* We're not using materialized views in the system catalogs. */
    Assert(!IsSystemRelation(matviewRel));
 
+   /* We don't allow an oid column for a materialized view. */
    Assert(!matviewRel->rd_rel->relhasoids);
 
    /*
@@ -194,48 +226,49 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
     */
    CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
 
-   /*
-    * Switch to the owner's userid, so that any functions are run as that
-    * user.  Also lock down security-restricted operations and arrange to
-    * make GUC variable changes local to this command.
-    */
-   GetUserIdAndSecContext(&save_userid, &save_sec_context);
-   SetUserIdAndSecContext(matviewRel->rd_rel->relowner,
-                          save_sec_context | SECURITY_RESTRICTED_OPERATION);
-   save_nestlevel = NewGUCNestLevel();
-
    /*
     * Tentatively mark the matview as populated or not (this will roll back
     * if we fail later).
     */
    SetMatViewPopulatedState(matviewRel, !stmt->skipData);
 
-   tableSpace = matviewRel->rd_rel->reltablespace;
+   /* Concurrent refresh builds new data in temp tablespace, and does diff. */
+   if (concurrent)
+       tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP);
+   else
+       tableSpace = matviewRel->rd_rel->reltablespace;
 
    heap_close(matviewRel, NoLock);
 
    /* Create the transient table that will receive the regenerated data. */
-   OIDNewHeap = make_new_heap(matviewOid, tableSpace);
+   OIDNewHeap = make_new_heap(matviewOid, tableSpace, concurrent,
+                              ExclusiveLock);
    dest = CreateTransientRelDestReceiver(OIDNewHeap);
 
    /* Generate the data, if wanted. */
    if (!stmt->skipData)
-       refresh_matview_datafill(dest, dataQuery, queryString);
-
-   /*
-    * Swap the physical files of the target and transient tables, then
-    * rebuild the target's indexes and throw away the transient table.
-    */
-   finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
-                    RecentXmin, ReadNextMultiXactId());
-
-   RelationCacheInvalidateEntry(matviewOid);
-
-   /* Roll back any GUC changes */
-   AtEOXact_GUC(false, save_nestlevel);
-
-   /* Restore userid and security context */
-   SetUserIdAndSecContext(save_userid, save_sec_context);
+       refresh_matview_datafill(dest, dataQuery, queryString,
+                                matviewRel->rd_rel->relowner);
+
+   /* Make the matview match the newly generated data. */
+   if (concurrent)
+   {
+       int         old_depth = matview_maintenance_depth;
+
+       PG_TRY();
+       {
+           refresh_by_match_merge(matviewOid, OIDNewHeap);
+       }
+       PG_CATCH();
+       {
+           matview_maintenance_depth = old_depth;
+           PG_RE_THROW();
+       }
+       PG_END_TRY();
+       Assert(matview_maintenance_depth == old_depth);
+   }
+   else
+       refresh_by_heap_swap(matviewOid, OIDNewHeap);
 }
 
 /*
@@ -243,11 +276,24 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
  */
 static void
 refresh_matview_datafill(DestReceiver *dest, Query *query,
-                        const char *queryString)
+                        const char *queryString, Oid relowner)
 {
    List       *rewritten;
    PlannedStmt *plan;
    QueryDesc  *queryDesc;
+   Oid         save_userid;
+   int         save_sec_context;
+   int         save_nestlevel;
+
+   /*
+    * Switch to the owner's userid, so that any functions are run as that
+    * user.  Also lock down security-restricted operations and arrange to
+    * make GUC variable changes local to this command.
+    */
+   GetUserIdAndSecContext(&save_userid, &save_sec_context);
+   SetUserIdAndSecContext(relowner,
+                          save_sec_context | SECURITY_RESTRICTED_OPERATION);
+   save_nestlevel = NewGUCNestLevel();
 
    /* Rewrite, copying the given Query to make sure it's not changed */
    rewritten = QueryRewrite((Query *) copyObject(query));
@@ -290,6 +336,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
    FreeQueryDesc(queryDesc);
 
    PopActiveSnapshot();
+
+   /* Roll back any GUC changes */
+   AtEOXact_GUC(false, save_nestlevel);
+
+   /* Restore userid and security context */
+   SetUserIdAndSecContext(save_userid, save_sec_context);
 }
 
 DestReceiver *
@@ -388,3 +440,401 @@ transientrel_destroy(DestReceiver *self)
 {
    pfree(self);
 }
+
+
+/*
+ * Given a qualified temporary table name, append an underscore followed by
+ * the given integer, to make a new table name based on the old one.
+ *
+ * This leaks memory through palloc(), which won't be cleaned up until the
+ * current memory memory context is freed.
+ */
+static char *
+make_temptable_name_n(char *tempname, int n)
+{
+   StringInfoData namebuf;
+
+   initStringInfo(&namebuf);
+   appendStringInfoString(&namebuf, tempname);
+   appendStringInfo(&namebuf, "_%i", n);
+   return namebuf.data;
+}
+
+static void
+mv_GenerateOper(StringInfo buf, Oid opoid)
+{
+   HeapTuple   opertup;
+   Form_pg_operator operform;
+
+   opertup = SearchSysCache1(OPEROID, ObjectIdGetDatum(opoid));
+   if (!HeapTupleIsValid(opertup))
+       elog(ERROR, "cache lookup failed for operator %u", opoid);
+   operform = (Form_pg_operator) GETSTRUCT(opertup);
+   Assert(operform->oprkind == 'b');
+
+   appendStringInfo(buf, "OPERATOR(%s.%s)",
+               quote_identifier(get_namespace_name(operform->oprnamespace)),
+                    NameStr(operform->oprname));
+
+   ReleaseSysCache(opertup);
+}
+
+/*
+ * refresh_by_match_merge
+ *
+ * Refresh a materialized view with transactional semantics, while allowing
+ * concurrent reads.
+ *
+ * This is called after a new version of the data has been created in a
+ * temporary table.  It performs a full outer join against the old version of
+ * the data, producing "diff" results. This join cannot work if there are any
+ * duplicated rows in either the old or new versions, in the sense that every
+ * column would compare as equal between the two rows. It does work correctly
+ * in the face of rows which have at least one NULL value, with all non-NULL
+ * columns equal.  The behavior of NULLs on equality tests and on UNIQUE
+ * indexes turns out to be quite convenient here; the tests we need to make
+ * are consistent with default behavior.  If there is at least one UNIQUE
+ * index on the materialized view, we have exactly the guarantee we need.  By
+ * joining based on equality on all columns which are part of any unique
+ * index, we identify the rows on which we can use UPDATE without any problem.
+ * If any column is NULL in either the old or new version of a row (or both),
+ * we must use DELETE and INSERT, since there could be multiple rows which are
+ * NOT DISTINCT FROM each other, and we could otherwise end up with the wrong
+ * number of occurrences in the updated relation.  The temporary table used to
+ * hold the diff results contains just the TID of the old record (if matched)
+ * and the ROW from the new table as a single column of complex record type
+ * (if matched).
+ *
+ * Once we have the diff table, we perform set-based DELETE, UPDATE, and
+ * INSERT operations against the materialized view, and discard both temporary
+ * tables.
+ *
+ * Everything from the generation of the new data to applying the differences
+ * takes place under cover of an ExclusiveLock, since it seems as though we
+ * would want to prohibit not only concurrent REFRESH operations, but also
+ * incremental maintenance.  It also doesn't seem reasonable or safe to allow
+ * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
+ * this command.
+ */
+static void
+refresh_by_match_merge(Oid matviewOid, Oid tempOid)
+{
+   StringInfoData querybuf;
+   Relation    matviewRel;
+   Relation    tempRel;
+   char       *matviewname;
+   char       *tempname;
+   char       *diffname;
+   TupleDesc   tupdesc;
+   bool        foundUniqueIndex;
+   List       *indexoidlist;
+   ListCell   *indexoidscan;
+   int16       relnatts;
+   bool       *usedForQual;
+   Oid         save_userid;
+   int         save_sec_context;
+   int         save_nestlevel;
+
+   initStringInfo(&querybuf);
+   matviewRel = heap_open(matviewOid, NoLock);
+   matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+                                       RelationGetRelationName(matviewRel));
+   tempRel = heap_open(tempOid, NoLock);
+   tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
+                                         RelationGetRelationName(tempRel));
+   diffname = make_temptable_name_n(tempname, 2);
+
+   relnatts = matviewRel->rd_rel->relnatts;
+   usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
+
+   /* Open SPI context. */
+   if (SPI_connect() != SPI_OK_CONNECT)
+       elog(ERROR, "SPI_connect failed");
+
+   /* Analyze the temp table with the new contents. */
+   appendStringInfo(&querybuf, "ANALYZE %s", tempname);
+   if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
+       elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+   /*
+    * We need to ensure that there are not duplicate rows without NULLs in
+    * the new data set before we can count on the "diff" results.  Check for
+    * that in a way that allows showing the first duplicated row found.  Even
+    * after we pass this test, a unique index on the materialized view may
+    * find a duplicate key problem.
+    */
+   resetStringInfo(&querybuf);
+   appendStringInfo(&querybuf,
+                    "SELECT x FROM %s x WHERE x IS NOT NULL AND EXISTS "
+                    "(SELECT * FROM %s y WHERE y IS NOT NULL "
+                    "AND (y.*) = (x.*) AND y.ctid <> x.ctid) LIMIT 1",
+                    tempname, tempname);
+   if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
+       elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+   if (SPI_processed > 0)
+   {
+       ereport(ERROR,
+               (errcode(ERRCODE_CARDINALITY_VIOLATION),
+                errmsg("new data for \"%s\" contains duplicate rows without any NULL columns",
+                       RelationGetRelationName(matviewRel)),
+                errdetail("Row: %s",
+           SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
+   }
+
+   /* Start building the query for creating the diff table. */
+   resetStringInfo(&querybuf);
+   appendStringInfo(&querybuf,
+                    "CREATE TEMP TABLE %s AS "
+                    "SELECT x.ctid AS tid, y FROM %s x FULL JOIN %s y ON (",
+                    diffname, matviewname, tempname);
+
+   /*
+    * Get the list of index OIDs for the table from the relcache, and look up
+    * each one in the pg_index syscache.  We will test for equality on all
+    * columns present in all unique indexes which only reference columns and
+    * include all rows.
+    */
+   tupdesc = matviewRel->rd_att;
+   foundUniqueIndex = false;
+   indexoidlist = RelationGetIndexList(matviewRel);
+
+   foreach(indexoidscan, indexoidlist)
+   {
+       Oid         indexoid = lfirst_oid(indexoidscan);
+       HeapTuple   indexTuple;
+       Form_pg_index index;
+
+       indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+       if (!HeapTupleIsValid(indexTuple))      /* should not happen */
+           elog(ERROR, "cache lookup failed for index %u", indexoid);
+       index = (Form_pg_index) GETSTRUCT(indexTuple);
+
+       /* We're only interested if it is unique and valid. */
+       if (index->indisunique && IndexIsValid(index))
+       {
+           int         numatts = index->indnatts;
+           int         i;
+           bool        expr = false;
+           Relation    indexRel;
+
+           /* Skip any index on an expression. */
+           for (i = 0; i < numatts; i++)
+           {
+               if (index->indkey.values[i] == 0)
+               {
+                   expr = true;
+                   break;
+               }
+           }
+           if (expr)
+           {
+               ReleaseSysCache(indexTuple);
+               continue;
+           }
+
+           /* Skip partial indexes. */
+           indexRel = index_open(index->indexrelid, RowExclusiveLock);
+           if (indexRel->rd_indpred != NIL)
+           {
+               index_close(indexRel, NoLock);
+               ReleaseSysCache(indexTuple);
+               continue;
+           }
+           /* Hold the locks, since we're about to run DML which needs them. */
+           index_close(indexRel, NoLock);
+
+           /* Add quals for all columns from this index. */
+           for (i = 0; i < numatts; i++)
+           {
+               int         attnum = index->indkey.values[i];
+               Oid         type;
+               Oid         op;
+               const char *colname;
+
+               /*
+                * Only include the column once regardless of how many times
+                * it shows up in how many indexes.
+                *
+                * This is also useful later to omit columns which can not
+                * have changed from the SET clause of the UPDATE statement.
+                */
+               if (usedForQual[attnum - 1])
+                   continue;
+               usedForQual[attnum - 1] = true;
+
+               /*
+                * Actually add the qual, ANDed with any others.
+                */
+               if (foundUniqueIndex)
+                   appendStringInfoString(&querybuf, " AND ");
+
+               colname = quote_identifier(NameStr((tupdesc->attrs[attnum - 1])->attname));
+               appendStringInfo(&querybuf, "y.%s ", colname);
+               type = attnumTypeId(matviewRel, attnum);
+               op = lookup_type_cache(type, TYPECACHE_EQ_OPR)->eq_opr;
+               mv_GenerateOper(&querybuf, op);
+               appendStringInfo(&querybuf, " x.%s", colname);
+
+               foundUniqueIndex = true;
+           }
+       }
+       ReleaseSysCache(indexTuple);
+   }
+
+   list_free(indexoidlist);
+
+   if (!foundUniqueIndex)
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+              errmsg("cannot refresh materialized view \"%s\" concurrently",
+                     matviewname),
+                errhint("Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view.")));
+
+   appendStringInfoString(&querybuf,
+                          " AND y = x) WHERE (y.*) IS DISTINCT FROM (x.*)"
+                          " ORDER BY tid");
+
+   /* Create the temporary "diff" table. */
+   if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
+       elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+   /*
+    * We have no further use for data from the "full-data" temp table, but we
+    * must keep it around because its type is reference from the diff table.
+    */
+
+   /* Analyze the diff table. */
+   resetStringInfo(&querybuf);
+   appendStringInfo(&querybuf, "ANALYZE %s", diffname);
+   if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
+       elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+   OpenMatViewIncrementalMaintenance();
+
+   /*
+    * Switch to the owner's userid, so that any functions are run as that
+    * user.  Also lock down security-restricted operations and arrange to
+    * make GUC variable changes local to this command.
+    */
+   GetUserIdAndSecContext(&save_userid, &save_sec_context);
+   SetUserIdAndSecContext(matviewRel->rd_rel->relowner,
+                          save_sec_context | SECURITY_RESTRICTED_OPERATION);
+   save_nestlevel = NewGUCNestLevel();
+
+   /* Deletes must come before inserts; do them first. */
+   resetStringInfo(&querybuf);
+   appendStringInfo(&querybuf,
+                    "DELETE FROM %s WHERE ctid IN "
+                    "(SELECT d.tid FROM %s d "
+                    "WHERE d.tid IS NOT NULL "
+                    "AND (d.y) IS NOT DISTINCT FROM NULL)",
+                    matviewname, diffname);
+   if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
+       elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+   /* Updates before inserts gives a better chance at HOT updates. */
+   resetStringInfo(&querybuf);
+   appendStringInfo(&querybuf, "UPDATE %s x SET ", matviewname);
+
+   {
+       int         i;
+       bool        targetColFound = false;
+
+       for (i = 0; i < tupdesc->natts; i++)
+       {
+           const char *colname;
+
+           if (tupdesc->attrs[i]->attisdropped)
+               continue;
+
+           if (usedForQual[i])
+               continue;
+
+           if (targetColFound)
+               appendStringInfoString(&querybuf, ", ");
+           targetColFound = true;
+
+           colname = quote_identifier(NameStr((tupdesc->attrs[i])->attname));
+           appendStringInfo(&querybuf, "%s = (d.y).%s", colname, colname);
+       }
+
+       if (targetColFound)
+       {
+           appendStringInfo(&querybuf,
+                            " FROM %s d "
+                            "WHERE d.tid IS NOT NULL AND x.ctid = d.tid",
+                            diffname);
+
+           if (SPI_exec(querybuf.data, 0) != SPI_OK_UPDATE)
+               elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+       }
+   }
+
+   /* Inserts go last. */
+   resetStringInfo(&querybuf);
+   appendStringInfo(&querybuf,
+                    "INSERT INTO %s SELECT (y).* FROM %s WHERE tid IS NULL",
+                    matviewname, diffname);
+   if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
+       elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+   /* Roll back any GUC changes */
+   AtEOXact_GUC(false, save_nestlevel);
+
+   /* Restore userid and security context */
+   SetUserIdAndSecContext(save_userid, save_sec_context);
+
+   /* We're done maintaining the materialized view. */
+   CloseMatViewIncrementalMaintenance();
+   heap_close(tempRel, NoLock);
+   heap_close(matviewRel, NoLock);
+
+   /* Clean up temp tables. */
+   resetStringInfo(&querybuf);
+   appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
+   if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
+       elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+   /* Close SPI context. */
+   if (SPI_finish() != SPI_OK_FINISH)
+       elog(ERROR, "SPI_finish failed");
+}
+
+/*
+ * Swap the physical files of the target and transient tables, then rebuild
+ * the target's indexes and throw away the transient table.  Security context
+ * swapping is handled by the called function, so it is not needed here.
+ */
+static void
+refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap)
+{
+   finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
+                    RecentXmin, ReadNextMultiXactId());
+
+   RelationCacheInvalidateEntry(matviewOid);
+}
+
+static void
+OpenMatViewIncrementalMaintenance(void)
+{
+   matview_maintenance_depth++;
+}
+
+static void
+CloseMatViewIncrementalMaintenance(void)
+{
+   matview_maintenance_depth--;
+   Assert(matview_maintenance_depth >= 0);
+}
+
+/*
+ * This should be used to test whether the backend is in a context where it is
+ * OK to allow DML statements to modify materialized views.  We only want to
+ * allow that for internal code driven by the materialized view definition,
+ * not for arbitrary user-supplied code.
+ */
+bool
+MatViewIncrementalMaintenanceIsEnabled(void)
+{
+   return matview_maintenance_depth > 0;
+}
index f56ef28e229a8cc7ada9e69cda3b60a6f85fdf59..bd0a21987c83b965ee4f2a3f0861e4dd7e791e3f 100644 (file)
@@ -3541,7 +3541,8 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode)
            heap_close(OldHeap, NoLock);
 
            /* Create transient table that will receive the modified data */
-           OIDNewHeap = make_new_heap(tab->relid, NewTableSpace);
+           OIDNewHeap = make_new_heap(tab->relid, NewTableSpace, false,
+                                      AccessExclusiveLock);
 
            /*
             * Copy the heap data into the new table with the desired
index 3b664d09265e1565e7270254cafdf7affde1edf1..4d7345da577ec82ced8d5973fb7ce0dfa35f0145 100644 (file)
@@ -42,6 +42,7 @@
 #include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
+#include "commands/matview.h"
 #include "commands/trigger.h"
 #include "executor/execdebug.h"
 #include "foreign/fdwapi.h"
@@ -999,10 +1000,11 @@ CheckValidResultRel(Relation resultRel, CmdType operation)
            }
            break;
        case RELKIND_MATVIEW:
-           ereport(ERROR,
-                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                    errmsg("cannot change materialized view \"%s\"",
-                           RelationGetRelationName(resultRel))));
+           if (!MatViewIncrementalMaintenanceIsEnabled())
+               ereport(ERROR,
+                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                        errmsg("cannot change materialized view \"%s\"",
+                               RelationGetRelationName(resultRel))));
            break;
        case RELKIND_FOREIGN_TABLE:
            /* Okay only if the FDW supports it */
index e934c7b9ab9dc694315c95a5fe8d13896bd8a44a..8fe5f1d427a8a9d3afc5741a3c8daea2a05857d4 100644 (file)
@@ -950,7 +950,7 @@ ExecModifyTable(ModifyTableState *node)
                bool        isNull;
 
                relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind;
-               if (relkind == RELKIND_RELATION)
+               if (relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW)
                {
                    datum = ExecGetJunkAttribute(slot,
                                                 junkfilter->jf_junkAttNo,
@@ -1280,7 +1280,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
                    char        relkind;
 
                    relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind;
-                   if (relkind == RELKIND_RELATION)
+                   if (relkind == RELKIND_RELATION ||
+                       relkind == RELKIND_MATVIEW)
                    {
                        j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid");
                        if (!AttributeNumberIsValid(j->jf_junkAttNo))
index b5b8d63cff79247ed1340b228fd5a10887a5a1bf..ad7378dd9356ded16f73e242844d0a43f3bc48ff 100644 (file)
@@ -3241,6 +3241,7 @@ _copyRefreshMatViewStmt(const RefreshMatViewStmt *from)
 {
    RefreshMatViewStmt *newnode = makeNode(RefreshMatViewStmt);
 
+   COPY_SCALAR_FIELD(concurrent);
    COPY_SCALAR_FIELD(skipData);
    COPY_NODE_FIELD(relation);
 
index 3f96595e8eb041bcebe38639e075db7eb8a77fd2..e0d4bca809bd1a719a602e676ca3fe2528ad49ef 100644 (file)
@@ -1521,6 +1521,7 @@ _equalCreateTableAsStmt(const CreateTableAsStmt *a, const CreateTableAsStmt *b)
 static bool
 _equalRefreshMatViewStmt(const RefreshMatViewStmt *a, const RefreshMatViewStmt *b)
 {
+   COMPARE_SCALAR_FIELD(concurrent);
    COMPARE_SCALAR_FIELD(skipData);
    COMPARE_NODE_FIELD(relation);
 
index f67ef0c9caf206ad9376f232233896be1993bf8d..5e9b3eda92410d556f540aa304d95e51a9d5d19a 100644 (file)
@@ -3301,11 +3301,12 @@ OptNoLog:   UNLOGGED                    { $$ = RELPERSISTENCE_UNLOGGED; }
  *****************************************************************************/
 
 RefreshMatViewStmt:
-           REFRESH MATERIALIZED VIEW qualified_name opt_with_data
+           REFRESH MATERIALIZED VIEW opt_concurrently qualified_name opt_with_data
                {
                    RefreshMatViewStmt *n = makeNode(RefreshMatViewStmt);
-                   n->relation = $4;
-                   n->skipData = !($5);
+                   n->concurrent = $4;
+                   n->relation = $5;
+                   n->skipData = !($6);
                    $$ = (Node *) n;
                }
        ;
index a67d2cdd909c0afe3e62518301a2634f24ab16ce..5b7fc93eb264cf47db08283850c8472b92c857f9 100644 (file)
@@ -2871,11 +2871,22 @@ psql_completion(char *text, int start, int end)
    else if (pg_strcasecmp(prev3_wd, "REFRESH") == 0 &&
             pg_strcasecmp(prev2_wd, "MATERIALIZED") == 0 &&
             pg_strcasecmp(prev_wd, "VIEW") == 0)
+       COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_matviews,
+                                  " UNION SELECT 'CONCURRENTLY'");
+   else if (pg_strcasecmp(prev4_wd, "REFRESH") == 0 &&
+            pg_strcasecmp(prev3_wd, "MATERIALIZED") == 0 &&
+            pg_strcasecmp(prev2_wd, "VIEW") == 0 &&
+            pg_strcasecmp(prev_wd, "CONCURRENTLY") == 0)
        COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_matviews, NULL);
    else if (pg_strcasecmp(prev4_wd, "REFRESH") == 0 &&
             pg_strcasecmp(prev3_wd, "MATERIALIZED") == 0 &&
             pg_strcasecmp(prev2_wd, "VIEW") == 0)
        COMPLETE_WITH_CONST("WITH");
+   else if (pg_strcasecmp(prev5_wd, "REFRESH") == 0 &&
+            pg_strcasecmp(prev4_wd, "MATERIALIZED") == 0 &&
+            pg_strcasecmp(prev3_wd, "VIEW") == 0 &&
+            pg_strcasecmp(prev2_wd, "CONCURRENTLY") == 0)
+       COMPLETE_WITH_CONST("WITH DATA");
    else if (pg_strcasecmp(prev5_wd, "REFRESH") == 0 &&
             pg_strcasecmp(prev4_wd, "MATERIALIZED") == 0 &&
             pg_strcasecmp(prev3_wd, "VIEW") == 0 &&
@@ -2886,6 +2897,12 @@ psql_completion(char *text, int start, int end)
 
        COMPLETE_WITH_LIST(list_WITH_DATA);
    }
+   else if (pg_strcasecmp(prev6_wd, "REFRESH") == 0 &&
+            pg_strcasecmp(prev5_wd, "MATERIALIZED") == 0 &&
+            pg_strcasecmp(prev4_wd, "VIEW") == 0 &&
+            pg_strcasecmp(prev3_wd, "CONCURRENTLY") == 0 &&
+            pg_strcasecmp(prev_wd, "WITH") == 0)
+       COMPLETE_WITH_CONST("DATA");
    else if (pg_strcasecmp(prev6_wd, "REFRESH") == 0 &&
             pg_strcasecmp(prev5_wd, "MATERIALIZED") == 0 &&
             pg_strcasecmp(prev4_wd, "VIEW") == 0 &&
index aa6d332e9d2e7e2b90fd4fd0e9c840c6f80a446d..03d9f176c57189a3428b83584143475105f76be8 100644 (file)
@@ -25,7 +25,8 @@ extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
                           bool recheck, LOCKMODE lockmode);
 extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
 
-extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace);
+extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, bool forcetemp,
+             LOCKMODE lockmode);
 extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
                 bool is_system_catalog,
                 bool swap_toast_by_content,
index dce724469e2377e65e57055c2bc9061da3271356..29229076a3dc0631b0d62aa45261e3d584f883a6 100644 (file)
@@ -27,4 +27,6 @@ extern void ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString
 
 extern DestReceiver *CreateTransientRelDestReceiver(Oid oid);
 
+extern bool MatViewIncrementalMaintenanceIsEnabled(void);
+
 #endif   /* MATVIEW_H */
index de22dff89359ea4141ad915ee008b0a971eb7975..bf81b5bddad7e1f7c5684d8336d4558179921a1a 100644 (file)
@@ -2478,6 +2478,7 @@ typedef struct CreateTableAsStmt
 typedef struct RefreshMatViewStmt
 {
    NodeTag     type;
+   bool        concurrent;     /* allow concurrent access? */
    bool        skipData;       /* true for WITH NO DATA */
    RangeVar   *relation;       /* relation to insert into */
 } RefreshMatViewStmt;
index a98de4f58d332d93e2088be5716d817e4f5ac713..5a31fda69fcaee2996eaf11a0fe7c13e7063a3bb 100644 (file)
@@ -73,6 +73,8 @@ SELECT * FROM tvm;
 
 CREATE MATERIALIZED VIEW tmm AS SELECT sum(totamt) AS grandtot FROM tm;
 CREATE MATERIALIZED VIEW tvmm AS SELECT sum(totamt) AS grandtot FROM tvm;
+CREATE UNIQUE INDEX tvmm_expr ON tvmm ((grandtot > 0));
+CREATE UNIQUE INDEX tvmm_pred ON tvmm (grandtot) WHERE grandtot < 0;
 CREATE VIEW tvv AS SELECT sum(totamt) AS grandtot FROM tv;
 EXPLAIN (costs off)
   CREATE MATERIALIZED VIEW tvvm AS SELECT * FROM tvv;
@@ -141,6 +143,9 @@ ALTER MATERIALIZED VIEW tvm SET SCHEMA mvschema;
   Column  |  Type   | Modifiers | Storage | Stats target | Description 
 ----------+---------+-----------+---------+--------------+-------------
  grandtot | numeric |           | main    |              | 
+Indexes:
+    "tvmm_expr" UNIQUE, btree ((grandtot > 0::numeric))
+    "tvmm_pred" UNIQUE, btree (grandtot) WHERE grandtot < 0::numeric
 View definition:
  SELECT sum(tvm.totamt) AS grandtot
    FROM mvschema.tvm;
@@ -177,7 +182,7 @@ SELECT * FROM tvm ORDER BY type;
  z    |     11
 (3 rows)
 
-REFRESH MATERIALIZED VIEW tm;
+REFRESH MATERIALIZED VIEW CONCURRENTLY tm;
 REFRESH MATERIALIZED VIEW tvm;
 SELECT * FROM tm ORDER BY type;
  type | totamt 
@@ -237,6 +242,9 @@ SELECT * FROM tvvm;
 (1 row)
 
 REFRESH MATERIALIZED VIEW tmm;
+REFRESH MATERIALIZED VIEW CONCURRENTLY tvmm;
+ERROR:  cannot refresh materialized view "public.tvmm" concurrently
+HINT:  Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view.
 REFRESH MATERIALIZED VIEW tvmm;
 REFRESH MATERIALIZED VIEW tvvm;
 EXPLAIN (costs off)
@@ -281,6 +289,9 @@ SELECT * FROM tvvm;
 -- test diemv when the mv does not exist
 DROP MATERIALIZED VIEW IF EXISTS no_such_mv;
 NOTICE:  materialized view "no_such_mv" does not exist, skipping
+-- make sure invalid comination of options is prohibited
+REFRESH MATERIALIZED VIEW CONCURRENTLY tvmm WITH NO DATA;
+ERROR:  CONCURRENTLY and WITH NO DATA options cannot be used together
 -- test join of mv and view
 SELECT type, m.totamt AS mtot, v.totamt AS vtot FROM tm m LEFT JOIN tv v USING (type) ORDER BY type;
  type | mtot | vtot 
@@ -385,3 +396,28 @@ SELECT * FROM hogeview WHERE i < 10;
 
 DROP TABLE hoge CASCADE;
 NOTICE:  drop cascades to materialized view hogeview
+-- test that duplicate values on unique index prevent refresh
+CREATE TABLE foo(a, b) AS VALUES(1, 10);
+CREATE MATERIALIZED VIEW mv AS SELECT * FROM foo;
+CREATE UNIQUE INDEX ON mv(a);
+INSERT INTO foo SELECT * FROM foo;
+REFRESH MATERIALIZED VIEW mv;
+ERROR:  could not create unique index "mv_a_idx"
+DETAIL:  Key (a)=(1) is duplicated.
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv;
+ERROR:  new data for "mv" contains duplicate rows without any NULL columns
+DETAIL:  Row: (1,10)
+DROP TABLE foo CASCADE;
+NOTICE:  drop cascades to materialized view mv
+-- make sure that all indexes covered by unique indexes works
+CREATE TABLE foo(a, b, c) AS VALUES(1, 2, 3);
+CREATE MATERIALIZED VIEW mv AS SELECT * FROM foo;
+CREATE UNIQUE INDEX ON mv (a);
+CREATE UNIQUE INDEX ON mv (b);
+CREATE UNIQUE INDEX on mv (c);
+INSERT INTO foo VALUES(2, 3, 4);
+INSERT INTO foo VALUES(3, 4, 5);
+REFRESH MATERIALIZED VIEW mv;
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv;
+DROP TABLE foo CASCADE;
+NOTICE:  drop cascades to materialized view mv
index 975f8dd57506e533ffd01364b57f17a830dd2304..9d60bbbbe4db663c23e6d42a43c8e16d64889a1c 100644 (file)
@@ -29,6 +29,8 @@ CREATE MATERIALIZED VIEW tvm AS SELECT * FROM tv ORDER BY type;
 SELECT * FROM tvm;
 CREATE MATERIALIZED VIEW tmm AS SELECT sum(totamt) AS grandtot FROM tm;
 CREATE MATERIALIZED VIEW tvmm AS SELECT sum(totamt) AS grandtot FROM tvm;
+CREATE UNIQUE INDEX tvmm_expr ON tvmm ((grandtot > 0));
+CREATE UNIQUE INDEX tvmm_pred ON tvmm (grandtot) WHERE grandtot < 0;
 CREATE VIEW tvv AS SELECT sum(totamt) AS grandtot FROM tv;
 EXPLAIN (costs off)
   CREATE MATERIALIZED VIEW tvvm AS SELECT * FROM tvv;
@@ -57,7 +59,7 @@ INSERT INTO t VALUES (6, 'z', 13);
 -- confirm pre- and post-refresh contents of fairly simple materialized views
 SELECT * FROM tm ORDER BY type;
 SELECT * FROM tvm ORDER BY type;
-REFRESH MATERIALIZED VIEW tm;
+REFRESH MATERIALIZED VIEW CONCURRENTLY tm;
 REFRESH MATERIALIZED VIEW tvm;
 SELECT * FROM tm ORDER BY type;
 SELECT * FROM tvm ORDER BY type;
@@ -74,6 +76,7 @@ SELECT * FROM tmm;
 SELECT * FROM tvmm;
 SELECT * FROM tvvm;
 REFRESH MATERIALIZED VIEW tmm;
+REFRESH MATERIALIZED VIEW CONCURRENTLY tvmm;
 REFRESH MATERIALIZED VIEW tvmm;
 REFRESH MATERIALIZED VIEW tvvm;
 EXPLAIN (costs off)
@@ -89,6 +92,9 @@ SELECT * FROM tvvm;
 -- test diemv when the mv does not exist
 DROP MATERIALIZED VIEW IF EXISTS no_such_mv;
 
+-- make sure invalid comination of options is prohibited
+REFRESH MATERIALIZED VIEW CONCURRENTLY tvmm WITH NO DATA;
+
 -- test join of mv and view
 SELECT type, m.totamt AS mtot, v.totamt AS vtot FROM tm m LEFT JOIN tv v USING (type) ORDER BY type;
 
@@ -124,3 +130,24 @@ SELECT * FROM hogeview WHERE i < 10;
 VACUUM ANALYZE;
 SELECT * FROM hogeview WHERE i < 10;
 DROP TABLE hoge CASCADE;
+
+-- test that duplicate values on unique index prevent refresh
+CREATE TABLE foo(a, b) AS VALUES(1, 10);
+CREATE MATERIALIZED VIEW mv AS SELECT * FROM foo;
+CREATE UNIQUE INDEX ON mv(a);
+INSERT INTO foo SELECT * FROM foo;
+REFRESH MATERIALIZED VIEW mv;
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv;
+DROP TABLE foo CASCADE;
+
+-- make sure that all indexes covered by unique indexes works
+CREATE TABLE foo(a, b, c) AS VALUES(1, 2, 3);
+CREATE MATERIALIZED VIEW mv AS SELECT * FROM foo;
+CREATE UNIQUE INDEX ON mv (a);
+CREATE UNIQUE INDEX ON mv (b);
+CREATE UNIQUE INDEX on mv (c);
+INSERT INTO foo VALUES(2, 3, 4);
+INSERT INTO foo VALUES(3, 4, 5);
+REFRESH MATERIALIZED VIEW mv;
+REFRESH MATERIALIZED VIEW CONCURRENTLY mv;
+DROP TABLE foo CASCADE;