Fix two issues with HEADER MATCH in COPY
authorMichael Paquier <michael@paquier.xyz>
Thu, 23 Jun 2022 01:49:20 +0000 (10:49 +0900)
committerMichael Paquier <michael@paquier.xyz>
Thu, 23 Jun 2022 01:49:20 +0000 (10:49 +0900)
072132f0 used the attnum offset to access the raw_fields array when
checking that the attribute names of the header and of the relation
match, leading to incorrect results or even crashes if the attribute
numbers of a relation are changed, like on a dropped attribute.  This
fixes the logic to use the correct attribute names for the header
matching requirements.

Also, this commit disallows HEADER MATCH in COPY TO as there is no
validation that can be done in this case.

The tests are expanded for HEADER MATCH with COPY FROM and dropped
columns, with cases where a relation has a dropped and re-added column,
as well as a reduced set of columns.

Author: Julien Rouhaud
Reviewed-by: Peter Eisentraut, Michael Paquier
Discussion: https://postgr.es/m/20220607154744.vvmitnqhyxrne5ms@jrouhaud

doc/src/sgml/ref/copy.sgml
src/backend/commands/copy.c
src/backend/commands/copyfromparse.c
src/test/regress/expected/copy.out
src/test/regress/sql/copy.sql

index 40af423ccf1736a27d47bb213f0ecb2de1e8f017..8aae711b3b9ef407ed7ace7b17a6dff1fd34e8e1 100644 (file)
@@ -282,6 +282,8 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
       of the columns in the header line must match the actual column names of
       the table, otherwise an error is raised.
       This option is not allowed when using <literal>binary</literal> format.
+      The <literal>MATCH</literal> option is only valid for <command>COPY
+      FROM</command> commands.
      </para>
     </listitem>
    </varlistentry>
index f448d39c7edc401a14dd284b96d777946ae21812..e2870e3c11cda79651e6aeca625d433f3eac9a85 100644 (file)
@@ -318,7 +318,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
  * defGetBoolean() but also accepts the special value "match".
  */
 static CopyHeaderChoice
-defGetCopyHeaderChoice(DefElem *def)
+defGetCopyHeaderChoice(DefElem *def, bool is_from)
 {
        /*
         * If no parameter given, assume "true" is meant.
@@ -360,7 +360,14 @@ defGetCopyHeaderChoice(DefElem *def)
                                if (pg_strcasecmp(sval, "off") == 0)
                                        return COPY_HEADER_FALSE;
                                if (pg_strcasecmp(sval, "match") == 0)
+                               {
+                                       if (!is_from)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                                errmsg("cannot use \"%s\" with HEADER in COPY TO",
+                                                                               sval)));
                                        return COPY_HEADER_MATCH;
+                               }
                        }
                        break;
        }
@@ -452,7 +459,7 @@ ProcessCopyOptions(ParseState *pstate,
                        if (header_specified)
                                errorConflictingDefElem(defel, pstate);
                        header_specified = true;
-                       opts_out->header_line = defGetCopyHeaderChoice(defel);
+                       opts_out->header_line = defGetCopyHeaderChoice(defel, is_from);
                }
                else if (strcmp(defel->defname, "quote") == 0)
                {
index e06534943f74118cdd75e970d75814dce9bd700f..57813b3458b5b112d2c0b5cd56c1668c01e9dc1c 100644 (file)
@@ -789,11 +789,12 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
                        foreach(cur, cstate->attnumlist)
                        {
                                int                     attnum = lfirst_int(cur);
-                               char       *colName = cstate->raw_fields[attnum - 1];
+                               char       *colName;
                                Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
-                               fldnum++;
+                               Assert(fldnum < cstate->max_fields);
 
+                               colName = cstate->raw_fields[fldnum++];
                                if (colName == NULL)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
index e8d6b4fc13b0214f252bd89836f696ab5e829d77..7f2f4ae4aea6179b216a0146f5f033fb69ac727b 100644 (file)
@@ -182,9 +182,21 @@ create table header_copytest (
        b int,
        c text
 );
+-- Make sure it works with with dropped columns
+alter table header_copytest drop column c;
+alter table header_copytest add column c text;
+copy header_copytest to stdout with (header match);
+ERROR:  cannot use "match" with HEADER in COPY TO
 copy header_copytest from stdin with (header wrong_choice);
 ERROR:  header requires a Boolean value or "match"
+-- works
 copy header_copytest from stdin with (header match);
+copy header_copytest (c, a, b) from stdin with (header match);
+copy header_copytest from stdin with (header match, format csv);
+-- errors
+copy header_copytest (c, b, a) from stdin with (header match);
+ERROR:  column name mismatch in header line field 1: got "a", expected "c"
+CONTEXT:  COPY header_copytest, line 1: "a     b       c"
 copy header_copytest from stdin with (header match);
 ERROR:  column name mismatch in header line field 3: got null value ("\N"), expected "c"
 CONTEXT:  COPY header_copytest, line 1: "a     b       \N"
@@ -197,5 +209,34 @@ CONTEXT:  COPY header_copytest, line 1: "a b       c       d"
 copy header_copytest from stdin with (header match);
 ERROR:  column name mismatch in header line field 3: got "d", expected "c"
 CONTEXT:  COPY header_copytest, line 1: "a     b       d"
-copy header_copytest from stdin with (header match, format csv);
+SELECT * FROM header_copytest ORDER BY a;
+ a | b |  c  
+---+---+-----
+ 1 | 2 | foo
+ 3 | 4 | bar
+ 5 | 6 | baz
+(3 rows)
+
+-- Drop an extra column, in the middle of the existing set.
+alter table header_copytest drop column b;
+-- works
+copy header_copytest (c, a) from stdin with (header match);
+copy header_copytest (a, c) from stdin with (header match);
+-- errors
+copy header_copytest from stdin with (header match);
+ERROR:  wrong number of fields in header line: field count is 3, expected 2
+CONTEXT:  COPY header_copytest, line 1: "a     ........pg.dropped.2........    c"
+copy header_copytest (a, c) from stdin with (header match);
+ERROR:  wrong number of fields in header line: field count is 3, expected 2
+CONTEXT:  COPY header_copytest, line 1: "a     c       b"
+SELECT * FROM header_copytest ORDER BY a;
+ a |  c  
+---+-----
+ 1 | foo
+ 3 | bar
+ 5 | baz
+ 7 | foo
+ 8 | foo
+(5 rows)
+
 drop table header_copytest;
index d72d226f341f42c69ffcb773c4faf53d9e586894..285022e07c6232f4d0fcbabe85b66393bd75a1f9 100644 (file)
@@ -204,11 +204,29 @@ create table header_copytest (
        b int,
        c text
 );
+-- Make sure it works with with dropped columns
+alter table header_copytest drop column c;
+alter table header_copytest add column c text;
+copy header_copytest to stdout with (header match);
 copy header_copytest from stdin with (header wrong_choice);
+-- works
 copy header_copytest from stdin with (header match);
 a      b       c
 1      2       foo
 \.
+copy header_copytest (c, a, b) from stdin with (header match);
+c      a       b
+bar    3       4
+\.
+copy header_copytest from stdin with (header match, format csv);
+a,b,c
+5,6,baz
+\.
+-- errors
+copy header_copytest (c, b, a) from stdin with (header match);
+a      b       c
+1      2       foo
+\.
 copy header_copytest from stdin with (header match);
 a      b       \N
 1      2       foo
@@ -225,8 +243,28 @@ copy header_copytest from stdin with (header match);
 a      b       d
 1      2       foo
 \.
-copy header_copytest from stdin with (header match, format csv);
-a,b,c
-1,2,foo
+SELECT * FROM header_copytest ORDER BY a;
+
+-- Drop an extra column, in the middle of the existing set.
+alter table header_copytest drop column b;
+-- works
+copy header_copytest (c, a) from stdin with (header match);
+c      a
+foo    7
+\.
+copy header_copytest (a, c) from stdin with (header match);
+a      c
+8      foo
 \.
+-- errors
+copy header_copytest from stdin with (header match);
+a      ........pg.dropped.2........    c
+1      2       foo
+\.
+copy header_copytest (a, c) from stdin with (header match);
+a      c       b
+1      foo     2
+\.
+
+SELECT * FROM header_copytest ORDER BY a;
 drop table header_copytest;