author     Robert Haas  2016-05-09 15:48:11 +0000
committer  Robert Haas  2016-09-20 16:39:57 +0000
commit     dfd040ae58eda766c7e34059f84e42dd3112cc1f
tree       8886c2a2f7e9126baf914190c212f6309d2e6ff7
parent     771673b608b1ed259a175ecf03e826aaa74808ae

Lightweight framework for waiting for events. (async)
-rw-r--r--  src/backend/executor/Makefile       |   4
-rw-r--r--  src/backend/executor/execAsync.c    | 256
-rw-r--r--  src/backend/executor/execProcnode.c |  82
-rw-r--r--  src/include/executor/execAsync.h    |  23
-rw-r--r--  src/include/executor/executor.h     |   2
-rw-r--r--  src/include/nodes/execnodes.h       |  10
6 files changed, 352 insertions, 25 deletions
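
Caller-side contract, in brief: ExecExecuteNode starts a node without insisting on a result, and ExecAsyncWaitForNode blocks until that node's result_ready flag is set. A minimal sketch of a consumer, assuming only the functions this patch adds (the wrapper name DemoFetchTuple is illustrative; it mirrors what the rewritten ExecProcNode below does):

#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/executor.h"

/*
 * Sketch only: fetch a tuple from a child, waiting if the child went
 * asynchronous.  This is essentially the rewritten ExecProcNode.
 */
static TupleTableSlot *
DemoFetchTuple(PlanState *child)
{
	/* Start the child; an async-capable node may return with no result. */
	ExecExecuteNode(child);

	/* Block until the child's result is ready, if it isn't already. */
	if (!child->result_ready)
		ExecAsyncWaitForNode(child);

	/* result is either NULL or a TupleTableSlot. */
	return (TupleTableSlot *) child->result;
}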
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 51edd4c5e7..0675b01356 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -12,8 +12,8 @@ subdir = src/backend/executor
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
- execMain.o execParallel.o execProcnode.o execQual.o \
+OBJS = execAmi.o execAsync.o execCurrent.o execGrouping.o execIndexing.o \
+ execJunk.o execMain.o execParallel.o execProcnode.o execQual.o \
execScan.o execTuples.o \
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
nodeBitmapAnd.o nodeBitmapOr.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000000..20601fa04e
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,256 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * This file contains routines that are intended to support asynchronous
+ * execution; that is, suspending an executor node until some external
+ * event occurs, or until one of its child nodes produces a tuple.
+ * This allows the executor to avoid blocking on a single external event,
+ * such as a file descriptor waiting on I/O, or a parallel worker which
+ * must complete work elsewhere in the plan tree, when there might at the
+ * same time be useful computation that could be accomplished in some
+ * other part of the plan tree.
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/executor.h"
+#include "storage/latch.h"
+
+#define EVENT_BUFFER_SIZE 16
+
+static void ExecAsyncConfigureWait(PlanState *planstate, bool reinit);
+
+void
+ExecAsyncWaitForNode(PlanState *planstate)
+{
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ PlanState *callbacks[EVENT_BUFFER_SIZE];
+ int ncallbacks = 0;
+ EState *estate = planstate->state;
+
+ while (!planstate->result_ready)
+ {
+ bool reinit = (estate->es_wait_event_set == NULL);
+ int n;
+ int noccurred;
+
+ if (reinit)
+ {
+ /*
+ * Allow for a few extra events without reinitializing. It
+ * doesn't seem worth the complexity of doing anything very
+ * aggressive here, because plans that depend on massive numbers
+ * of external FDs are likely to run afoul of kernel limits anyway.
+ */
+ estate->es_max_async_events = estate->es_total_async_events + 16;
+ estate->es_wait_event_set =
+ CreateWaitEventSet(estate->es_query_cxt,
+ estate->es_max_async_events);
+ }
+
+ /* Give each waiting node a chance to add or modify events. */
+ for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+ ExecAsyncConfigureWait(estate->es_waiting_nodes[n], reinit);
+
+ /* Wait for at least one event to occur. */
+ noccurred = WaitEventSetWait(estate->es_wait_event_set, -1,
+ occurred_event, EVENT_BUFFER_SIZE);
+ Assert(noccurred > 0);
+
+ /*
+ * Loop over the occurred events and make a list of nodes that need
+ * a callback. The waiting nodes should have registered their wait
+ * events with user_data pointing back to the node.
+ */
+ for (n = 0; n < noccurred; ++n)
+ {
+ WaitEvent *w = &occurred_event[n];
+ PlanState *ps = w->user_data;
+
+ callbacks[ncallbacks++] = ps;
+ }
+
+ /*
+ * Initially, this loop will call the node-type-specific function for
+ * each node for which an event occurred. If any of those nodes
+ * produce a result, its parent enters the set of nodes that are
+ * pending for a callback. In this way, when a result becomes
+ * available in a leaf of the plan tree, it can bubble upwards towards
+ * the root as far as necessary.
+ */
+ while (ncallbacks > 0)
+ {
+ int i,
+ j;
+
+ /* Loop over all callbacks. */
+ for (i = 0; i < ncallbacks; ++i)
+ {
+ /* Skip if NULL. */
+ if (callbacks[i] == NULL)
+ continue;
+
+ /*
+ * Remove any duplicates. O(n) may not seem good, but it
+ * should hopefully be OK as long as EVENT_BUFFER_SIZE is
+ * not too large.
+ */
+ for (j = i + 1; j < ncallbacks; ++j)
+ if (callbacks[i] == callbacks[j])
+ callbacks[j] = NULL;
+
+ /* Dispatch to node-type-specific code. */
+ ExecDispatchNode(callbacks[i]);
+
+ /*
+ * If there's now a tuple ready, we must dispatch to the
+ * parent node; otherwise, there's nothing more to do.
+ */
+ if (callbacks[i]->result_ready)
+ callbacks[i] = callbacks[i]->parent;
+ else
+ callbacks[i] = NULL;
+ }
+
+ /* Squeeze out NULLs. */
+ for (i = 0, j = 0; j < ncallbacks; ++j)
+ if (callbacks[j] != NULL)
+ callbacks[i++] = callbacks[j];
+ ncallbacks = i;
+ }
+ }
+}
+
+/*
+ * An executor node should call this function to signal that it needs to wait
+ * on one or more events that can be registered on a WaitEventSet.  nevents
+ * should be the maximum number of events that it will wish to register.
+ * reinit should be true if the node can't reuse the WaitEventSet it most
+ * recently initialized, for example because it needs to drop a wait event
+ * from the set.
+ */
+void
+ExecAsyncNeedsWait(PlanState *planstate, int nevents, bool reinit)
+{
+ EState *estate = planstate->state;
+
+ Assert(nevents > 0); /* otherwise, use ExecAsyncDoesNotNeedWait */
+
+ /*
+ * If this node is not already present in the array of waiting nodes,
+ * then add it. If that array hasn't been allocated or is full, this may
+ * require (re)allocating it.
+ */
+ if (planstate->n_async_events == 0)
+ {
+ if (estate->es_num_waiting_nodes >= estate->es_max_waiting_nodes)
+ {
+ int newmax;
+
+ if (estate->es_max_waiting_nodes == 0)
+ {
+ newmax = 16;
+ estate->es_waiting_nodes =
+ MemoryContextAlloc(estate->es_query_cxt, newmax * sizeof(PlanState *));
+ }
+ else
+ {
+ newmax = estate->es_max_waiting_nodes * 2;
+ estate->es_waiting_nodes =
+ repalloc(estate->es_waiting_nodes,
+ newmax * sizeof(PlanState *));
+ }
+ estate->es_max_waiting_nodes = newmax;
+ }
+ estate->es_waiting_nodes[estate->es_num_waiting_nodes++] = planstate;
+ }
+
+ /* Adjust per-node and per-estate totals. */
+ estate->es_total_async_events -= planstate->n_async_events;
+ planstate->n_async_events = nevents;
+ estate->es_total_async_events += planstate->n_async_events;
+
+ /*
+ * If a WaitEventSet has already been created, we need to discard it and
+ * start again if the user passed reinit = true, or if the total number of
+ * required events exceeds the supported number.
+ */
+ if (estate->es_wait_event_set != NULL && (reinit ||
+ estate->es_total_async_events > estate->es_max_async_events))
+ {
+ FreeWaitEventSet(estate->es_wait_event_set);
+ estate->es_wait_event_set = NULL;
+ }
+}
+
+/*
+ * If an executor node no longer needs to wait, it should call this function
+ * to report that fact.
+ */
+void
+ExecAsyncDoesNotNeedWait(PlanState *planstate)
+{
+ int n;
+ EState *estate = planstate->state;
+
+ if (planstate->n_async_events <= 0)
+ return;
+
+ /*
+ * Remove the node from the list of waiting nodes. (Is a linear search
+ * going to be a problem here? I think probably not.)
+ */
+ for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+ {
+ if (estate->es_waiting_nodes[n] == planstate)
+ break;
+ }
+
+ /* We should always find ourselves in the array. */
+ Assert(n < estate->es_num_waiting_nodes);
+
+ /* Swap in the last entry, shrinking the array by one. */
+ estate->es_waiting_nodes[n] =
+ estate->es_waiting_nodes[--estate->es_num_waiting_nodes];
+
+ /* We no longer need any asynchronous events. */
+ estate->es_total_async_events -= planstate->n_async_events;
+ planstate->n_async_events = 0;
+
+ /*
+ * The next wait will need to rebuild the WaitEventSet, because whatever
+ * events we registered are gone now. It's probably OK that this code
+ * assumes we actually did register some events at one point, because we
+ * needed to wait at some point and we don't any more.
+ */
+ if (estate->es_wait_event_set != NULL)
+ {
+ FreeWaitEventSet(estate->es_wait_event_set);
+ estate->es_wait_event_set = NULL;
+ }
+}
+
+/*
+ * Give per-nodetype function a chance to register wait events.
+ */
+static void
+ExecAsyncConfigureWait(PlanState *planstate, bool reinit)
+{
+ switch (nodeTag(planstate))
+ {
+ /* XXX: Add calls to per-nodetype handlers here. */
+ default:
+ elog(ERROR, "unexpected node type: %d", (int) nodeTag(planstate));
+ }
+}
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 3f2ebff173..b7ac08eead 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -77,6 +77,7 @@
*/
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
@@ -368,24 +369,14 @@ ExecInitNode(Plan *node, EState *estate, PlanState *parent, int eflags)
/* ----------------------------------------------------------------
- * ExecProcNode
+ * ExecDispatchNode
*
- * Execute the given node to return a(nother) tuple.
+ * Invoke the given node's dispatch function.
* ----------------------------------------------------------------
*/
-TupleTableSlot *
-ExecProcNode(PlanState *node)
+void
+ExecDispatchNode(PlanState *node)
{
- TupleTableSlot *result;
-
- CHECK_FOR_INTERRUPTS();
-
- /* mark any previous result as having been consumed */
- node->result_ready = false;
-
- if (node->chgParam != NULL) /* something changed */
- ExecReScan(node); /* let ReScan handle this */
-
if (node->instrument)
InstrStartNode(node->instrument);
@@ -539,22 +530,67 @@ ExecProcNode(PlanState *node)
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
- result = NULL;
break;
}
- /* We don't support asynchronous execution yet. */
- Assert(node->result_ready);
+ if (node->instrument)
+ {
+ double nTuples = 0.0;
- /* Result should be a TupleTableSlot, unless it's NULL. */
- Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+ if (node->result_ready && node->result != NULL &&
+ IsA(node->result, TupleTableSlot) &&
+ !TupIsNull((TupleTableSlot *) node->result))
+ nTuples = 1.0;
- result = (TupleTableSlot *) node->result;
+ InstrStopNode(node->instrument, nTuples);
+ }
+}
- if (node->instrument)
- InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
- return result;
+/* ----------------------------------------------------------------
+ * ExecExecuteNode
+ *
+ * Request the next tuple from the given node. Note that
+ * if the node supports asynchrony, result_ready may not be
+ * set on return (use ExecProcNode if you need that, or call
+ * ExecAsyncWaitForNode).
+ * ----------------------------------------------------------------
+ */
+void
+ExecExecuteNode(PlanState *node)
+{
+ node->result_ready = false;
+ ExecDispatchNode(node);
+}
+
+
+/* ----------------------------------------------------------------
+ * ExecProcNode
+ *
+ * Get the next tuple from the given node. If the node is
+ * asynchronous, wait for a tuple to be ready before
+ * returning.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecProcNode(PlanState *node)
+{
+ CHECK_FOR_INTERRUPTS();
+
+ /* mark any previous result as having been consumed */
+ node->result_ready = false;
+
+ if (node->chgParam != NULL) /* something changed */
+ ExecReScan(node); /* let ReScan handle this */
+
+ ExecDispatchNode(node);
+
+ if (!node->result_ready)
+ ExecAsyncWaitForNode(node);
+
+ /* Result should be a TupleTableSlot, unless it's NULL. */
+ Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+
+ return (TupleTableSlot *) node->result;
}
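
On the producer side, a dispatch routine running under ExecDispatchNode no longer returns a slot; it publishes one through its PlanState. A sketch of the two possible outcomes, assuming a hypothetical helper (PublishResult and its arguments are illustrative):

#include "postgres.h"

#include "executor/execAsync.h"
#include "nodes/execnodes.h"

/*
 * Sketch of the result contract ExecDispatchNode callers rely on: either
 * publish a slot and mark it ready, or register a wait and return with
 * result_ready still false so ExecProcNode falls into ExecAsyncWaitForNode.
 */
static void
PublishResult(PlanState *ps, TupleTableSlot *slot, bool have_tuple)
{
	if (have_tuple)
	{
		ps->result = (Node *) slot;		/* a TupleTableSlot is a Node */
		ps->result_ready = true;
	}
	else
		ExecAsyncNeedsWait(ps, 1, false);
}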
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..38b37a1d2a
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ * Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncWaitForNode(PlanState *planstate);
+extern void ExecAsyncNeedsWait(PlanState *planstate, int nevents,
+ bool reinit);
+extern void ExecAsyncDoesNotNeedWait(PlanState *planstate);
+
+#endif /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 1eb09d80a0..7abc361fe5 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -223,6 +223,8 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
*/
extern PlanState *ExecInitNode(Plan *node, EState *estate, PlanState *parent,
int eflags);
+extern void ExecDispatchNode(PlanState *node);
+extern void ExecExecuteNode(PlanState *node);
extern TupleTableSlot *ExecProcNode(PlanState *node);
extern Node *MultiExecProcNode(PlanState *node);
extern void ExecEndNode(PlanState *node);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b14aa7a306..d0c7cfc300 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -383,6 +383,14 @@ typedef struct EState
ParamListInfo es_param_list_info; /* values of external params */
ParamExecData *es_param_exec_vals; /* values of internal params */
+ /* Asynchronous execution support */
+ struct PlanState **es_waiting_nodes; /* array of waiting nodes */
+ int es_num_waiting_nodes; /* # of waiters in array */
+ int es_max_waiting_nodes; /* # of allocated entries */
+ int es_total_async_events; /* total of per-node n_async_events */
+ int es_max_async_events; /* # supported by event set */
+ struct WaitEventSet *es_wait_event_set;
+
/* Other working state: */
MemoryContext es_query_cxt; /* per-query context in which EState lives */
@@ -1035,6 +1043,8 @@ typedef struct PlanState
bool result_ready; /* true if result is ready */
Node *result; /* result, most often TupleTableSlot */
+ int n_async_events; /* # of async events we want to register */
+
Instrumentation *instrument; /* Optional runtime stats for this node */
WorkerInstrumentation *worker_instrument; /* per-worker instrumentation */
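
A short worked example of how the new EState counters interact, following the rules in ExecAsyncNeedsWait and ExecAsyncWaitForNode (the particular numbers are arbitrary):

/*
 * Suppose three nodes call ExecAsyncNeedsWait with nevents = 2, 4 and 1.
 * Then, before the first wait:
 *
 *     es_num_waiting_nodes  == 3
 *     es_total_async_events == 2 + 4 + 1 == 7
 *     es_wait_event_set     == NULL
 *
 * The first ExecAsyncWaitForNode call sizes the set with headroom:
 *
 *     es_max_async_events   == 7 + 16 == 23
 *
 * Further registrations are absorbed without a rebuild until either a node
 * passes reinit = true or es_total_async_events exceeds es_max_async_events
 * (23 here); at that point the set is freed and recreated on the next wait.
 */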