--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * This file contains routines that are intended to support asynchronous
+ * execution; that is, suspending an executor node until some external
+ * event occurs, or until one of its child nodes produces a tuple.
+ * This allows the executor to avoid blocking on a single external event,
+ * such as a file descriptor waiting on I/O, or a parallel worker which
+ * must complete work elsewhere in the plan tree, when there might at the
+ * same time be useful computation that could be accomplished in some
+ * other part of the plan tree.
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/executor.h"
+#include "storage/latch.h"
+
+#define EVENT_BUFFER_SIZE 16
+
+static void ExecAsyncConfigureWait(PlanState *planstate, bool reinit);
+
+/*
+ * ExecAsyncWaitForNode
+ *
+ * Loop until planstate->result_ready becomes true. Each iteration
+ * (re)creates the EState's WaitEventSet if there is none, lets every node in
+ * es_waiting_nodes configure its events, waits for events to fire, and then
+ * dispatches the nodes attached to those events, following parent links
+ * upward whenever a dispatched node ends up with result_ready set.
+ *
+ * NOTE(review): nothing visible in this function checks for interrupts
+ * while blocked in WaitEventSetWait -- confirm that cancellation is handled
+ * elsewhere.
+ */
+void
+ExecAsyncWaitForNode(PlanState *planstate)
+{
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ PlanState *callbacks[EVENT_BUFFER_SIZE];
+ int ncallbacks = 0;
+ EState *estate = planstate->state;
+
+ while (!planstate->result_ready)
+ {
+ /* No event set exists yet (or it was discarded): build a new one. */
+ bool reinit = (estate->es_wait_event_set == NULL);
+ int n;
+ int noccurred;
+
+ if (reinit)
+ {
+ /*
+ * Allow for a few extra events without reinitializing. It
+ * doesn't seem worth the complexity of doing anything very
+ * aggressive here, because plans that depend on massive numbers
+ * of external FDs are likely to run afoul of kernel limits anyway.
+ */
+ estate->es_max_async_events = estate->es_total_async_events + 16;
+ estate->es_wait_event_set =
+ CreateWaitEventSet(estate->es_query_cxt,
+ estate->es_max_async_events);
+ }
+
+ /* Give each waiting node a chance to add or modify events. */
+ for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+ ExecAsyncConfigureWait(estate->es_waiting_nodes[n], reinit);
+
+ /*
+ * Wait for at least one event to occur. The -1 timeout presumably
+ * means "wait indefinitely", and the result count is presumably
+ * capped at EVENT_BUFFER_SIZE -- TODO confirm against the
+ * WaitEventSetWait contract.
+ */
+ noccurred = WaitEventSetWait(estate->es_wait_event_set, -1,
+ occurred_event, EVENT_BUFFER_SIZE);
+ Assert(noccurred > 0);
+
+ /*
+ * Loop over the occurred events and make a list of nodes that need
+ * a callback. The waiting nodes should have registered their wait
+ * events with user_data pointing back to the node.
+ *
+ * ncallbacks is zero here on every iteration, because the dispatch
+ * loop below only exits once the list has been emptied.
+ */
+ for (n = 0; n < noccurred; ++n)
+ {
+ WaitEvent *w = &occurred_event[n];
+ PlanState *ps = w->user_data;
+
+ callbacks[ncallbacks++] = ps;
+ }
+
+ /*
+ * Initially, this loop will call the node-type-specific function for
+ * each node for which an event occurred. If any of those nodes
+ * produce a result, its parent enters the set of nodes that are
+ * pending for a callback. In this way, when a result becomes
+ * available in a leaf of the plan tree, it can bubble upwards towards
+ * the root as far as necessary.
+ *
+ * NOTE(review): propagation does not stop at planstate itself; once
+ * planstate has a result, planstate->parent is dispatched as well.
+ * Confirm that this is intended.
+ */
+ while (ncallbacks > 0)
+ {
+ int i,
+ j;
+
+ /* Loop over all callbacks. */
+ for (i = 0; i < ncallbacks; ++i)
+ {
+ /* Skip if NULL. */
+ if (callbacks[i] == NULL)
+ continue;
+
+ /*
+ * Remove any duplicates. This scan makes the pass quadratic
+ * in ncallbacks, but that should be OK as long as
+ * EVENT_BUFFER_SIZE is not too large. Only entries after i
+ * are compared, and only against the node about to be
+ * dispatched.
+ */
+ for (j = i + 1; j < ncallbacks; ++j)
+ if (callbacks[i] == callbacks[j])
+ callbacks[j] = NULL;
+
+ /* Dispatch to node-type-specific code. */
+ ExecDispatchNode(callbacks[i]);
+
+ /*
+ * If there's now a tuple ready, we must dispatch to the
+ * parent node; otherwise, there's nothing more to do.
+ * A NULL parent simply drops out in the squeeze below.
+ */
+ if (callbacks[i]->result_ready)
+ callbacks[i] = callbacks[i]->parent;
+ else
+ callbacks[i] = NULL;
+ }
+
+ /* Squeeze out NULLs, keeping the survivors in order. */
+ for (i = 0, j = 0; j < ncallbacks; ++j)
+ if (callbacks[j] != NULL)
+ callbacks[i++] = callbacks[j];
+ ncallbacks = i;
+ }
+ }
+}
+
+/*
+ * ExecAsyncNeedsWait
+ *
+ * An executor node should call this function to signal that it needs to wait
+ * on one or more events that can be registered on a WaitEventSet.
+ *
+ * planstate: the node that needs to wait.
+ * nevents: the maximum number of events the node will wish to register;
+ *     must be greater than zero (otherwise, use ExecAsyncDoesNotNeedWait).
+ * reinit: true if the node can't reuse the WaitEventSet it most recently
+ *     initialized, for example because it needs to drop a wait event from
+ *     the set.
+ */
+void
+ExecAsyncNeedsWait(PlanState *planstate, int nevents, bool reinit)
+{
+    EState     *estate = planstate->state;
+
+    Assert(nevents > 0);        /* otherwise, use ExecAsyncDoesNotNeedWait */
+
+    /*
+     * A node with n_async_events == 0 is not yet in the array of waiting
+     * nodes, so add it. If that array hasn't been allocated or is full,
+     * (re)allocate it first.
+     */
+    if (planstate->n_async_events == 0)
+    {
+        /* Grow only when every slot is in use. */
+        if (estate->es_num_waiting_nodes >= estate->es_max_waiting_nodes)
+        {
+            int         newmax;
+
+            if (estate->es_max_waiting_nodes == 0)
+            {
+                /* First use: allocate room for 16 node pointers. */
+                newmax = 16;
+                estate->es_waiting_nodes =
+                    MemoryContextAlloc(estate->es_query_cxt,
+                                       newmax * sizeof(PlanState *));
+            }
+            else
+            {
+                /* Subsequent growth: double the capacity. */
+                newmax = estate->es_max_waiting_nodes * 2;
+                estate->es_waiting_nodes =
+                    repalloc(estate->es_waiting_nodes,
+                             newmax * sizeof(PlanState *));
+            }
+            estate->es_max_waiting_nodes = newmax;
+        }
+        estate->es_waiting_nodes[estate->es_num_waiting_nodes++] = planstate;
+    }
+
+    /* Replace this node's old contribution to the per-estate total. */
+    estate->es_total_async_events -= planstate->n_async_events;
+    planstate->n_async_events = nevents;
+    estate->es_total_async_events += planstate->n_async_events;
+
+    /*
+     * If a WaitEventSet has already been created, we need to discard it and
+     * start again if the caller passed reinit = true, or if the total number
+     * of required events exceeds the supported number.
+     */
+    if (estate->es_wait_event_set != NULL &&
+        (reinit ||
+         estate->es_total_async_events > estate->es_max_async_events))
+    {
+        FreeWaitEventSet(estate->es_wait_event_set);
+        estate->es_wait_event_set = NULL;
+    }
+}
+
+/*
+ * ExecAsyncDoesNotNeedWait
+ *
+ * If an executor node no longer needs to wait, it should call this function
+ * to report that fact. The node is removed from the EState's array of
+ * waiting nodes, its event count is subtracted from the per-estate total,
+ * and any existing WaitEventSet is discarded. Calling this for a node that
+ * is not currently waiting (n_async_events <= 0) is a no-op.
+ */
+void
+ExecAsyncDoesNotNeedWait(PlanState *planstate)
+{
+    EState     *estate = planstate->state;
+    int         nwaiting = estate->es_num_waiting_nodes;
+    int         n;
+
+    if (planstate->n_async_events <= 0)
+        return;
+
+    /*
+     * Remove the node from the list of waiting nodes by moving the last
+     * entry into its slot. (Is a linear search going to be a problem here?
+     * I think probably not.)
+     */
+    for (n = 0; n < nwaiting; ++n)
+    {
+        if (estate->es_waiting_nodes[n] == planstate)
+        {
+            estate->es_waiting_nodes[n] =
+                estate->es_waiting_nodes[--estate->es_num_waiting_nodes];
+            break;
+        }
+    }
+
+    /*
+     * We should always find ourselves in the array. Compare against the
+     * count from before the removal: the live counter has already been
+     * decremented, so testing against it would fail whenever the node
+     * occupied the last slot.
+     */
+    Assert(n < nwaiting);
+
+    /* We no longer need any asynchronous events. */
+    estate->es_total_async_events -= planstate->n_async_events;
+    planstate->n_async_events = 0;
+
+    /*
+     * The next wait will need to rebuild the WaitEventSet, because whatever
+     * events we registered are gone now. It's probably OK that this code
+     * assumes we actually did register some events at one point, because we
+     * needed to wait at some point and we don't any more.
+     */
+    if (estate->es_wait_event_set != NULL)
+    {
+        FreeWaitEventSet(estate->es_wait_event_set);
+        estate->es_wait_event_set = NULL;
+    }
+}
+
+/*
+ * ExecAsyncConfigureWait
+ *
+ * Give per-nodetype function a chance to register wait events.
+ *
+ * planstate: a node from the EState's array of waiting nodes.
+ * reinit: true when the caller has just created a new WaitEventSet.
+ *
+ * No node type has a handler yet, so every call currently raises an error.
+ */
+static void
+ExecAsyncConfigureWait(PlanState *planstate, bool reinit)
+{
+    switch (nodeTag(planstate))
+    {
+        /* XXX: Add calls to per-nodetype handlers here. */
+        default:
+            /* Cast the enum so it matches the %d conversion. */
+            elog(ERROR, "unexpected node type: %d",
+                 (int) nodeTag(planstate));
+    }
+}
*/
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
/* ----------------------------------------------------------------
- * ExecProcNode
+ * ExecDispatchNode
*
- * Execute the given node to return a(nother) tuple.
+ * Invoke the given node's dispatch function.
* ----------------------------------------------------------------
*/
-TupleTableSlot *
-ExecProcNode(PlanState *node)
+void
+ExecDispatchNode(PlanState *node)
{
- TupleTableSlot *result;
-
- CHECK_FOR_INTERRUPTS();
-
- /* mark any previous result as having been consumed */
- node->result_ready = false;
-
- if (node->chgParam != NULL) /* something changed */
- ExecReScan(node); /* let ReScan handle this */
-
if (node->instrument)
InstrStartNode(node->instrument);
default:
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
- result = NULL;
break;
}
- /* We don't support asynchronous execution yet. */
- Assert(node->result_ready);
+ if (node->instrument)
+ {
+ double nTuples = 0.0;
- /* Result should be a TupleTableSlot, unless it's NULL. */
- Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+ if (node->result_ready && node->result != NULL &&
+ IsA(node->result, TupleTableSlot))
+ nTuples = 1.0;
- result = (TupleTableSlot *) node->result;
+ InstrStopNode(node->instrument, nTuples);
+ }
+}
- if (node->instrument)
- InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
- return result;
+/* ----------------------------------------------------------------
+ * ExecExecuteNode
+ *
+ * Request the next tuple from the given node: clear the node's
+ * result_ready flag, then invoke its dispatch function. Note
+ * that if the node supports asynchrony, result_ready may not be
+ * set on return (use ExecProcNode if you need that, or call
+ * ExecAsyncWaitForNode).
+ *
+ * NOTE(review): unlike ExecProcNode, this does not call
+ * CHECK_FOR_INTERRUPTS() or rescan when chgParam is set --
+ * confirm that callers do not depend on either.
+ * ----------------------------------------------------------------
+ */
+void
+ExecExecuteNode(PlanState *node)
+{
+ /* Mark any previous result as consumed before dispatching. */
+ node->result_ready = false;
+ ExecDispatchNode(node);
+}
+
+
+/* ----------------------------------------------------------------
+ * ExecProcNode
+ *
+ * Synchronous entry point: fetch the next tuple from the given
+ * node. The node is dispatched once; if it has not produced a
+ * result by then, we wait via ExecAsyncWaitForNode before
+ * returning. Returns node->result as a TupleTableSlot pointer,
+ * which may be NULL.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecProcNode(PlanState *node)
+{
+    TupleTableSlot *slot;
+
+    CHECK_FOR_INTERRUPTS();
+
+    /* Whatever result was left from the last call counts as consumed. */
+    node->result_ready = false;
+
+    /* Something changed: let ReScan handle it before we run the node. */
+    if (node->chgParam != NULL)
+        ExecReScan(node);
+
+    /* Run the node; if no result is ready yet, wait for one. */
+    ExecDispatchNode(node);
+    if (!node->result_ready)
+        ExecAsyncWaitForNode(node);
+
+    /* Result should be a TupleTableSlot, unless it's NULL. */
+    Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+    slot = (TupleTableSlot *) node->result;
+
+    return slot;
}