Lightweight framework for waiting for events. async
author Robert Haas <rhaas@postgresql.org>
Mon, 9 May 2016 15:48:11 +0000 (11:48 -0400)
committer Robert Haas <rhaas@postgresql.org>
Tue, 20 Sep 2016 16:39:57 +0000 (12:39 -0400)
src/backend/executor/Makefile
src/backend/executor/execAsync.c [new file with mode: 0644]
src/backend/executor/execProcnode.c
src/include/executor/execAsync.h [new file with mode: 0644]
src/include/executor/executor.h
src/include/nodes/execnodes.h

index 51edd4c5e709590d75fd9459f43b13d7eca2bad2..0675b0135670b52ee6f37ba35ba6e84a04747d62 100644 (file)
@@ -12,8 +12,8 @@ subdir = src/backend/executor
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
-       execMain.o execParallel.o execProcnode.o execQual.o \
+OBJS = execAmi.o execAsync.o execCurrent.o execGrouping.o execIndexing.o \
+       execJunk.o execMain.o execParallel.o execProcnode.o execQual.o \
        execScan.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
        nodeBitmapAnd.o nodeBitmapOr.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644 (file)
index 0000000..20601fa
--- /dev/null
@@ -0,0 +1,256 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *       Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * This file contains routines that are intended to support asynchronous
+ * execution; that is, suspending an executor node until some external
+ * event occurs, or until one of its child nodes produces a tuple.
+ * This allows the executor to avoid blocking on a single external event,
+ * such as a file descriptor waiting on I/O, or a parallel worker which
+ * must complete work elsewhere in the plan tree, when there might at the
+ * same time be useful computation that could be accomplished in some
+ * other part of the plan tree.
+ *
+ * IDENTIFICATION
+ *       src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/executor.h"
+#include "storage/latch.h"
+
+#define        EVENT_BUFFER_SIZE               16
+
+static void ExecAsyncConfigureWait(PlanState *planstate, bool reinit);
+
+/*
+ * ExecAsyncWaitForNode
+ *
+ * Loop until planstate->result_ready becomes true.  Each iteration
+ * (re)builds the EState's WaitEventSet if there is none, lets every node in
+ * estate->es_waiting_nodes configure its events, waits on the set, and then
+ * dispatches the nodes whose events fired, propagating to parent nodes for
+ * as long as results keep becoming ready.
+ *
+ * Nothing here checks whether planstate is itself reachable from any
+ * waiting node; the loop ends only when result_ready is set on it.
+ */
+void
+ExecAsyncWaitForNode(PlanState *planstate)
+{
+       WaitEvent       occurred_event[EVENT_BUFFER_SIZE];
+       PlanState  *callbacks[EVENT_BUFFER_SIZE];
+       int                     ncallbacks = 0;
+       EState *estate = planstate->state;
+
+       while (!planstate->result_ready)
+       {
+               /* A NULL event set means it was never built or was discarded. */
+               bool    reinit = (estate->es_wait_event_set == NULL);
+               int             n;
+               int             noccurred;
+
+               if (reinit)
+               {
+                       /*
+                        * Allow for a few extra events without reinitializing.  It
+                        * doesn't seem worth the complexity of doing anything very
+                        * aggressive here, because plans that depend on massive numbers
+                        * of external FDs are likely to run afoul of kernel limits anyway.
+                        */
+                       estate->es_max_async_events = estate->es_total_async_events + 16;
+                       estate->es_wait_event_set =
+                               CreateWaitEventSet(estate->es_query_cxt,
+                                                                  estate->es_max_async_events);
+               }
+
+               /* Give each waiting node a chance to add or modify events. */
+               for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+                       ExecAsyncConfigureWait(estate->es_waiting_nodes[n], reinit);
+
+               /*
+                * Wait for at least one event to occur.  At most EVENT_BUFFER_SIZE
+                * events are requested per call, which is what keeps the
+                * callbacks[] array below from overflowing (ncallbacks is always
+                * zero at this point, since the dispatch loop runs until empty).
+                */
+               noccurred = WaitEventSetWait(estate->es_wait_event_set, -1,
+                                                                        occurred_event, EVENT_BUFFER_SIZE);
+               Assert(noccurred > 0);
+
+               /*
+                * Loop over the occurred events and make a list of nodes that need
+                * a callback.  The waiting nodes should have registered their wait
+                * events with user_data pointing back to the node.
+                */
+               for (n = 0; n < noccurred; ++n)
+               {
+                       WaitEvent  *w = &occurred_event[n];
+                       PlanState  *ps = w->user_data;
+
+                       callbacks[ncallbacks++] = ps;
+               }
+
+               /*
+                * Initially, this loop will call the node-type-specific function for
+                * each node for which an event occurred.  If any of those nodes
+                * produces a result, its parent enters the set of nodes that are
+                * pending for a callback.  In this way, when a result becomes
+                * available in a leaf of the plan tree, it can bubble upwards towards
+                * the root as far as necessary.
+                */
+               while (ncallbacks > 0)
+               {
+                       int             i,
+                                       j;
+
+                       /* Loop over all callbacks. */
+                       for (i = 0; i < ncallbacks; ++i)
+                       {
+                               /* Skip if NULL (already removed as a duplicate). */
+                               if (callbacks[i] == NULL)
+                                       continue;
+
+                               /*
+                                * Remove any duplicates.  This is O(n) per entry, hence
+                                * O(n^2) per pass, which may not seem good, but it should
+                                * hopefully be OK as long as EVENT_BUFFER_SIZE is not too
+                                * large.
+                                */
+                               for (j = i + 1; j < ncallbacks; ++j)
+                                       if (callbacks[i] == callbacks[j])
+                                               callbacks[j] = NULL;
+
+                               /* Dispatch to node-type-specific code. */
+                               ExecDispatchNode(callbacks[i]);
+
+                               /*
+                                * If there's now a tuple ready, we must dispatch to the
+                                * parent node; otherwise, there's nothing more to do.
+                                * A node without a parent leaves a NULL entry here, which
+                                * the squeeze step below discards.
+                                */
+                               if (callbacks[i]->result_ready)
+                                       callbacks[i] = callbacks[i]->parent;
+                               else
+                                       callbacks[i] = NULL;
+                       }
+
+                       /* Squeeze out NULLs; the survivors form the next round. */
+                       for (i = 0, j = 0; j < ncallbacks; ++j)
+                               if (callbacks[j] != NULL)
+                                       callbacks[i++] = callbacks[j];
+                       ncallbacks = i;
+               }
+       }
+}
+
+/*
+ * ExecAsyncNeedsWait
+ *
+ * An executor node should call this function to signal that it needs to wait
+ * on one or more events that can be registered on a WaitEventSet.  nevents
+ * should be the maximum number of events that it will wish to register.
+ * reinit should be true if the node can't reuse the WaitEventSet it most
+ * recently initialized, for example because it needs to drop a wait event
+ * from the set.
+ *
+ * A node counts as "already waiting" when its n_async_events is nonzero; in
+ * that case only the event totals are updated.
+ */
+void
+ExecAsyncNeedsWait(PlanState *planstate, int nevents, bool reinit)
+{
+       EState *estate = planstate->state;
+
+       Assert(nevents > 0);    /* otherwise, use ExecAsyncDoesNotNeedWait */
+
+       /*
+        * If this node is not already present in the array of waiting nodes,
+        * then add it.  If that array hasn't been allocated or is full, this may
+        * require (re)allocating it.
+        */
+       if (planstate->n_async_events == 0)
+       {
+               /* Grow only when every allocated slot is in use. */
+               if (estate->es_num_waiting_nodes >= estate->es_max_waiting_nodes)
+               {
+                       int             newmax;
+
+                       if (estate->es_max_waiting_nodes == 0)
+                       {
+                               /* First use: allocate room for newmax pointers. */
+                               newmax = 16;
+                               estate->es_waiting_nodes =
+                                       MemoryContextAlloc(estate->es_query_cxt,
+                                                                          newmax * sizeof(PlanState *));
+                       }
+                       else
+                       {
+                               newmax = estate->es_max_waiting_nodes * 2;
+                               estate->es_waiting_nodes =
+                                       repalloc(estate->es_waiting_nodes,
+                                                        newmax * sizeof(PlanState *));
+                       }
+                       estate->es_max_waiting_nodes = newmax;
+               }
+               estate->es_waiting_nodes[estate->es_num_waiting_nodes++] = planstate;
+       }
+
+       /*
+        * Adjust per-node and per-estate totals: replace this node's previous
+        * contribution to the estate-wide total with the new count.
+        */
+       estate->es_total_async_events -= planstate->n_async_events;
+       planstate->n_async_events = nevents;
+       estate->es_total_async_events += planstate->n_async_events;
+
+       /*
+        * If a WaitEventSet has already been created, we need to discard it and
+        * start again if the user passed reinit = true, or if the total number of
+        * required events exceeds the supported number.
+        */
+       if (estate->es_wait_event_set != NULL && (reinit ||
+               estate->es_total_async_events > estate->es_max_async_events))
+       {
+               FreeWaitEventSet(estate->es_wait_event_set);
+               estate->es_wait_event_set = NULL;
+       }
+}
+
+/*
+ * ExecAsyncDoesNotNeedWait
+ *
+ * If an executor node no longer needs to wait, it should call this function
+ * to report that fact.  This is a no-op for a node that has no asynchronous
+ * events registered (n_async_events <= 0).
+ */
+void
+ExecAsyncDoesNotNeedWait(PlanState *planstate)
+{
+       int             n;
+       EState *estate = planstate->state;
+
+       if (planstate->n_async_events <= 0)
+               return;
+
+       /*
+        * Locate the node in the list of waiting nodes.  (Is a linear search
+        * going to be a problem here?  I think probably not.)
+        */
+       for (n = 0; n < estate->es_num_waiting_nodes; ++n)
+       {
+               if (estate->es_waiting_nodes[n] == planstate)
+                       break;
+       }
+
+       /*
+        * We should always find ourselves in the array.  This must be checked
+        * before the count is decremented; otherwise removing the last entry
+        * would leave n equal to the new count and trip the assertion.
+        */
+       Assert(n < estate->es_num_waiting_nodes);
+
+       /* Remove the node by moving the last entry into its slot. */
+       if (n < estate->es_num_waiting_nodes)
+               estate->es_waiting_nodes[n] =
+                       estate->es_waiting_nodes[--estate->es_num_waiting_nodes];
+
+       /* We no longer need any asynchronous events. */
+       estate->es_total_async_events -= planstate->n_async_events;
+       planstate->n_async_events = 0;
+
+       /*
+        * The next wait will need to rebuild the WaitEventSet, because whatever
+        * events we registered are gone now.  It's probably OK that this code
+        * assumes we actually did register some events at one point, because we
+        * needed to wait at some point and we don't any more.
+        */
+       if (estate->es_wait_event_set != NULL)
+       {
+               FreeWaitEventSet(estate->es_wait_event_set);
+               estate->es_wait_event_set = NULL;
+       }
+}
+
+/*
+ * ExecAsyncConfigureWait
+ *
+ * Give per-nodetype function a chance to register wait events.
+ *
+ * No node types are handled yet, so every call currently raises an error.
+ * reinit is unused until per-nodetype handlers are added.
+ */
+static void
+ExecAsyncConfigureWait(PlanState *planstate, bool reinit)
+{
+       switch (nodeTag(planstate))
+       {
+               /* XXX: Add calls to per-nodetype handlers here. */
+               default:
+                       /* Cast the tag so the argument matches the %d conversion. */
+                       elog(ERROR, "unexpected node type: %d",
+                                (int) nodeTag(planstate));
+                       break;
+       }
+}
index 3f2ebff1733911cb1daa48fcd12ff2d8c9ae7682..b7ac08eead724efd80215c7556ffd3bd779868e8 100644 (file)
@@ -77,6 +77,7 @@
  */
 #include "postgres.h"
 
+#include "executor/execAsync.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
 #include "executor/nodeAppend.h"
@@ -368,24 +369,14 @@ ExecInitNode(Plan *node, EState *estate, PlanState *parent, int eflags)
 
 
 /* ----------------------------------------------------------------
- *             ExecProcNode
+ *             ExecDispatchNode
  *
- *             Execute the given node to return a(nother) tuple.
+ *             Invoke the given node's dispatch function.
  * ----------------------------------------------------------------
  */
-TupleTableSlot *
-ExecProcNode(PlanState *node)
+void
+ExecDispatchNode(PlanState *node)
 {
-       TupleTableSlot *result;
-
-       CHECK_FOR_INTERRUPTS();
-
-       /* mark any previous result as having been consumed */
-       node->result_ready = false;
-
-       if (node->chgParam != NULL) /* something changed */
-               ExecReScan(node);               /* let ReScan handle this */
-
        if (node->instrument)
                InstrStartNode(node->instrument);
 
@@ -539,22 +530,67 @@ ExecProcNode(PlanState *node)
 
                default:
                        elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node));
-                       result = NULL;
                        break;
        }
 
-       /* We don't support asynchronous execution yet. */
-       Assert(node->result_ready);
+       if (node->instrument)
+       {
+               double  nTuples = 0.0;
 
-       /* Result should be a TupleTableSlot, unless it's NULL. */
-       Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+               if (node->result_ready && node->result != NULL &&
+                       IsA(node->result, TupleTableSlot))
+                       nTuples = 1.0;
 
-       result = (TupleTableSlot *) node->result;
+               InstrStopNode(node->instrument, nTuples);
+       }
+}
 
-       if (node->instrument)
-               InstrStopNode(node->instrument, TupIsNull(result) ? 0.0 : 1.0);
 
-       return result;
+/* ----------------------------------------------------------------
+ *             ExecExecuteNode
+ *
+ *             Request the next tuple from the given node.  Note that
+ *             if the node supports asynchrony, result_ready may not be
+ *             set on return (use ExecProcNode if you need that, or call
+ *             ExecAsyncWaitForNode).
+ *
+ *             Unlike ExecProcNode, this neither calls
+ *             CHECK_FOR_INTERRUPTS() nor rescans the node when chgParam
+ *             is set.  NOTE(review): confirm that callers are expected
+ *             to handle both themselves.
+ * ----------------------------------------------------------------
+ */
+void
+ExecExecuteNode(PlanState *node)
+{
+       /* mark any previous result as having been consumed */
+       node->result_ready = false;
+       ExecDispatchNode(node);
+}
+
+
+/* ----------------------------------------------------------------
+ *             ExecProcNode
+ *
+ *             Get the next tuple from the given node.  If the node is
+ *             asynchronous, wait for a tuple to be ready before
+ *             returning.
+ *
+ *             Returns node->result cast to TupleTableSlot *, which may
+ *             be NULL.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecProcNode(PlanState *node)
+{
+       CHECK_FOR_INTERRUPTS();
+
+       /* mark any previous result as having been consumed */
+       node->result_ready = false;
+
+       if (node->chgParam != NULL) /* something changed */
+               ExecReScan(node);               /* let ReScan handle this */
+
+       ExecDispatchNode(node);
+
+       /* The dispatch may not have produced a result yet; if so, wait for one. */
+       if (!node->result_ready)
+               ExecAsyncWaitForNode(node);
+
+       /* Result should be a TupleTableSlot, unless it's NULL. */
+       Assert(node->result == NULL || IsA(node->result, TupleTableSlot));
+
+       return (TupleTableSlot *) node->result;
 }
 
 
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644 (file)
index 0000000..38b37a1
--- /dev/null
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ *             Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *             src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncWaitForNode(PlanState *planstate);
+extern void ExecAsyncNeedsWait(PlanState *planstate, int nevents,
+       bool reinit);
+extern void ExecAsyncDoesNotNeedWait(PlanState *planstate);
+
+#endif   /* EXECASYNC_H */
index 1eb09d80a0fdd94739d9cc62cf2940349d4397b9..7abc361fe5c0a734de69a77e70e10987b832c64a 100644 (file)
@@ -223,6 +223,8 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
  */
 extern PlanState *ExecInitNode(Plan *node, EState *estate, PlanState *parent,
                         int eflags);
+extern void ExecDispatchNode(PlanState *node);
+extern void ExecExecuteNode(PlanState *node);
 extern TupleTableSlot *ExecProcNode(PlanState *node);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
index b14aa7a3068f6304b1f0c0f7fdf11867ec74697c..d0c7cfc30046e10bd04770eed824d734af7002fd 100644 (file)
@@ -383,6 +383,14 @@ typedef struct EState
        ParamListInfo es_param_list_info;       /* values of external params */
        ParamExecData *es_param_exec_vals;      /* values of internal params */
 
+       /* Asynchronous execution support */
+       struct PlanState **es_waiting_nodes;            /* array of waiting nodes */
+       int                     es_num_waiting_nodes;   /* # of waiters in array */
+       int                     es_max_waiting_nodes;   /* # of allocated entries */
+       int                     es_total_async_events;  /* total of per-node n_async_events */
+       int                     es_max_async_events;    /* # supported by event set */
+       struct WaitEventSet *es_wait_event_set;
+
        /* Other working state: */
        MemoryContext es_query_cxt; /* per-query context in which EState lives */
 
@@ -1035,6 +1043,8 @@ typedef struct PlanState
        bool            result_ready;   /* true if result is ready */
        Node       *result;                     /* result, most often TupleTableSlot */
 
+       int                     n_async_events; /* # of async events we want to register */
+
        Instrumentation *instrument;    /* Optional runtime stats for this node */
        WorkerInstrumentation *worker_instrument;       /* per-worker instrumentation */