diff options
| author | Michael Paquier | 2021-03-09 05:21:03 +0000 |
|---|---|---|
| committer | Michael Paquier | 2021-03-09 05:21:03 +0000 |
| commit | 9d2d45700928d49212fb7ed140feeaebe3a6014f (patch) | |
| tree | 88c79348c8b63167cae1cf4636185cc48e24785b /src/backend/commands | |
| parent | f9264d1524baa19e4a0528f033681ef16f61b137 (diff) | |
Add support for more progress reporting in COPY
The command (TO or FROM), its type (file, pipe, program or callback),
and the number of tuples excluded by a WHERE clause in COPY FROM are
added to the progress reporting already available.
The column "lines_processed" is renamed to "tuples_processed" to
disambiguate the meaning of this column in the cases of CSV and BINARY
COPY and to be more consistent with the other catalog progress views.
Bump catalog version, again.
Author: Matthias van de Meent
Reviewed-by: Michael Paquier, Justin Pryzby, Bharath Rupireddy, Josef
Šimánek, Tomas Vondra
Discussion: https://postgr.es/m/CAEze2WiOcgdH4aQA8NtZq-4dgvnJzp8PohdeKchPkhMY-jWZXA@mail.gmail.com
Diffstat (limited to 'src/backend/commands')
| -rw-r--r-- | src/backend/commands/copyfrom.c | 34 | ||||
| -rw-r--r-- | src/backend/commands/copyto.c | 28 |
2 files changed, 54 insertions, 8 deletions
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index f05e2d23476..2ed696d429b 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate) BulkInsertState bistate = NULL; CopyInsertMethod insertMethod; CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ - uint64 processed = 0; + int64 processed = 0; + int64 excluded = 0; bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; @@ -869,7 +870,15 @@ CopyFrom(CopyFromState cstate) econtext->ecxt_scantuple = myslot; /* Skip items that don't match COPY's WHERE clause */ if (!ExecQual(cstate->qualexpr, econtext)) + { + /* + * Report that this tuple was filtered out by the WHERE + * clause. + */ + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, + ++excluded); continue; + } } /* Determine the partition to insert the tuple into */ @@ -1104,10 +1113,11 @@ CopyFrom(CopyFromState cstate) /* * We count only tuples not suppressed by a BEFORE INSERT trigger * or FDW; this is the same definition used by nodeModifyTable.c - * for counting tuples inserted by an INSERT command. Update + * for counting tuples inserted by an INSERT command. Update * progress of the COPY command as well. */ - pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + ++processed); } } @@ -1193,6 +1203,16 @@ BeginCopyFrom(ParseState *pstate, ExprState **defexprs; MemoryContext oldcontext; bool volatile_defexprs; + const int progress_cols[] = { + PROGRESS_COPY_COMMAND, + PROGRESS_COPY_TYPE, + PROGRESS_COPY_BYTES_TOTAL + }; + int64 progress_vals[] = { + PROGRESS_COPY_COMMAND_FROM, + 0, + 0 + }; /* Allocate workspace and zero all fields */ cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData)); @@ -1430,11 +1450,13 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { + progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; cstate->copy_src = COPY_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) { + progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput == DestRemote) ReceiveCopyBegin(cstate); @@ -1447,6 +1469,7 @@ BeginCopyFrom(ParseState *pstate, if (cstate->is_program) { + progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R); if (cstate->copy_file == NULL) ereport(ERROR, @@ -1458,6 +1481,7 @@ BeginCopyFrom(ParseState *pstate, { struct stat st; + progress_vals[1] = PROGRESS_COPY_TYPE_FILE; cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); if (cstate->copy_file == NULL) { @@ -1484,10 +1508,12 @@ BeginCopyFrom(ParseState *pstate, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is a directory", cstate->filename))); - pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size); + progress_vals[2] = st.st_size; } } + pgstat_progress_update_multi_param(3, progress_cols, progress_vals); + if (cstate->opts.binary) { /* Read and verify binary header */ diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 46155015cfd..7257a54e935 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -353,6 +353,14 @@ BeginCopyTo(ParseState *pstate, TupleDesc tupDesc; int num_phys_attrs; MemoryContext oldcontext; + const int progress_cols[] = { + PROGRESS_COPY_COMMAND, + PROGRESS_COPY_TYPE + }; + int64 progress_vals[] = { + PROGRESS_COPY_COMMAND_TO, + 0 + }; if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) { @@ -659,6 +667,8 @@ BeginCopyTo(ParseState *pstate, if (pipe) { + progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; + Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; @@ -670,6 +680,7 @@ BeginCopyTo(ParseState *pstate, if (is_program) { + progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W); if (cstate->copy_file == NULL) ereport(ERROR, @@ -682,6 +693,8 @@ BeginCopyTo(ParseState *pstate, mode_t oumask; /* Pre-existing umask value */ struct stat st; + progress_vals[1] = PROGRESS_COPY_TYPE_FILE; + /* * Prevent write to relative path ... too easy to shoot oneself in * the foot by overwriting a database file ... @@ -731,6 +744,8 @@ BeginCopyTo(ParseState *pstate, /* initialize progress */ pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); + pgstat_progress_update_multi_param(2, progress_cols, progress_vals); + cstate->bytes_processed = 0; MemoryContextSwitchTo(oldcontext); @@ -881,8 +896,12 @@ DoCopyTo(CopyToState cstate) /* Format and send the data */ CopyOneRowTo(cstate, slot); - /* Increment amount of processed tuples and update the progress */ - pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed); + /* + * Increment the number of processed tuples, and report the + * progress. + */ + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + ++processed); } ExecDropSingleTupleTableSlot(slot); @@ -1251,8 +1270,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* Send the data */ CopyOneRowTo(cstate, slot); - /* Increment amount of processed tuples and update the progress */ - pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed); + /* Increment the number of processed tuples, and report the progress */ + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + ++myState->processed); return true; } |
