Add new parallel message type to progress reporting.
authorMasahiko Sawada <msawada@postgresql.org>
Tue, 11 Jul 2023 03:33:54 +0000 (12:33 +0900)
committerMasahiko Sawada <msawada@postgresql.org>
Tue, 11 Jul 2023 03:33:54 +0000 (12:33 +0900)
This commit adds a new type of parallel message 'P' to allow a
parallel worker to poke at a leader to update the progress.

Currently it supports only incremental progress reporting but it's
possible to allow for supporting of other backend progress APIs in the
future.

There are no users of this new message type as of this commit. That
will follow in future commits.

Idea from Andres Freund.

Author: Sami Imseih
Reviewed by: Michael Paquier, Masahiko Sawada
Discussion: https://www.postgresql.org/message-id/flat/5478DFCD-2333-401A-B2F0-0D186AB09228@amazon.com

src/backend/access/transam/parallel.c
src/backend/utils/activity/backend_progress.c
src/include/utils/backend_progress.h

index 2b8bc2f58dd8e152f6085c92e51e415aa9cf800d..2bd04bd177396c9561b2e0cf2d569ce790befaab 100644 (file)
@@ -24,6 +24,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
@@ -1199,6 +1200,23 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
                break;
            }
 
+       case 'P':               /* Parallel progress reporting */
+           {
+               /*
+                * Only incremental progress reporting is currently supported.
+                * However, it's possible to add more fields to the message to
+                * allow for handling of other backend progress APIs.
+                */
+               int         index = pq_getmsgint(msg, 4);
+               int64       incr = pq_getmsgint64(msg);
+
+               pq_getmsgend(msg);
+
+               pgstat_progress_incr_param(index, incr);
+
+               break;
+           }
+
        case 'X':               /* Terminate, indicating clean exit */
            {
                shm_mq_detach(pcxt->worker[i].error_mqh);
index fb48eafef9a545daf38fcc65ef3df4b91e2eb901..67447ef03ab574a6895fa21051323619d085ba05 100644 (file)
@@ -10,6 +10,8 @@
  */
 #include "postgres.h"
 
+#include "access/parallel.h"
+#include "libpq/pqformat.h"
 #include "port/atomics.h"      /* for memory barriers */
 #include "utils/backend_progress.h"
 #include "utils/backend_status.h"
@@ -79,6 +81,36 @@ pgstat_progress_incr_param(int index, int64 incr)
    PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*-----------
+ * pgstat_progress_parallel_incr_param() -
+ *
+ * A variant of pgstat_progress_incr_param to allow a worker to poke at
+ * a leader to do an incremental progress update.
+ *-----------
+ */
+void
+pgstat_progress_parallel_incr_param(int index, int64 incr)
+{
+   /*
+    * Parallel workers notify a leader through a 'P' protocol message to
+    * update progress, passing the progress index and incremented value.
+    * Leaders can just call pgstat_progress_incr_param directly.
+    */
+   if (IsParallelWorker())
+   {
+       static StringInfoData progress_message;
+
+       initStringInfo(&progress_message);
+
+       pq_beginmessage(&progress_message, 'P');
+       pq_sendint32(&progress_message, index);
+       pq_sendint64(&progress_message, incr);
+       pq_endmessage(&progress_message);
+   }
+   else
+       pgstat_progress_incr_param(index, incr);
+}
+
 /*-----------
  * pgstat_progress_update_multi_param() -
  *
index a84752ade99a1bb74ad12690926311e09ae1991c..70dea55fc003ec6c377c110e991981992f69eefb 100644 (file)
@@ -37,6 +37,7 @@ extern void pgstat_progress_start_command(ProgressCommandType cmdtype,
                                          Oid relid);
 extern void pgstat_progress_update_param(int index, int64 val);
 extern void pgstat_progress_incr_param(int index, int64 incr);
+extern void pgstat_progress_parallel_incr_param(int index, int64 incr);
 extern void pgstat_progress_update_multi_param(int nparam, const int *index,
                                               const int64 *val);
 extern void pgstat_progress_end_command(void);