#include "utils/dsa.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
+#include "pgstat.h"
/*
* Magic numbers for parallel executor communication. We use constants
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
int instrumentation_len = 0;
int instrument_offset = 0;
Size dsa_minsize = dsa_minimum_size();
+ char *query_string;
+ int query_len;
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
* for the various things we need to store.
*/
+ /* Estimate space for query text. */
+ query_len = strlen(estate->es_sourceText);
+ shm_toc_estimate_chunk(&pcxt->estimator, query_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Estimate space for serialized PlannedStmt. */
pstmt_len = strlen(pstmt_data) + 1;
shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
* asked for has been allocated or initialized yet, though, so do that.
*/
+ /* Store query string */
+ query_string = shm_toc_allocate(pcxt->toc, query_len);
+ memcpy(query_string, estate->es_sourceText, query_len);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
+
/* Store serialized PlannedStmt. */
pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
memcpy(pstmt_space, pstmt_data, pstmt_len);
char *paramspace;
PlannedStmt *pstmt;
ParamListInfo paramLI;
+ char *queryString;
+
+ /* Get the query string from shared memory */
+ queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT);
/* Reconstruct leader-supplied PlannedStmt. */
pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
* revising this someday.
*/
return CreateQueryDesc(pstmt,
- "<parallel query>",
+ queryString,
GetActiveSnapshot(), InvalidSnapshot,
receiver, paramLI, instrument_options);
}
instrument_options = instrumentation->instrument_options;
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
+ /* Setting debug_query_string for individual workers */
+ debug_query_string = queryDesc->sourceText;
+
+ /* Report workers' query for monitoring purposes */
+ pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
/* Prepare to track buffer usage during query execution. */
InstrStartParallelQuery();