file_fdw: Add on_error and log_verbosity options to file_fdw.
authorFujii Masao <fujii@postgresql.org>
Thu, 3 Oct 2024 06:57:32 +0000 (15:57 +0900)
committerFujii Masao <fujii@postgresql.org>
Thu, 3 Oct 2024 06:57:32 +0000 (15:57 +0900)
In v17, the on_error and log_verbosity options were introduced for
the COPY command. This commit extends support for these options
to file_fdw.

Setting on_error = 'ignore' for a file_fdw foreign table allows users
to query it without errors, even when the input file contains
malformed rows, by skipping the problematic rows.

Both on_error and log_verbosity options apply to SELECT and ANALYZE
operations on file_fdw foreign tables.

Author: Atsushi Torikoshi
Reviewed-by: Masahiko Sawada, Fujii Masao
Discussion: https://postgr.es/m/ab59dad10490ea3734cf022b16c24cfd@oss.nttdata.com

contrib/file_fdw/expected/file_fdw.out
contrib/file_fdw/file_fdw.c
contrib/file_fdw/sql/file_fdw.sql
doc/src/sgml/file-fdw.sgml

index 86c148a86ba3afa8d916c6323c0ba99f10f6c17c..593fdc782e37d6ecc739f505e65a77474eb3cb88 100644 (file)
@@ -206,6 +206,25 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 SELECT * FROM agg_bad;               -- ERROR
 ERROR:  invalid input syntax for type real: "aaa"
 CONTEXT:  COPY agg_bad, line 3, column b: "aaa"
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+NOTICE:  1 row was skipped due to data type incompatibility
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
+SELECT * FROM agg_bad;
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
+ANALYZE agg_bad;
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
index d16821f8e1b46081ceb523e7629a79c7458a0467..043204c3e7ec447151be1923adadcda6025af341 100644 (file)
@@ -22,6 +22,7 @@
 #include "catalog/pg_authid.h"
 #include "catalog/pg_foreign_table.h"
 #include "commands/copy.h"
+#include "commands/copyfrom_internal.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
@@ -74,6 +75,8 @@ static const struct FileFdwOption valid_options[] = {
        {"null", ForeignTableRelationId},
        {"default", ForeignTableRelationId},
        {"encoding", ForeignTableRelationId},
+       {"on_error", ForeignTableRelationId},
+       {"log_verbosity", ForeignTableRelationId},
        {"force_not_null", AttributeRelationId},
        {"force_null", AttributeRelationId},
 
@@ -723,38 +726,74 @@ fileIterateForeignScan(ForeignScanState *node)
        FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
        EState     *estate = CreateExecutorState();
        ExprContext *econtext;
-       MemoryContext oldcontext;
+       MemoryContext oldcontext = CurrentMemoryContext;
        TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
-       bool            found;
+       CopyFromState cstate = festate->cstate;
        ErrorContextCallback errcallback;
 
        /* Set up callback to identify error line number. */
        errcallback.callback = CopyFromErrorCallback;
-       errcallback.arg = (void *) festate->cstate;
+       errcallback.arg = (void *) cstate;
        errcallback.previous = error_context_stack;
        error_context_stack = &errcallback;
 
        /*
-        * The protocol for loading a virtual tuple into a slot is first
-        * ExecClearTuple, then fill the values/isnull arrays, then
-        * ExecStoreVirtualTuple.  If we don't find another row in the file, we
-        * just skip the last step, leaving the slot empty as required.
-        *
         * We pass ExprContext because there might be a use of the DEFAULT option
         * in COPY FROM, so we may need to evaluate default expressions.
         */
-       ExecClearTuple(slot);
        econtext = GetPerTupleExprContext(estate);
 
+retry:
+
        /*
         * DEFAULT expressions need to be evaluated in a per-tuple context, so
         * switch in case we are doing that.
         */
-       oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-       found = NextCopyFrom(festate->cstate, econtext,
-                                                slot->tts_values, slot->tts_isnull);
-       if (found)
+       MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+       /*
+        * The protocol for loading a virtual tuple into a slot is first
+        * ExecClearTuple, then fill the values/isnull arrays, then
+        * ExecStoreVirtualTuple.  If we don't find another row in the file, we
+        * just skip the last step, leaving the slot empty as required.
+        *
+        */
+       ExecClearTuple(slot);
+
+       if (NextCopyFrom(cstate, econtext, slot->tts_values, slot->tts_isnull))
+       {
+               if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+                       cstate->escontext->error_occurred)
+               {
+                       /*
+                        * Soft error occurred, skip this tuple and just make
+                        * ErrorSaveContext ready for the next NextCopyFrom. Since we
+                        * don't set details_wanted and error_data is not to be filled,
+                        * just resetting error_occurred is enough.
+                        */
+                       cstate->escontext->error_occurred = false;
+
+                       /* Switch back to original memory context */
+                       MemoryContextSwitchTo(oldcontext);
+
+                       /*
+                        * Make sure we are interruptible while repeatedly calling
+                        * NextCopyFrom() until no soft error occurs.
+                        */
+                       CHECK_FOR_INTERRUPTS();
+
+                       /*
+                        * Reset the per-tuple exprcontext, to clean-up after expression
+                        * evaluations etc.
+                        */
+                       ResetPerTupleExprContext(estate);
+
+                       /* Repeat NextCopyFrom() until no soft error occurs */
+                       goto retry;
+               }
+
                ExecStoreVirtualTuple(slot);
+       }
 
        /* Switch back to original memory context */
        MemoryContextSwitchTo(oldcontext);
@@ -796,8 +835,19 @@ fileEndForeignScan(ForeignScanState *node)
        FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
 
        /* if festate is NULL, we are in EXPLAIN; nothing to do */
-       if (festate)
-               EndCopyFrom(festate->cstate);
+       if (!festate)
+               return;
+
+       if (festate->cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+               festate->cstate->num_errors > 0 &&
+               festate->cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
+               ereport(NOTICE,
+                               errmsg_plural("%llu row was skipped due to data type incompatibility",
+                                                         "%llu rows were skipped due to data type incompatibility",
+                                                         (unsigned long long) festate->cstate->num_errors,
+                                                         (unsigned long long) festate->cstate->num_errors));
+
+       EndCopyFrom(festate->cstate);
 }
 
 /*
@@ -1113,7 +1163,8 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
  * which must have at least targrows entries.
  * The actual number of rows selected is returned as the function result.
  * We also count the total number of rows in the file and return it into
- * *totalrows.  Note that *totaldeadrows is always set to 0.
+ * *totalrows.  Rows skipped due to on_error = 'ignore' are not included
+ * in this count.  Note that *totaldeadrows is always set to 0.
  *
  * Note that the returned list of rows is not always in order by physical
  * position in the file.  Therefore, correlation estimates derived later
@@ -1191,6 +1242,21 @@ file_acquire_sample_rows(Relation onerel, int elevel,
                if (!found)
                        break;
 
+               if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+                       cstate->escontext->error_occurred)
+               {
+                       /*
+                        * Soft error occurred, skip this tuple and just make
+                        * ErrorSaveContext ready for the next NextCopyFrom. Since we
+                        * don't set details_wanted and error_data is not to be filled,
+                        * just resetting error_occurred is enough.
+                        */
+                       cstate->escontext->error_occurred = false;
+
+                       /* Repeat NextCopyFrom() until no soft error occurs */
+                       continue;
+               }
+
                /*
                 * The first targrows sample rows are simply copied into the
                 * reservoir.  Then we start replacing tuples in the sample until we
@@ -1236,6 +1302,15 @@ file_acquire_sample_rows(Relation onerel, int elevel,
        /* Clean up. */
        MemoryContextDelete(tupcontext);
 
+       if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
+               cstate->num_errors > 0 &&
+               cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
+               ereport(NOTICE,
+                               errmsg_plural("%llu row was skipped due to data type incompatibility",
+                                                         "%llu rows were skipped due to data type incompatibility",
+                                                         (unsigned long long) cstate->num_errors,
+                                                         (unsigned long long) cstate->num_errors));
+
        EndCopyFrom(cstate);
 
        pfree(values);
index f0548e14e18452789cd83b4fa8f6d88a6b9284a9..edd77c5cd208e7be04052509b08ab46f3f25c287 100644 (file)
@@ -150,6 +150,13 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 -- error context report tests
 SELECT * FROM agg_bad;               -- ERROR
 
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
+SELECT * FROM agg_bad;
+ANALYZE agg_bad;
+
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
index f2f2af9a5962fda8f35da47f3f017339f322a541..bb3579b0777781c509f1eba5b20dbff0ed047ef6 100644 (file)
    </listitem>
   </varlistentry>
 
+  <varlistentry>
+   <term><literal>on_error</literal></term>
+
+   <listitem>
+    <para>
+     Specifies how to behave when encountering an error converting a column's
+     input value into its data type,
+     the same as <command>COPY</command>'s <literal>ON_ERROR</literal> option.
+    </para>
+   </listitem>
+  </varlistentry>
+
+  <varlistentry>
+   <term><literal>log_verbosity</literal></term>
+
+   <listitem>
+    <para>
+     Specifies the amount of messages emitted by <literal>file_fdw</literal>,
+     the same as <command>COPY</command>'s <literal>LOG_VERBOSITY</literal> option.
+    </para>
+   </listitem>
+  </varlistentry>
+
  </variablelist>
 
  <para>