Pass the source text for a parallel query to the workers.
authorRobert Haas <rhaas@postgresql.org>
Wed, 22 Feb 2017 06:45:17 +0000 (12:15 +0530)
committerRobert Haas <rhaas@postgresql.org>
Wed, 22 Feb 2017 06:48:29 +0000 (12:18 +0530)
With this change, you can see the query that a parallel worker is
executing in pg_stat_activity, and if the worker crashes you can
see what query it was executing when it crashed.

Rafia Sabih, reviewed by Kuntal Ghosh and Amit Kapila and slightly
revised by me.

src/backend/executor/execMain.c
src/backend/executor/execParallel.c
src/backend/executor/execUtils.c
src/include/nodes/execnodes.h

index a66639178a5b5f4d9aa86de423a9bbf256e93415..3f76a407d7a2493a05c219c7bebf819be4190a9c 100644 (file)
@@ -190,6 +190,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
        estate->es_param_exec_vals = (ParamExecData *)
            palloc0(queryDesc->plannedstmt->nParamExec * sizeof(ParamExecData));
 
+   estate->es_sourceText = queryDesc->sourceText;
+
    /*
     * If non-read-only query, set the command ID to mark output tuples with
     */
index 646df087f9fcbaa48ff8fbe677f544e5877c6519..de0e2bafe605b9117f57b66d5e304599d8b9ea6c 100644 (file)
@@ -39,6 +39,7 @@
 #include "utils/dsa.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
+#include "pgstat.h"
 
 /*
  * Magic numbers for parallel executor communication.  We use constants
@@ -51,6 +52,7 @@
 #define PARALLEL_KEY_TUPLE_QUEUE       UINT64CONST(0xE000000000000004)
 #define PARALLEL_KEY_INSTRUMENTATION   UINT64CONST(0xE000000000000005)
 #define PARALLEL_KEY_DSA               UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_QUERY_TEXT        UINT64CONST(0xE000000000000007)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE      65536
 
@@ -368,6 +370,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
    int         instrumentation_len = 0;
    int         instrument_offset = 0;
    Size        dsa_minsize = dsa_minimum_size();
+   char       *query_string;
+   int         query_len;
 
    /* Allocate object for return value. */
    pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -387,6 +391,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
     * for the various things we need to store.
     */
 
+   /* Estimate space for query text. */
+   query_len = strlen(estate->es_sourceText);
+   shm_toc_estimate_chunk(&pcxt->estimator, query_len);
+   shm_toc_estimate_keys(&pcxt->estimator, 1);
+
    /* Estimate space for serialized PlannedStmt. */
    pstmt_len = strlen(pstmt_data) + 1;
    shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
@@ -451,6 +460,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
     * asked for has been allocated or initialized yet, though, so do that.
     */
 
+   /* Store query string */
+   query_string = shm_toc_allocate(pcxt->toc, query_len);
+   memcpy(query_string, estate->es_sourceText, query_len);
+   shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
+
    /* Store serialized PlannedStmt. */
    pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
    memcpy(pstmt_space, pstmt_data, pstmt_len);
@@ -661,6 +675,10 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
    char       *paramspace;
    PlannedStmt *pstmt;
    ParamListInfo paramLI;
+   char       *queryString;
+
+   /* Get the query string from shared memory */
+   queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT);
 
    /* Reconstruct leader-supplied PlannedStmt. */
    pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
@@ -679,7 +697,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
     * revising this someday.
     */
    return CreateQueryDesc(pstmt,
-                          "<parallel query>",
+                          queryString,
                           GetActiveSnapshot(), InvalidSnapshot,
                           receiver, paramLI, instrument_options);
 }
@@ -799,6 +817,12 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        instrument_options = instrumentation->instrument_options;
    queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
 
+   /* Setting debug_query_string for individual workers */
+   debug_query_string = queryDesc->sourceText;
+
+   /* Report workers' query for monitoring purposes */
+   pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
    /* Prepare to track buffer usage during query execution. */
    InstrStartParallelQuery();
 
index e49feff6c0fac17794a094e3b73adc473a1cae20..3d6a3801c060179908c6d2069558db0344d4e105 100644 (file)
@@ -139,6 +139,7 @@ CreateExecutorState(void)
    estate->es_epqTuple = NULL;
    estate->es_epqTupleSet = NULL;
    estate->es_epqScanDone = NULL;
+   estate->es_sourceText = NULL;
 
    /*
     * Return the executor state structure
index 1c1cb80a63648e1332236e0d83ae1524c34ec90d..6332ea0620c928c32414f7f487fc90a6dbf6d51b 100644 (file)
@@ -371,6 +371,7 @@ typedef struct EState
    Snapshot    es_crosscheck_snapshot; /* crosscheck time qual for RI */
    List       *es_range_table; /* List of RangeTblEntry */
    PlannedStmt *es_plannedstmt;    /* link to top of plan tree */
+   const char *es_sourceText;  /* Source text from QueryDesc */
 
    JunkFilter *es_junkFilter;  /* top-level junk filter, if any */