Add a new wait state and use it when sending data in the apply worker.
authorAmit Kapila <akapila@postgresql.org>
Thu, 16 Feb 2023 02:16:31 +0000 (07:46 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 16 Feb 2023 02:16:31 +0000 (07:46 +0530)
d9d7fe68d3 made use of an existing wait event when sending data from the
apply worker, but we should have invented a new wait event since this is a
new place to wait.

This patch corrects the mistake by using a new wait event
"LogicalApplySendData".

Author: Hou Zhijie
Reviewed-by: Peter Smith
Discussion: https://postgr.es/m/CA+TgmobWzbr9H3yN3dLVckviEZKemPwd+XyCFKEgyZQZhgP66Q@mail.gmail.com

doc/src/sgml/monitoring.sgml
src/backend/replication/logical/applyparallelworker.c
src/backend/utils/activity/wait_event.c
src/include/utils/wait_event.h

index dca50707ad4c97fd3ec2590e72f612a5a2de04c5..b0b997f092fb2f0d171aab65626692f897d0d61f 100644 (file)
@@ -1740,6 +1740,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting for other Parallel Hash participants to finish inserting
        tuples into new buckets.</entry>
      </row>
+     <row>
+      <entry><literal>LogicalApplySendData</literal></entry>
+      <entry>Waiting for a logical replication leader apply process to send
+       data to a parallel apply process.</entry>
+     </row>
      <row>
       <entry><literal>LogicalParallelApplyStateChange</literal></entry>
       <entry>Waiting for a logical replication parallel apply process to change
index da437e0bc34e0c3270da0327a314a6bad787520a..45186837795e63f1c53926dedc571cdf02bbdad4 100644 (file)
@@ -1181,7 +1181,8 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
        /* Wait before retrying. */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-                      SHM_SEND_RETRY_INTERVAL_MS, WAIT_EVENT_MQ_SEND);
+                      SHM_SEND_RETRY_INTERVAL_MS,
+                      WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
 
        if (rc & WL_LATCH_SET)
        {
index 6e4599278c367cc8c6aae066642cc3616b6f573e..cb99cc633912a8a0ece53043099074dfe64cdff3 100644 (file)
@@ -391,6 +391,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
        case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT:
            event_name = "HashGrowBucketsReinsert";
            break;
+       case WAIT_EVENT_LOGICAL_APPLY_SEND_DATA:
+           event_name = "LogicalApplySendData";
+           break;
        case WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE:
            event_name = "LogicalParallelApplyStateChange";
            break;
index 6cacd6edaf0ad4d9ff2fa6263ff5524e6b83ac64..9ab23e1c4ab64ef84ac627b22db20d6b3d953836 100644 (file)
@@ -106,6 +106,7 @@ typedef enum
    WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE,
    WAIT_EVENT_HASH_GROW_BUCKETS_ELECT,
    WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT,
+   WAIT_EVENT_LOGICAL_APPLY_SEND_DATA,
    WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE,
    WAIT_EVENT_LOGICAL_SYNC_DATA,
    WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,