<structfield>relid</structfield> <type>oid</type>
</para>
<para>
- OID of the table on which the <command>COPY</command> command is executed.
- It is set to 0 if copying from a <command>SELECT</command> query.
+ OID of the table on which the <command>COPY</command> command is
+ executed. It is set to <literal>0</literal> if copying from a
+ <command>SELECT</command> query.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>command</structfield> <type>text</type>
+ </para>
+ <para>
+ The command that is running: <literal>COPY FROM</literal>, or
+ <literal>COPY TO</literal>.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>type</structfield> <type>text</type>
+ </para>
+ <para>
+ The io type that the data is read from or written to:
+ <literal>FILE</literal>, <literal>PROGRAM</literal>,
+ <literal>PIPE</literal> (for <command>COPY FROM STDIN</command> and
+ <command>COPY TO STDOUT</command>), or <literal>CALLBACK</literal>
+ (used for example during the initial table synchronization in
+ logical replication).
</para></entry>
</row>
</para>
<para>
Size of source file for <command>COPY FROM</command> command in bytes.
- It is set to 0 if not available.
+ It is set to <literal>0</literal> if not available.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>tuples_processed</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of tuples already processed by <command>COPY</command> command.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
- <structfield>lines_processed</structfield> <type>bigint</type>
+ <structfield>tuples_excluded</structfield> <type>bigint</type>
</para>
<para>
- Number of lines already processed by <command>COPY</command> command.
+ Number of tuples not processed because they were excluded by the
+ <command>WHERE</command> clause of the <command>COPY</command> command.
</para></entry>
</row>
</tbody>
SELECT
S.pid AS pid, S.datid AS datid, D.datname AS datname,
S.relid AS relid,
+ CASE S.param5 WHEN 1 THEN 'COPY FROM'
+ WHEN 2 THEN 'COPY TO'
+ END AS command,
+ CASE S.param6 WHEN 1 THEN 'FILE'
+ WHEN 2 THEN 'PROGRAM'
+ WHEN 3 THEN 'PIPE'
+ WHEN 4 THEN 'CALLBACK'
+ END AS "type",
S.param1 AS bytes_processed,
S.param2 AS bytes_total,
- S.param3 AS lines_processed
+ S.param3 AS tuples_processed,
+ S.param4 AS tuples_excluded
FROM pg_stat_get_progress_info('COPY') AS S
LEFT JOIN pg_database D ON S.datid = D.oid;
BulkInsertState bistate = NULL;
CopyInsertMethod insertMethod;
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
- uint64 processed = 0;
+ int64 processed = 0;
+ int64 excluded = 0;
bool has_before_insert_row_trig;
bool has_instead_insert_row_trig;
bool leafpart_use_multi_insert = false;
econtext->ecxt_scantuple = myslot;
/* Skip items that don't match COPY's WHERE clause */
if (!ExecQual(cstate->qualexpr, econtext))
+ {
+ /*
+ * Report that this tuple was filtered out by the WHERE
+ * clause.
+ */
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
+ ++excluded);
continue;
+ }
}
/* Determine the partition to insert the tuple into */
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
- * for counting tuples inserted by an INSERT command. Update
+ * for counting tuples inserted by an INSERT command. Update
* progress of the COPY command as well.
*/
- pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ ++processed);
}
}
ExprState **defexprs;
MemoryContext oldcontext;
bool volatile_defexprs;
+ const int progress_cols[] = {
+ PROGRESS_COPY_COMMAND,
+ PROGRESS_COPY_TYPE,
+ PROGRESS_COPY_BYTES_TOTAL
+ };
+ int64 progress_vals[] = {
+ PROGRESS_COPY_COMMAND_FROM,
+ 0,
+ 0
+ };
/* Allocate workspace and zero all fields */
cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
if (data_source_cb)
{
+ progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
cstate->copy_src = COPY_CALLBACK;
cstate->data_source_cb = data_source_cb;
}
else if (pipe)
{
+ progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
if (cstate->is_program)
{
+ progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
{
struct stat st;
+ progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
{
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
- pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
+ progress_vals[2] = st.st_size;
}
}
+ pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
+
if (cstate->opts.binary)
{
/* Read and verify binary header */
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
+ const int progress_cols[] = {
+ PROGRESS_COPY_COMMAND,
+ PROGRESS_COPY_TYPE
+ };
+ int64 progress_vals[] = {
+ PROGRESS_COPY_COMMAND_TO,
+ 0
+ };
if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
{
if (pipe)
{
+ progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
+
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
if (is_program)
{
+ progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
if (cstate->copy_file == NULL)
ereport(ERROR,
mode_t oumask; /* Pre-existing umask value */
struct stat st;
+ progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
+
/*
* Prevent write to relative path ... too easy to shoot oneself in
* the foot by overwriting a database file ...
/* initialize progress */
pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+ pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
+
cstate->bytes_processed = 0;
MemoryContextSwitchTo(oldcontext);
/* Format and send the data */
CopyOneRowTo(cstate, slot);
- /* Increment amount of processed tuples and update the progress */
- pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+ /*
+ * Increment the number of processed tuples, and report the
+ * progress.
+ */
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ ++processed);
}
ExecDropSingleTupleTableSlot(slot);
/* Send the data */
CopyOneRowTo(cstate, slot);
- /* Increment amount of processed tuples and update the progress */
- pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
+ /* Increment the number of processed tuples, and report the progress */
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ ++myState->processed);
return true;
}
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202103091
+#define CATALOG_VERSION_NO 202103092
#endif
#define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4
#define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5
-/* Commands of PROGRESS_COPY */
+/* Progress parameters for PROGRESS_COPY */
#define PROGRESS_COPY_BYTES_PROCESSED 0
#define PROGRESS_COPY_BYTES_TOTAL 1
-#define PROGRESS_COPY_LINES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_EXCLUDED 3
+#define PROGRESS_COPY_COMMAND 4
+#define PROGRESS_COPY_TYPE 5
+
+/* Commands of COPY (as advertised via PROGRESS_COPY_COMMAND) */
+#define PROGRESS_COPY_COMMAND_FROM 1
+#define PROGRESS_COPY_COMMAND_TO 2
+
+/* Types of COPY commands (as advertised via PROGRESS_COPY_TYPE) */
+#define PROGRESS_COPY_TYPE_FILE 1
+#define PROGRESS_COPY_TYPE_PROGRAM 2
+#define PROGRESS_COPY_TYPE_PIPE 3
+#define PROGRESS_COPY_TYPE_CALLBACK 4
#endif
s.datid,
d.datname,
s.relid,
+ CASE s.param5
+ WHEN 1 THEN 'COPY FROM'::text
+ WHEN 2 THEN 'COPY TO'::text
+ ELSE NULL::text
+ END AS command,
+ CASE s.param6
+ WHEN 1 THEN 'FILE'::text
+ WHEN 2 THEN 'PROGRAM'::text
+ WHEN 3 THEN 'PIPE'::text
+ WHEN 4 THEN 'CALLBACK'::text
+ ELSE NULL::text
+ END AS type,
s.param1 AS bytes_processed,
s.param2 AS bytes_total,
- s.param3 AS lines_processed
+ s.param3 AS tuples_processed,
+ s.param4 AS tuples_excluded
FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_create_index| SELECT s.pid,