Support "COPY view FROM" for views with INSTEAD OF INSERT triggers.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 10 Nov 2016 19:13:43 +0000 (14:13 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 10 Nov 2016 19:13:43 +0000 (14:13 -0500)
We just pass the data to the INSTEAD trigger.

Haribabu Kommi, reviewed by Dilip Kumar

Patch: <CAJrrPGcSQkrNkO+4PhLm4B8UQQQmU9YVUuqmtgM=pmzMfxWaWQ@mail.gmail.com>

doc/src/sgml/ref/copy.sgml
src/backend/commands/copy.c
src/test/regress/expected/copy2.out
src/test/regress/sql/copy2.sql

index 07e2f45196f054a7be134b1b8b4b19089086de02..2477a872e88558d9f8ebcc7eabdf9c0280ae7e04 100644 (file)
@@ -395,9 +395,15 @@ COPY <replaceable class="parameter">count</replaceable>
   <title>Notes</title>
 
    <para>
-    <command>COPY</command> can only be used with plain tables, not
+    <command>COPY TO</command> can only be used with plain tables, not
     with views.  However, you can write <literal>COPY (SELECT * FROM
-    <replaceable class="parameter">viewname</replaceable>) TO ...</literal>.
+    <replaceable class="parameter">viewname</replaceable>) TO ...</literal>
+    to copy the current contents of a view.
+   </para>
+
+   <para>
+    <command>COPY FROM</command> can be used with plain tables and with views
+    that have <literal>INSTEAD OF INSERT</> triggers.
    </para>
 
    <para>
index b4140eb68a731b876cb032a3366430017581e87b..3c81906232842619df981763074de6eaa825412a 100644 (file)
@@ -864,8 +864,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, uint64 *processed)
                         * statement.
                         *
                         * In the case that columns are specified in the attribute list,
-                        * create a ColumnRef and ResTarget for each column and add them to
-                        * the target list for the resulting SELECT statement.
+                        * create a ColumnRef and ResTarget for each column and add them
+                        * to the target list for the resulting SELECT statement.
                         */
                        if (!stmt->attlist)
                        {
@@ -2269,13 +2269,21 @@ CopyFrom(CopyState cstate)
 
        Assert(cstate->rel);
 
-       if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
+       /*
+        * The target must be a plain relation or have an INSTEAD OF INSERT row
+        * trigger.  (Currently, such triggers are only allowed on views, so we
+        * only hint about them in the view case.)
+        */
+       if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+               !(cstate->rel->trigdesc &&
+                 cstate->rel->trigdesc->trig_insert_instead_row))
        {
                if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("cannot copy to view \"%s\"",
-                                                       RelationGetRelationName(cstate->rel))));
+                                                       RelationGetRelationName(cstate->rel)),
+                                        errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
                else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -2496,52 +2504,64 @@ CopyFrom(CopyState cstate)
 
                if (!skip_tuple)
                {
-                       /* Check the constraints of the tuple */
-                       if (cstate->rel->rd_att->constr)
-                               ExecConstraints(resultRelInfo, slot, estate);
-
-                       if (useHeapMultiInsert)
+                       if (resultRelInfo->ri_TrigDesc &&
+                               resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
                        {
-                               /* Add this tuple to the tuple buffer */
-                               if (nBufferedTuples == 0)
-                                       firstBufferedLineNo = cstate->cur_lineno;
-                               bufferedTuples[nBufferedTuples++] = tuple;
-                               bufferedTuplesSize += tuple->t_len;
-
-                               /*
-                                * If the buffer filled up, flush it. Also flush if the total
-                                * size of all the tuples in the buffer becomes large, to
-                                * avoid using large amounts of memory for the buffers when
-                                * the tuples are exceptionally wide.
-                                */
-                               if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-                                       bufferedTuplesSize > 65535)
-                               {
-                                       CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-                                                                               resultRelInfo, myslot, bistate,
-                                                                               nBufferedTuples, bufferedTuples,
-                                                                               firstBufferedLineNo);
-                                       nBufferedTuples = 0;
-                                       bufferedTuplesSize = 0;
-                               }
+                               /* Pass the data to the INSTEAD ROW INSERT trigger */
+                               ExecIRInsertTriggers(estate, resultRelInfo, slot);
                        }
                        else
                        {
-                               List       *recheckIndexes = NIL;
+                               /* Check the constraints of the tuple */
+                               if (cstate->rel->rd_att->constr)
+                                       ExecConstraints(resultRelInfo, slot, estate);
+
+                               if (useHeapMultiInsert)
+                               {
+                                       /* Add this tuple to the tuple buffer */
+                                       if (nBufferedTuples == 0)
+                                               firstBufferedLineNo = cstate->cur_lineno;
+                                       bufferedTuples[nBufferedTuples++] = tuple;
+                                       bufferedTuplesSize += tuple->t_len;
+
+                                       /*
+                                        * If the buffer filled up, flush it.  Also flush if the
+                                        * total size of all the tuples in the buffer becomes
+                                        * large, to avoid using large amounts of memory for the
+                                        * buffer when the tuples are exceptionally wide.
+                                        */
+                                       if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+                                               bufferedTuplesSize > 65535)
+                                       {
+                                               CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                                                                       resultRelInfo, myslot, bistate,
+                                                                                       nBufferedTuples, bufferedTuples,
+                                                                                       firstBufferedLineNo);
+                                               nBufferedTuples = 0;
+                                               bufferedTuplesSize = 0;
+                                       }
+                               }
+                               else
+                               {
+                                       List       *recheckIndexes = NIL;
 
-                               /* OK, store the tuple and create index entries for it */
-                               heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+                                       /* OK, store the tuple and create index entries for it */
+                                       heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
 
-                               if (resultRelInfo->ri_NumIndices > 0)
-                                       recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                                                                                estate, false, NULL,
-                                                                                                                  NIL);
+                                       if (resultRelInfo->ri_NumIndices > 0)
+                                               recheckIndexes = ExecInsertIndexTuples(slot,
+                                                                                                                       &(tuple->t_self),
+                                                                                                                          estate,
+                                                                                                                          false,
+                                                                                                                          NULL,
+                                                                                                                          NIL);
 
-                               /* AFTER ROW INSERT Triggers */
-                               ExecARInsertTriggers(estate, resultRelInfo, tuple,
-                                                                        recheckIndexes);
+                                       /* AFTER ROW INSERT Triggers */
+                                       ExecARInsertTriggers(estate, resultRelInfo, tuple,
+                                                                                recheckIndexes);
 
-                               list_free(recheckIndexes);
+                                       list_free(recheckIndexes);
+                               }
                        }
 
                        /*
index 9a8922df2d48d2001aaab1a5b2a8699c4503b2bb..65e9c626b3597c9afeaaf733d3e9a4636991f2de 100644 (file)
@@ -535,6 +535,29 @@ COPY rls_t1 (a, b) TO stdout;
 2      3
 4      1
 RESET SESSION AUTHORIZATION;
+-- test with INSTEAD OF INSERT trigger on a view
+CREATE TABLE instead_of_insert_tbl(id serial, name text);
+CREATE VIEW instead_of_insert_tbl_view AS SELECT ''::text AS str;
+COPY instead_of_insert_tbl_view FROM stdin; -- fail
+ERROR:  cannot copy to view "instead_of_insert_tbl_view"
+HINT:  To enable copying to a view, provide an INSTEAD OF INSERT trigger.
+CREATE FUNCTION fun_instead_of_insert_tbl() RETURNS trigger AS $$
+BEGIN
+  INSERT INTO instead_of_insert_tbl (name) VALUES (NEW.str);
+  RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of_insert_tbl_view
+  INSTEAD OF INSERT ON instead_of_insert_tbl_view
+  FOR EACH ROW EXECUTE PROCEDURE fun_instead_of_insert_tbl();
+COPY instead_of_insert_tbl_view FROM stdin;
+SELECT * FROM instead_of_insert_tbl;
+ id | name  
+----+-------
+  1 | test1
+(1 row)
+
+-- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
 DROP FUNCTION truncate_in_subxact();
@@ -544,3 +567,6 @@ DROP ROLE regress_rls_copy_user;
 DROP ROLE regress_rls_copy_user_colperms;
 DROP FUNCTION fn_x_before();
 DROP FUNCTION fn_x_after();
+DROP TABLE instead_of_insert_tbl;
+DROP VIEW instead_of_insert_tbl_view;
+DROP FUNCTION fun_instead_of_insert_tbl();
index 89d0a39eb9d9a25ba7fb3aedd0134e3d6cff4d21..f3a6d228fae0ae6827d288f267340197204285f1 100644 (file)
@@ -387,6 +387,32 @@ COPY rls_t1 (a, b) TO stdout;
 
 RESET SESSION AUTHORIZATION;
 
+-- test with INSTEAD OF INSERT trigger on a view
+CREATE TABLE instead_of_insert_tbl(id serial, name text);
+CREATE VIEW instead_of_insert_tbl_view AS SELECT ''::text AS str;
+
+COPY instead_of_insert_tbl_view FROM stdin; -- fail
+test1
+\.
+
+CREATE FUNCTION fun_instead_of_insert_tbl() RETURNS trigger AS $$
+BEGIN
+  INSERT INTO instead_of_insert_tbl (name) VALUES (NEW.str);
+  RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of_insert_tbl_view
+  INSTEAD OF INSERT ON instead_of_insert_tbl_view
+  FOR EACH ROW EXECUTE PROCEDURE fun_instead_of_insert_tbl();
+
+COPY instead_of_insert_tbl_view FROM stdin;
+test1
+\.
+
+SELECT * FROM instead_of_insert_tbl;
+
+
+-- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
 DROP FUNCTION truncate_in_subxact();
@@ -396,3 +422,6 @@ DROP ROLE regress_rls_copy_user;
 DROP ROLE regress_rls_copy_user_colperms;
 DROP FUNCTION fn_x_before();
 DROP FUNCTION fn_x_after();
+DROP TABLE instead_of_insert_tbl;
+DROP VIEW instead_of_insert_tbl_view;
+DROP FUNCTION fun_instead_of_insert_tbl();