Add support for COPY TO callback functions
author Michael Paquier <michael@paquier.xyz>
Tue, 11 Oct 2022 02:45:52 +0000 (11:45 +0900)
committer Michael Paquier <michael@paquier.xyz>
Tue, 11 Oct 2022 02:45:52 +0000 (11:45 +0900)
This is useful as a way for extensions to process COPY TO rows in the
way they see fit (say auditing, analytics, backend, etc.) without the
need to invoke an external process running as the OS user running the
backend through PROGRAM that requires superuser rights.  COPY FROM
already provides a similar callback for logical replication.  For COPY
TO, the callback is triggered when we are ready to send a row in
CopySendEndOfRow(), which is the same code path as when sending a row
to a frontend or a pipe/file.

A small test module, test_copy_callbacks, is added to provide some
coverage for this facility.

Author: Bilva Sanaba, Nathan Bossart
Discussion: https://postgr.es/m/253C21D1-FCEB-41D9-A2AF-E6517015B7D7@amazon.com

14 files changed:
src/backend/commands/copy.c
src/backend/commands/copyto.c
src/include/commands/copy.h
src/test/modules/Makefile
src/test/modules/meson.build
src/test/modules/test_copy_callbacks/.gitignore [new file with mode: 0644]
src/test/modules/test_copy_callbacks/Makefile [new file with mode: 0644]
src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out [new file with mode: 0644]
src/test/modules/test_copy_callbacks/meson.build [new file with mode: 0644]
src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql [new file with mode: 0644]
src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql [new file with mode: 0644]
src/test/modules/test_copy_callbacks/test_copy_callbacks.c [new file with mode: 0644]
src/test/modules/test_copy_callbacks/test_copy_callbacks.control [new file with mode: 0644]
src/tools/pgindent/typedefs.list

index 49924e476afb17d7a5e3361a198f3a4915219fcb..db4c9dbc2313c861619d572bd88139405a79c29d 100644 (file)
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 
                cstate = BeginCopyTo(pstate, rel, query, relid,
                                                         stmt->filename, stmt->is_program,
-                                                        stmt->attlist, stmt->options);
+                                                        NULL, stmt->attlist, stmt->options);
                *processed = DoCopyTo(cstate);  /* copy from database to file */
                EndCopyTo(cstate);
        }
index fca29a9a1050009fb2835bcb6f2637a139527da1..2527e6605988eb40bd909e083a0208ac1e65a9c8 100644 (file)
@@ -51,6 +51,7 @@ typedef enum CopyDest
 {
        COPY_FILE,                                      /* to file (or a piped program) */
        COPY_FRONTEND,                          /* to frontend */
+       COPY_CALLBACK                           /* to callback function */
 } CopyDest;
 
 /*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
        List       *attnumlist;         /* integer list of attnums to copy */
        char       *filename;           /* filename, or NULL for STDOUT */
        bool            is_program;             /* is 'filename' a program to popen? */
+       copy_data_dest_cb data_dest_cb; /* function for writing data */
 
        CopyFormatOptions opts;
        Node       *whereClause;        /* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
                        /* Dump the accumulated row as one CopyData message */
                        (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
                        break;
+               case COPY_CALLBACK:
+                       cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+                       break;
        }
 
        /* Update the progress */
@@ -336,6 +341,17 @@ EndCopy(CopyToState cstate)
 
 /*
  * Setup CopyToState to read tuples from a table or a query for COPY TO.
+ *
+ * 'rel': Relation to be copied
+ * 'raw_query': Query whose results are to be copied
+ * 'queryRelId': OID of base relation to convert to a query (for RLS)
+ * 'filename': Name of server-local file to write, NULL for STDOUT
+ * 'is_program': true if 'filename' is program to execute
+ * 'data_dest_cb': Callback that processes the output data
+ * 'attnamelist': List of char *, columns to include. NIL selects all cols.
+ * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ *
+ * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
  */
 CopyToState
 BeginCopyTo(ParseState *pstate,
@@ -344,11 +360,12 @@ BeginCopyTo(ParseState *pstate,
                        Oid queryRelId,
                        const char *filename,
                        bool is_program,
+                       copy_data_dest_cb data_dest_cb,
                        List *attnamelist,
                        List *options)
 {
        CopyToState cstate;
-       bool            pipe = (filename == NULL);
+       bool            pipe = (filename == NULL && data_dest_cb == NULL);
        TupleDesc       tupDesc;
        int                     num_phys_attrs;
        MemoryContext oldcontext;
@@ -656,7 +673,13 @@ BeginCopyTo(ParseState *pstate,
 
        cstate->copy_dest = COPY_FILE;  /* default */
 
-       if (pipe)
+       if (data_dest_cb)
+       {
+               progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+               cstate->copy_dest = COPY_CALLBACK;
+               cstate->data_dest_cb = data_dest_cb;
+       }
+       else if (pipe)
        {
                progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
 
@@ -765,11 +788,13 @@ EndCopyTo(CopyToState cstate)
 
 /*
  * Copy from relation or query TO file.
+ *
+ * Returns the number of rows processed.
  */
 uint64
 DoCopyTo(CopyToState cstate)
 {
-       bool            pipe = (cstate->filename == NULL);
+       bool            pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
        bool            fe_copy = (pipe && whereToSendOutput == DestRemote);
        TupleDesc       tupDesc;
        int                     num_phys_attrs;
index 3f6677b1327ced7fdf5e0758cd259fceba120b2e..b77b9350058d81f68eaeca02e0e995d53d42e4a4 100644 (file)
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
 typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *data, int len);
 
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                                   int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
  */
 extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
                                                           Oid queryRelId, const char *filename, bool is_program,
-                                                          List *attnamelist, List *options);
+                                                          copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
 extern void EndCopyTo(CopyToState cstate);
 extern uint64 DoCopyTo(CopyToState cstate);
 extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
index 6c31c8707c24dbdfb813adec7043af31eb2aaa89..7b3f292965218fd0d42efa40366ff97b667f2ba2 100644 (file)
@@ -15,6 +15,7 @@ SUBDIRS = \
                  snapshot_too_old \
                  spgist_name_ops \
                  test_bloomfilter \
+                 test_copy_callbacks \
                  test_ddl_deparse \
                  test_extensions \
                  test_ginpostinglist \
index a80e6e2ce29b71721d83851a2bddaf98ad03c0f2..c2e5f5ffd5aab814d7fcf267de3bd8469b5bd4b4 100644 (file)
@@ -9,6 +9,7 @@ subdir('snapshot_too_old')
 subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
+subdir('test_copy_callbacks')
 subdir('test_ddl_deparse')
 subdir('test_extensions')
 subdir('test_ginpostinglist')
diff --git a/src/test/modules/test_copy_callbacks/.gitignore b/src/test/modules/test_copy_callbacks/.gitignore
new file mode 100644 (file)
index 0000000..5dcb3ff
--- /dev/null
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_callbacks/Makefile b/src/test/modules/test_copy_callbacks/Makefile
new file mode 100644 (file)
index 0000000..82e8901
--- /dev/null
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_callbacks/Makefile
+
+MODULE_big = test_copy_callbacks
+OBJS = \
+       $(WIN32RES) \
+       test_copy_callbacks.o
+PGFILEDESC = "test_copy_callbacks - test COPY callbacks"
+
+EXTENSION = test_copy_callbacks
+DATA = test_copy_callbacks--1.0.sql
+
+REGRESS = test_copy_callbacks
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_callbacks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out
new file mode 100644 (file)
index 0000000..93ebeef
--- /dev/null
@@ -0,0 +1,13 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
+NOTICE:  COPY TO callback called with data "1  2       3" and length 5
+NOTICE:  COPY TO callback called with data "12 34      56" and length 8
+NOTICE:  COPY TO callback called with data "123        456     789" and length 11
+NOTICE:  COPY TO callback has processed 3 rows
+ test_copy_to_callback 
+-----------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/test_copy_callbacks/meson.build b/src/test/modules/test_copy_callbacks/meson.build
new file mode 100644 (file)
index 0000000..43eca8e
--- /dev/null
@@ -0,0 +1,34 @@
+# FIXME: prevent install during main install, but not during test :/
+
+test_copy_callbacks_sources = files(
+  'test_copy_callbacks.c',
+)
+
+if host_system == 'windows'
+  test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_copy_callbacks',
+    '--FILEDESC', 'test_copy_callbacks - test COPY callbacks',])
+endif
+
+test_copy_callbacks = shared_module('test_copy_callbacks',
+  test_copy_callbacks_sources,
+  kwargs: pg_mod_args,
+)
+testprep_targets += test_copy_callbacks
+
+install_data(
+  'test_copy_callbacks.control',
+  'test_copy_callbacks--1.0.sql',
+  kwargs: contrib_data_args,
+)
+
+tests += {
+  'name': 'test_copy_callbacks',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_copy_callbacks',
+    ],
+  },
+}
diff --git a/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql
new file mode 100644 (file)
index 0000000..2deffba
--- /dev/null
@@ -0,0 +1,4 @@
+CREATE EXTENSION test_copy_callbacks;
+CREATE TABLE public.test (a INT, b INT, c INT);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql
new file mode 100644 (file)
index 0000000..215cf3f
--- /dev/null
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit
+
+CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass)
+       RETURNS pg_catalog.void
+       AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c
new file mode 100644 (file)
index 0000000..ecdbe4e
--- /dev/null
@@ -0,0 +1,65 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_callbacks.c
+ *             Code for testing COPY callbacks.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *             src/test/modules/test_copy_callbacks/test_copy_callbacks.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "commands/copy.h"
+#include "fmgr.h"
+#include "utils/rel.h"
+
+PG_MODULE_MAGIC;
+
+/*
+ * COPY TO destination callback: report each row handed over by COPY TO
+ * as a NOTICE, along with its length in bytes.
+ *
+ * The data is printed with %s, so this assumes a NUL-terminated text-format
+ * row (which is what the regression test of this module exercises).
+ */
+static void
+to_cb(void *data, int len)
+{
+       ereport(NOTICE,
+                       (errmsg("COPY TO callback called with data \"%s\" and length %d",
+                                       (char *) data, len)));
+}
+
+/*
+ * SQL-callable test function: run COPY TO on the given relation, sending
+ * every row to to_cb() instead of a file, program or frontend, then report
+ * the number of rows processed.
+ */
+PG_FUNCTION_INFO_V1(test_copy_to_callback);
+Datum
+test_copy_to_callback(PG_FUNCTION_ARGS)
+{
+       Relation        rel = table_open(PG_GETARG_OID(0), AccessShareLock);
+       CopyToState cstate;
+       int64           processed;
+
+       /* No filename and no program: the callback is the only destination */
+       cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, false,
+                                                to_cb, NIL, NIL);
+       processed = DoCopyTo(cstate);
+       EndCopyTo(cstate);
+
+       ereport(NOTICE, (errmsg("COPY TO callback has processed %lld rows",
+                                                       (long long) processed)));
+
+       /* NoLock: the lock taken by table_open() is not released here */
+       table_close(rel, NoLock);
+
+       PG_RETURN_VOID();
+}
diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.control b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control
new file mode 100644 (file)
index 0000000..b7ce3f1
--- /dev/null
@@ -0,0 +1,4 @@
+comment = 'Test code for COPY callbacks'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_callbacks'
+relocatable = true
index 97c9bc1861575959df3c560fd5aba77b546f02bd..d9b839c9799ab7f076fed36aa8d1b83d786b3999 100644 (file)
@@ -3177,6 +3177,7 @@ compare_context
 config_var_value
 contain_aggs_of_level_context
 convert_testexpr_context
+copy_data_dest_cb
 copy_data_source_cb
 core_YYSTYPE
 core_yy_extra_type