Refactor COPY FROM to use format callback functions.
author: Masahiko Sawada <msawada@postgresql.org>
Fri, 28 Feb 2025 18:29:36 +0000 (10:29 -0800)
committer: Masahiko Sawada <msawada@postgresql.org>
Fri, 28 Feb 2025 18:29:36 +0000 (10:29 -0800)
This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.

This change is a preliminary step towards making the COPY FROM command
extensible in terms of input formats.

Similar to 2e4127b6d2d, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when reading field representations in text
or CSV mode. Benchmark results showed a ~5% performance gain in text
or CSV mode.

Author: Sutou Kouhei <kou@clear-code.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com

src/backend/commands/copyfrom.c
src/backend/commands/copyfromparse.c
src/include/commands/copy.h
src/include/commands/copyapi.h
src/include/commands/copyfrom_internal.h
src/tools/pgindent/typedefs.list

index 8875d79d59ad78bb4b221e86bad9aeaa7dfbefb6..198cee2bc48b980742eb328b4ea6123a7bbd8959 100644 (file)
@@ -28,7 +28,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "commands/trigger.h"
@@ -106,6 +106,145 @@ typedef struct CopyMultiInsertInfo
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+/*
+ * Built-in format-specific routines. One-row callbacks are defined in
+ * copyfromparse.c.
+ */
+static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+                                  Oid *typioparam);
+static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromTextLikeEnd(CopyFromState cstate);
+static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                                FmgrInfo *finfo, Oid *typioparam);
+static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromBinaryEnd(CopyFromState cstate);
+
+
+/*
+ * COPY FROM routines for built-in formats.
+ *
+ * CSV and text formats share the same TextLike routines except for the
+ * one-row callback.
+ */
+
+/* text format */
+static const CopyFromRoutine CopyFromRoutineText = {
+   .CopyFromInFunc = CopyFromTextLikeInFunc,
+   .CopyFromStart = CopyFromTextLikeStart,
+   .CopyFromOneRow = CopyFromTextOneRow,
+   .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/* CSV format */
+static const CopyFromRoutine CopyFromRoutineCSV = {
+   .CopyFromInFunc = CopyFromTextLikeInFunc,
+   .CopyFromStart = CopyFromTextLikeStart,
+   .CopyFromOneRow = CopyFromCSVOneRow,
+   .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/* binary format */
+static const CopyFromRoutine CopyFromRoutineBinary = {
+   .CopyFromInFunc = CopyFromBinaryInFunc,
+   .CopyFromStart = CopyFromBinaryStart,
+   .CopyFromOneRow = CopyFromBinaryOneRow,
+   .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/* Return a COPY FROM routine for the given options */
+static const CopyFromRoutine *
+CopyFromGetRoutine(CopyFormatOptions opts)
+{
+   if (opts.csv_mode)
+       return &CopyFromRoutineCSV;
+   else if (opts.binary)
+       return &CopyFromRoutineBinary;
+
+   /* default is text */
+   return &CopyFromRoutineText;
+}
+
+/* Implementation of the start callback for text and CSV formats */
+static void
+CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+   AttrNumber  attr_count;
+
+   /*
+    * If encoding conversion is needed, we need another buffer to hold the
+    * converted input data.  Otherwise, we can just point input_buf to the
+    * same buffer as raw_buf.
+    */
+   if (cstate->need_transcoding)
+   {
+       cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+       cstate->input_buf_index = cstate->input_buf_len = 0;
+   }
+   else
+       cstate->input_buf = cstate->raw_buf;
+   cstate->input_reached_eof = false;
+
+   initStringInfo(&cstate->line_buf);
+
+   /*
+    * Create workspace for CopyReadAttributes results; used by CSV and text
+    * format.
+    */
+   attr_count = list_length(cstate->attnumlist);
+   cstate->max_fields = attr_count;
+   cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+/*
+ * Implementation of the infunc callback for text and CSV formats. Assign
+ * the input function data to the given *finfo.
+ */
+static void
+CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+                      Oid *typioparam)
+{
+   Oid         func_oid;
+
+   getTypeInputInfo(atttypid, &func_oid, typioparam);
+   fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the end callback for text and CSV formats */
+static void
+CopyFromTextLikeEnd(CopyFromState cstate)
+{
+   /* nothing to do */
+}
+
+/* Implementation of the start callback for binary format */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+   /* Read and verify binary header */
+   ReceiveCopyBinaryHeader(cstate);
+}
+
+/*
+ * Implementation of the infunc callback for binary format. Assign
+ * the binary input function to the given *finfo.
+ */
+static void
+CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                    FmgrInfo *finfo, Oid *typioparam)
+{
+   Oid         func_oid;
+
+   getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
+   fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the end callback for binary format */
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+   /* nothing to do */
+}
+
 /*
  * error context callback for COPY FROM
  *
@@ -1403,7 +1542,6 @@ BeginCopyFrom(ParseState *pstate,
                num_defaults;
    FmgrInfo   *in_functions;
    Oid        *typioparams;
-   Oid         in_func_oid;
    int        *defmap;
    ExprState **defexprs;
    MemoryContext oldcontext;
@@ -1435,6 +1573,9 @@ BeginCopyFrom(ParseState *pstate,
    /* Extract options from the statement node tree */
    ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
+   /* Set the format routine */
+   cstate->routine = CopyFromGetRoutine(cstate->opts);
+
    /* Process the target relation */
    cstate->rel = rel;
 
@@ -1590,25 +1731,6 @@ BeginCopyFrom(ParseState *pstate,
    cstate->raw_buf_index = cstate->raw_buf_len = 0;
    cstate->raw_reached_eof = false;
 
-   if (!cstate->opts.binary)
-   {
-       /*
-        * If encoding conversion is needed, we need another buffer to hold
-        * the converted input data.  Otherwise, we can just point input_buf
-        * to the same buffer as raw_buf.
-        */
-       if (cstate->need_transcoding)
-       {
-           cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-           cstate->input_buf_index = cstate->input_buf_len = 0;
-       }
-       else
-           cstate->input_buf = cstate->raw_buf;
-       cstate->input_reached_eof = false;
-
-       initStringInfo(&cstate->line_buf);
-   }
-
    initStringInfo(&cstate->attribute_buf);
 
    /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1641,13 +1763,9 @@ BeginCopyFrom(ParseState *pstate,
            continue;
 
        /* Fetch the input function and typioparam info */
-       if (cstate->opts.binary)
-           getTypeBinaryInputInfo(att->atttypid,
-                                  &in_func_oid, &typioparams[attnum - 1]);
-       else
-           getTypeInputInfo(att->atttypid,
-                            &in_func_oid, &typioparams[attnum - 1]);
-       fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+       cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                       &in_functions[attnum - 1],
+                                       &typioparams[attnum - 1]);
 
        /* Get default info if available */
        defexprs[attnum - 1] = NULL;
@@ -1782,20 +1900,7 @@ BeginCopyFrom(ParseState *pstate,
 
    pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-   if (cstate->opts.binary)
-   {
-       /* Read and verify binary header */
-       ReceiveCopyBinaryHeader(cstate);
-   }
-
-   /* create workspace for CopyReadAttributes results */
-   if (!cstate->opts.binary)
-   {
-       AttrNumber  attr_count = list_length(cstate->attnumlist);
-
-       cstate->max_fields = attr_count;
-       cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-   }
+   cstate->routine->CopyFromStart(cstate, tupDesc);
 
    MemoryContextSwitchTo(oldcontext);
 
@@ -1808,6 +1913,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+   /* Invoke the end callback */
+   cstate->routine->CopyFromEnd(cstate);
+
    /* No COPY FROM related resources except memory. */
    if (cstate->is_program)
    {
index caccdc8563c042dec92401ccdec70ae1759dca3d..bad577aa67b099735d0d2eecbf62bf8032509f23 100644 (file)
@@ -62,7 +62,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
@@ -140,13 +140,18 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static bool CopyReadLine(CopyFromState cstate);
-static bool CopyReadLineText(CopyFromState cstate);
+static bool CopyReadLine(CopyFromState cstate, bool is_csv);
+static bool CopyReadLineText(CopyFromState cstate, bool is_csv);
 static int CopyReadAttributesText(CopyFromState cstate);
 static int CopyReadAttributesCSV(CopyFromState cstate);
 static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
                                     Oid typioparam, int32 typmod,
                                     bool *isnull);
+static pg_attribute_always_inline bool CopyFromTextLikeOneRow(CopyFromState cstate,
+                                                             ExprContext *econtext,
+                                                             Datum *values,
+                                                             bool *nulls,
+                                                             bool is_csv);
 
 
 /* Low-level communications functions */
@@ -740,9 +745,12 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  * in the relation.
  *
  * NOTE: force_not_null option are not applied to the returned fields.
+ *
+ * We use pg_attribute_always_inline to reduce function call overhead
+ * and to help compilers to optimize away the 'is_csv' condition.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static pg_attribute_always_inline bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
 {
    int         fldct;
    bool        done;
@@ -759,13 +767,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
        tupDesc = RelationGetDescr(cstate->rel);
 
        cstate->cur_lineno++;
-       done = CopyReadLine(cstate);
+       done = CopyReadLine(cstate, is_csv);
 
        if (cstate->opts.header_line == COPY_HEADER_MATCH)
        {
            int         fldnum;
 
-           if (cstate->opts.csv_mode)
+           if (is_csv)
                fldct = CopyReadAttributesCSV(cstate);
            else
                fldct = CopyReadAttributesText(cstate);
@@ -809,7 +817,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
    cstate->cur_lineno++;
 
    /* Actually read the line into memory here */
-   done = CopyReadLine(cstate);
+   done = CopyReadLine(cstate, is_csv);
 
    /*
     * EOF at start of line means we're done.  If we see EOF after some
@@ -820,7 +828,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
        return false;
 
    /* Parse the line into de-escaped field values */
-   if (cstate->opts.csv_mode)
+   if (is_csv)
        fldct = CopyReadAttributesCSV(cstate);
    else
        fldct = CopyReadAttributesText(cstate);
@@ -847,233 +855,275 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
    TupleDesc   tupDesc;
    AttrNumber  num_phys_attrs,
-               attr_count,
                num_defaults = cstate->num_defaults;
-   FmgrInfo   *in_functions = cstate->in_functions;
-   Oid        *typioparams = cstate->typioparams;
    int         i;
    int        *defmap = cstate->defmap;
    ExprState **defexprs = cstate->defexprs;
 
    tupDesc = RelationGetDescr(cstate->rel);
    num_phys_attrs = tupDesc->natts;
-   attr_count = list_length(cstate->attnumlist);
 
    /* Initialize all values for row to NULL */
    MemSet(values, 0, num_phys_attrs * sizeof(Datum));
    MemSet(nulls, true, num_phys_attrs * sizeof(bool));
    MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-   if (!cstate->opts.binary)
+   /* Get one row from source */
+   if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+       return false;
+
+   /*
+    * Now compute and insert any defaults available for the columns not
+    * provided by the input data.  Anything not processed here or above will
+    * remain NULL.
+    */
+   for (i = 0; i < num_defaults; i++)
    {
-       char      **field_strings;
-       ListCell   *cur;
-       int         fldct;
-       int         fieldno;
-       char       *string;
+       /*
+        * The caller must supply econtext and have switched into the
+        * per-tuple memory context in it.
+        */
+       Assert(econtext != NULL);
+       Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
-       /* read raw fields in the next line */
-       if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-           return false;
+       values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext,
+                                        &nulls[defmap[i]]);
+   }
+
+   return true;
+}
+
+/* Implementation of the per-row callback for text format */
+bool
+CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                  bool *nulls)
+{
+   return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
+}
+
+/* Implementation of the per-row callback for CSV format */
+bool
+CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                 bool *nulls)
+{
+   return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
+}
 
-       /* check for overflowing fields */
-       if (attr_count > 0 && fldct > attr_count)
+/*
+ * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
+ *
+ * We use pg_attribute_always_inline to reduce function call overhead
+ * and to help compilers to optimize away the 'is_csv' condition.
+ */
+static pg_attribute_always_inline bool
+CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
+                      Datum *values, bool *nulls, bool is_csv)
+{
+   TupleDesc   tupDesc;
+   AttrNumber  attr_count;
+   FmgrInfo   *in_functions = cstate->in_functions;
+   Oid        *typioparams = cstate->typioparams;
+   ExprState **defexprs = cstate->defexprs;
+   char      **field_strings;
+   ListCell   *cur;
+   int         fldct;
+   int         fieldno;
+   char       *string;
+
+   tupDesc = RelationGetDescr(cstate->rel);
+   attr_count = list_length(cstate->attnumlist);
+
+   /* read raw fields in the next line */
+   if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
+       return false;
+
+   /* check for overflowing fields */
+   if (attr_count > 0 && fldct > attr_count)
+       ereport(ERROR,
+               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                errmsg("extra data after last expected column")));
+
+   fieldno = 0;
+
+   /* Loop to read the user attributes on the line. */
+   foreach(cur, cstate->attnumlist)
+   {
+       int         attnum = lfirst_int(cur);
+       int         m = attnum - 1;
+       Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+       if (fieldno >= fldct)
            ereport(ERROR,
                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                    errmsg("extra data after last expected column")));
-
-       fieldno = 0;
+                    errmsg("missing data for column \"%s\"",
+                           NameStr(att->attname))));
+       string = field_strings[fieldno++];
 
-       /* Loop to read the user attributes on the line. */
-       foreach(cur, cstate->attnumlist)
+       if (cstate->convert_select_flags &&
+           !cstate->convert_select_flags[m])
        {
-           int         attnum = lfirst_int(cur);
-           int         m = attnum - 1;
-           Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-           if (fieldno >= fldct)
-               ereport(ERROR,
-                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                        errmsg("missing data for column \"%s\"",
-                               NameStr(att->attname))));
-           string = field_strings[fieldno++];
+           /* ignore input field, leaving column as NULL */
+           continue;
+       }
 
-           if (cstate->convert_select_flags &&
-               !cstate->convert_select_flags[m])
+       if (is_csv)
+       {
+           if (string == NULL &&
+               cstate->opts.force_notnull_flags[m])
            {
-               /* ignore input field, leaving column as NULL */
-               continue;
+               /*
+                * FORCE_NOT_NULL option is set and column is NULL - convert
+                * it to the NULL string.
+                */
+               string = cstate->opts.null_print;
            }
-
-           if (cstate->opts.csv_mode)
+           else if (string != NULL && cstate->opts.force_null_flags[m]
+                    && strcmp(string, cstate->opts.null_print) == 0)
            {
-               if (string == NULL &&
-                   cstate->opts.force_notnull_flags[m])
-               {
-                   /*
-                    * FORCE_NOT_NULL option is set and column is NULL -
-                    * convert it to the NULL string.
-                    */
-                   string = cstate->opts.null_print;
-               }
-               else if (string != NULL && cstate->opts.force_null_flags[m]
-                        && strcmp(string, cstate->opts.null_print) == 0)
-               {
-                   /*
-                    * FORCE_NULL option is set and column matches the NULL
-                    * string. It must have been quoted, or otherwise the
-                    * string would already have been set to NULL. Convert it
-                    * to NULL as specified.
-                    */
-                   string = NULL;
-               }
+               /*
+                * FORCE_NULL option is set and column matches the NULL
+                * string. It must have been quoted, or otherwise the string
+                * would already have been set to NULL. Convert it to NULL as
+                * specified.
+                */
+               string = NULL;
            }
+       }
 
-           cstate->cur_attname = NameStr(att->attname);
-           cstate->cur_attval = string;
+       cstate->cur_attname = NameStr(att->attname);
+       cstate->cur_attval = string;
 
-           if (string != NULL)
-               nulls[m] = false;
+       if (string != NULL)
+           nulls[m] = false;
 
-           if (cstate->defaults[m])
-           {
-               /*
-                * The caller must supply econtext and have switched into the
-                * per-tuple memory context in it.
-                */
-               Assert(econtext != NULL);
-               Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+       if (cstate->defaults[m])
+       {
+           /* We must have switched into the per-tuple memory context */
+           Assert(econtext != NULL);
+           Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
-               values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-           }
+           values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+       }
 
-           /*
-            * If ON_ERROR is specified with IGNORE, skip rows with soft
-            * errors
-            */
-           else if (!InputFunctionCallSafe(&in_functions[m],
-                                           string,
-                                           typioparams[m],
-                                           att->atttypmod,
-                                           (Node *) cstate->escontext,
-                                           &values[m]))
-           {
-               Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+       /*
+        * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+        */
+       else if (!InputFunctionCallSafe(&in_functions[m],
+                                       string,
+                                       typioparams[m],
+                                       att->atttypmod,
+                                       (Node *) cstate->escontext,
+                                       &values[m]))
+       {
+           Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
 
-               cstate->num_errors++;
+           cstate->num_errors++;
 
-               if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
-               {
-                   /*
-                    * Since we emit line number and column info in the below
-                    * notice message, we suppress error context information
-                    * other than the relation name.
-                    */
-                   Assert(!cstate->relname_only);
-                   cstate->relname_only = true;
+           if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+           {
+               /*
+                * Since we emit line number and column info in the below
+                * notice message, we suppress error context information other
+                * than the relation name.
+                */
+               Assert(!cstate->relname_only);
+               cstate->relname_only = true;
 
-                   if (cstate->cur_attval)
-                   {
-                       char       *attval;
-
-                       attval = CopyLimitPrintoutLength(cstate->cur_attval);
-                       ereport(NOTICE,
-                               errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
-                                      (unsigned long long) cstate->cur_lineno,
-                                      cstate->cur_attname,
-                                      attval));
-                       pfree(attval);
-                   }
-                   else
-                       ereport(NOTICE,
-                               errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
-                                      (unsigned long long) cstate->cur_lineno,
-                                      cstate->cur_attname));
-
-                   /* reset relname_only */
-                   cstate->relname_only = false;
+               if (cstate->cur_attval)
+               {
+                   char       *attval;
+
+                   attval = CopyLimitPrintoutLength(cstate->cur_attval);
+                   ereport(NOTICE,
+                           errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
+                                  (unsigned long long) cstate->cur_lineno,
+                                  cstate->cur_attname,
+                                  attval));
+                   pfree(attval);
                }
+               else
+                   ereport(NOTICE,
+                           errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
+                                  (unsigned long long) cstate->cur_lineno,
+                                  cstate->cur_attname));
 
-               return true;
+               /* reset relname_only */
+               cstate->relname_only = false;
            }
 
-           cstate->cur_attname = NULL;
-           cstate->cur_attval = NULL;
+           return true;
        }
 
-       Assert(fieldno == attr_count);
+       cstate->cur_attname = NULL;
+       cstate->cur_attval = NULL;
    }
-   else
-   {
-       /* binary */
-       int16       fld_count;
-       ListCell   *cur;
 
-       cstate->cur_lineno++;
+   Assert(fieldno == attr_count);
 
-       if (!CopyGetInt16(cstate, &fld_count))
-       {
-           /* EOF detected (end of file, or protocol-level EOF) */
-           return false;
-       }
+   return true;
+}
 
-       if (fld_count == -1)
-       {
-           /*
-            * Received EOF marker.  Wait for the protocol-level EOF, and
-            * complain if it doesn't come immediately.  In COPY FROM STDIN,
-            * this ensures that we correctly handle CopyFail, if client
-            * chooses to send that now.  When copying from file, we could
-            * ignore the rest of the file like in text mode, but we choose to
-            * be consistent with the COPY FROM STDIN case.
-            */
-           char        dummy;
+/* Implementation of the per-row callback for binary format */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                    bool *nulls)
+{
+   TupleDesc   tupDesc;
+   AttrNumber  attr_count;
+   FmgrInfo   *in_functions = cstate->in_functions;
+   Oid        *typioparams = cstate->typioparams;
+   int16       fld_count;
+   ListCell   *cur;
 
-           if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-               ereport(ERROR,
-                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                        errmsg("received copy data after EOF marker")));
-           return false;
-       }
+   tupDesc = RelationGetDescr(cstate->rel);
+   attr_count = list_length(cstate->attnumlist);
 
-       if (fld_count != attr_count)
-           ereport(ERROR,
-                   (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                    errmsg("row field count is %d, expected %d",
-                           (int) fld_count, attr_count)));
+   cstate->cur_lineno++;
 
-       foreach(cur, cstate->attnumlist)
-       {
-           int         attnum = lfirst_int(cur);
-           int         m = attnum - 1;
-           Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-           cstate->cur_attname = NameStr(att->attname);
-           values[m] = CopyReadBinaryAttribute(cstate,
-                                               &in_functions[m],
-                                               typioparams[m],
-                                               att->atttypmod,
-                                               &nulls[m]);
-           cstate->cur_attname = NULL;
-       }
+   if (!CopyGetInt16(cstate, &fld_count))
+   {
+       /* EOF detected (end of file, or protocol-level EOF) */
+       return false;
    }
 
-   /*
-    * Now compute and insert any defaults available for the columns not
-    * provided by the input data.  Anything not processed here or above will
-    * remain NULL.
-    */
-   for (i = 0; i < num_defaults; i++)
+   if (fld_count == -1)
    {
        /*
-        * The caller must supply econtext and have switched into the
-        * per-tuple memory context in it.
+        * Received EOF marker.  Wait for the protocol-level EOF, and complain
+        * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+        * that we correctly handle CopyFail, if client chooses to send that
+        * now.  When copying from file, we could ignore the rest of the file
+        * like in text mode, but we choose to be consistent with the COPY
+        * FROM STDIN case.
         */
-       Assert(econtext != NULL);
-       Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+       char        dummy;
 
-       values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext,
-                                        &nulls[defmap[i]]);
+       if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+           ereport(ERROR,
+                   (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                    errmsg("received copy data after EOF marker")));
+       return false;
+   }
+
+   if (fld_count != attr_count)
+       ereport(ERROR,
+               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                errmsg("row field count is %d, expected %d",
+                       (int) fld_count, attr_count)));
+
+   foreach(cur, cstate->attnumlist)
+   {
+       int         attnum = lfirst_int(cur);
+       int         m = attnum - 1;
+       Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+       cstate->cur_attname = NameStr(att->attname);
+       values[m] = CopyReadBinaryAttribute(cstate,
+                                           &in_functions[m],
+                                           typioparams[m],
+                                           att->atttypmod,
+                                           &nulls[m]);
+       cstate->cur_attname = NULL;
    }
 
    return true;
@@ -1087,7 +1137,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
  * in the final value of line_buf.
  */
 static bool
-CopyReadLine(CopyFromState cstate)
+CopyReadLine(CopyFromState cstate, bool is_csv)
 {
    bool        result;
 
@@ -1095,7 +1145,7 @@ CopyReadLine(CopyFromState cstate)
    cstate->line_buf_valid = false;
 
    /* Parse data and transfer into line_buf */
-   result = CopyReadLineText(cstate);
+   result = CopyReadLineText(cstate, is_csv);
 
    if (result)
    {
@@ -1163,7 +1213,7 @@ CopyReadLine(CopyFromState cstate)
  * CopyReadLineText - inner loop of CopyReadLine for text mode
  */
 static bool
-CopyReadLineText(CopyFromState cstate)
+CopyReadLineText(CopyFromState cstate, bool is_csv)
 {
    char       *copy_input_buf;
    int         input_buf_ptr;
@@ -1178,7 +1228,7 @@ CopyReadLineText(CopyFromState cstate)
    char        quotec = '\0';
    char        escapec = '\0';
 
-   if (cstate->opts.csv_mode)
+   if (is_csv)
    {
        quotec = cstate->opts.quote[0];
        escapec = cstate->opts.escape[0];
@@ -1255,7 +1305,7 @@ CopyReadLineText(CopyFromState cstate)
        prev_raw_ptr = input_buf_ptr;
        c = copy_input_buf[input_buf_ptr++];
 
-       if (cstate->opts.csv_mode)
+       if (is_csv)
        {
            /*
             * If character is '\r', we may need to look ahead below.  Force
@@ -1294,7 +1344,7 @@ CopyReadLineText(CopyFromState cstate)
        }
 
        /* Process \r */
-       if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
+       if (c == '\r' && (!is_csv || !in_quote))
        {
            /* Check for \r\n on first line, _and_ handle \r\n. */
            if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1322,10 +1372,10 @@ CopyReadLineText(CopyFromState cstate)
                    if (cstate->eol_type == EOL_CRNL)
                        ereport(ERROR,
                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                !cstate->opts.csv_mode ?
+                                !is_csv ?
                                 errmsg("literal carriage return found in data") :
                                 errmsg("unquoted carriage return found in data"),
-                                !cstate->opts.csv_mode ?
+                                !is_csv ?
                                 errhint("Use \"\\r\" to represent carriage return.") :
                                 errhint("Use quoted CSV field to represent carriage return.")));
 
@@ -1339,10 +1389,10 @@ CopyReadLineText(CopyFromState cstate)
            else if (cstate->eol_type == EOL_NL)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                        !cstate->opts.csv_mode ?
+                        !is_csv ?
                         errmsg("literal carriage return found in data") :
                         errmsg("unquoted carriage return found in data"),
-                        !cstate->opts.csv_mode ?
+                        !is_csv ?
                         errhint("Use \"\\r\" to represent carriage return.") :
                         errhint("Use quoted CSV field to represent carriage return.")));
            /* If reach here, we have found the line terminator */
@@ -1350,15 +1400,15 @@ CopyReadLineText(CopyFromState cstate)
        }
 
        /* Process \n */
-       if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
+       if (c == '\n' && (!is_csv || !in_quote))
        {
            if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
                ereport(ERROR,
                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                        !cstate->opts.csv_mode ?
+                        !is_csv ?
                         errmsg("literal newline found in data") :
                         errmsg("unquoted newline found in data"),
-                        !cstate->opts.csv_mode ?
+                        !is_csv ?
                         errhint("Use \"\\n\" to represent newline.") :
                         errhint("Use quoted CSV field to represent newline.")));
            cstate->eol_type = EOL_NL;  /* in case not set yet */
@@ -1370,7 +1420,7 @@ CopyReadLineText(CopyFromState cstate)
         * Process backslash, except in CSV mode where backslash is a normal
         * character.
         */
-       if (c == '\\' && !cstate->opts.csv_mode)
+       if (c == '\\' && !is_csv)
        {
            char        c2;
 
index 06dfdfef7210c7ff3ef0f6e8a4d56710ded2f1ae..7bc044e2816e438e8c7bc8df5e5192670bdf0203 100644 (file)
@@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
                         Datum *values, bool *nulls);
-extern bool NextCopyFromRawFields(CopyFromState cstate,
-                                 char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 extern char *CopyLimitPrintoutLength(const char *str);
 
index bd2d386816e67bc91cf04ddf7112b7252fe6ba0b..2a2d2f9876bafc24f3e33f30f37611a05f1753a8 100644 (file)
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * copyapi.h
- *   API for COPY TO handlers
+ *   API for COPY TO/FROM handlers
  *
  *
  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
@@ -54,4 +54,52 @@ typedef struct CopyToRoutine
    void        (*CopyToEnd) (CopyToState cstate);
 } CopyToRoutine;
 
+/*
+ * API structure for a COPY FROM format implementation. Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+   /*
+    * Set input function information. This callback is called once at the
+    * beginning of COPY FROM.
+    *
+    * 'finfo' can be optionally filled to provide the catalog information of
+    * the input function.
+    *
+    * 'typioparam' can be optionally filled to define the OID of the type to
+    * pass to the input function. 'atttypid' is the OID of the data type
+    * used by the relation's attribute.
+    */
+   void        (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+                                  FmgrInfo *finfo, Oid *typioparam);
+
+   /*
+    * Start a COPY FROM. This callback is called once at the beginning of
+    * COPY FROM.
+    *
+    * 'tupDesc' is the tuple descriptor of the relation where the data needs
+    * to be copied. This can be used for any initialization steps required by
+    * a format.
+    */
+   void        (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+   /*
+    * Read one row from the source and fill *values and *nulls.
+    *
+    * 'econtext' is used to evaluate the default expression for each column
+    * that is either not read from the file or is using the DEFAULT option of
+    * COPY FROM. It is NULL if no default values are used.
+    *
+    * Returns false if there are no more tuples to read.
+    */
+   bool        (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+                                  Datum *values, bool *nulls);
+
+   /*
+    * End a COPY FROM. This callback is called once at the end of COPY FROM.
+    */
+   void        (*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
 #endif                         /* COPYAPI_H */
index 1d8ac8f62e638d4566232c9718860670380fbee6..c8b22af22d8521c1c9aa923f047741203a757554 100644 (file)
@@ -58,6 +58,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+   /* format routine */
+   const struct CopyFromRoutine *routine;
+
    /* low-level state data */
    CopySource  copy_src;       /* type of copy source */
    FILE       *copy_file;      /* used if copy_src == COPY_FILE */
@@ -183,4 +186,12 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+/* One-row callbacks for built-in formats defined in copyfromparse.c */
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext,
+                              Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
+                             Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                                Datum *values, bool *nulls);
+
 #endif                         /* COPYFROM_INTERNAL_H */
index fcb968e1ffe6d244260ffe37c83035fcafa31d68..56989aa0b8414aba4cb06a702a927a72f708175b 100644 (file)
@@ -501,6 +501,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice