Fix EXPLAIN ANALYZE for async-capable nodes.
authorEtsuro Fujita <efujita@postgresql.org>
Wed, 12 May 2021 05:00:00 +0000 (14:00 +0900)
committerEtsuro Fujita <efujita@postgresql.org>
Wed, 12 May 2021 05:00:00 +0000 (14:00 +0900)
EXPLAIN ANALYZE for an async-capable ForeignScan node associated with
postgres_fdw is done just by using instrumentation for ExecProcNode()
called from the node's callbacks, causing the following problems:

1) If the remote table to scan is empty, the node is incorrectly
   considered as "never executed" by the command even if the node is
   executed, as ExecProcNode() isn't called from the node's callbacks at
   all in that case.
2) The command fails to collect timings for things other than
   ExecProcNode() done in the node, such as creating a cursor for the
   node's remote query.

To fix these problems, add instrumentation for async-capable nodes, and
modify postgres_fdw accordingly.

My oversight in commit 27e1f1456.

While at it, update a comment for the AsyncRequest struct in execnodes.h
and the documentation for the ForeignAsyncRequest API in fdwhandler.sgml
to match the code in ExecAsyncAppendResponse() in nodeAppend.c, and fix
typos in comments in nodeAppend.c.

Per report from Andrey Lepikhov, though I didn't use his patch.

Reviewed-by: Andrey Lepikhov
Discussion: https://postgr.es/m/2eb662bb-105d-fc20-7412-2f027cc3ca72%40postgrespro.ru

14 files changed:
contrib/auto_explain/auto_explain.c
contrib/pg_stat_statements/pg_stat_statements.c
contrib/postgres_fdw/expected/postgres_fdw.out
contrib/postgres_fdw/postgres_fdw.c
contrib/postgres_fdw/sql/postgres_fdw.sql
doc/src/sgml/fdwhandler.sgml
src/backend/executor/execAsync.c
src/backend/executor/execMain.c
src/backend/executor/execProcnode.c
src/backend/executor/instrument.c
src/backend/executor/nodeAppend.c
src/backend/executor/nodeForeignscan.c
src/include/executor/instrument.h
src/include/nodes/execnodes.h

index 445bb371912171fcf67553a05b7570e7b79a657f..e9092ba359ad88d4e6d819a22fb46e47a4636be5 100644 (file)
@@ -314,7 +314,7 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
            MemoryContext oldcxt;
 
            oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
-           queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
+           queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false);
            MemoryContextSwitchTo(oldcxt);
        }
    }
index f42f07622e942eda17e6f37f9867e9b33d2e55be..77ca5abcdc85d0416105c7f86c122dd7d34e9219 100644 (file)
@@ -974,7 +974,7 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
            MemoryContext oldcxt;
 
            oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
-           queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
+           queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false);
            MemoryContextSwitchTo(oldcxt);
        }
    }
index 6f533c745d69684047d175f2e3ac3e51ba6cf42a..0b0c45f0d9a189b5759991a52716c0097ed30fca 100644 (file)
@@ -10051,6 +10051,21 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
                Filter: (t1_3.b === 505)
 (14 rows)
 
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+                               QUERY PLAN                                
+-------------------------------------------------------------------------
+ Limit (actual rows=1 loops=1)
+   ->  Append (actual rows=1 loops=1)
+         ->  Async Foreign Scan on async_p1 t1_1 (actual rows=0 loops=1)
+               Filter: (b === 505)
+         ->  Async Foreign Scan on async_p2 t1_2 (actual rows=0 loops=1)
+               Filter: (b === 505)
+         ->  Seq Scan on async_p3 t1_3 (actual rows=1 loops=1)
+               Filter: (b === 505)
+               Rows Removed by Filter: 101
+(9 rows)
+
 SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
   a   |  b  |  c   
 ------+-----+------
@@ -10132,18 +10147,32 @@ SELECT * FROM join_tbl ORDER BY a1;
 (3 rows)
 
 DELETE FROM join_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
 RESET enable_mergejoin;
 RESET enable_hashjoin;
+-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
+DELETE FROM async_p1;
+DELETE FROM async_p2;
+DELETE FROM async_p3;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM async_pt;
+                               QUERY PLAN                                
+-------------------------------------------------------------------------
+ Append (actual rows=0 loops=1)
+   ->  Async Foreign Scan on async_p1 async_pt_1 (actual rows=0 loops=1)
+   ->  Async Foreign Scan on async_p2 async_pt_2 (actual rows=0 loops=1)
+   ->  Seq Scan on async_p3 async_pt_3 (actual rows=0 loops=1)
+(4 rows)
+
 -- Clean up
 DROP TABLE async_pt;
 DROP TABLE base_tbl1;
 DROP TABLE base_tbl2;
 DROP TABLE result_tbl;
-DROP TABLE local_tbl;
-DROP FOREIGN TABLE remote_tbl;
-DROP FOREIGN TABLE insert_tbl;
-DROP TABLE base_tbl3;
-DROP TABLE base_tbl4;
 DROP TABLE join_tbl;
 ALTER SERVER loopback OPTIONS (DROP async_capable);
 ALTER SERVER loopback2 OPTIONS (DROP async_capable);
index 4ff58d9c2756ce7a22b188e2aacd4b7aefe52ccb..ee93ee07cc47300cb049e0b134fd7f5677f1093f 100644 (file)
@@ -1542,7 +1542,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
                             &fsstate->param_values);
 
    /* Set the async-capable flag */
-   fsstate->async_capable = node->ss.ps.plan->async_capable;
+   fsstate->async_capable = node->ss.ps.async_capable;
 }
 
 /*
@@ -6867,7 +6867,7 @@ produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
    }
 
    /* Get a tuple from the ForeignScan node */
-   result = ExecProcNode((PlanState *) node);
+   result = areq->requestee->ExecProcNodeReal(areq->requestee);
    if (!TupIsNull(result))
    {
        /* Mark the request as complete */
@@ -6956,6 +6956,11 @@ process_pending_request(AsyncRequest *areq)
    /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
    ExecAsyncResponse(areq);
 
+   /* Also, we do instrumentation ourselves, if required */
+   if (areq->requestee->instrument)
+       InstrUpdateTupleCount(areq->requestee->instrument,
+                             TupIsNull(areq->result) ? 0.0 : 1.0);
+
    MemoryContextSwitchTo(oldcontext);
 }
 
index 000e2534fc8ab5c183d2f235aaf71619ca72a3ca..53adfe2abc8954d04a79cbac6c184b28267e9f9d 100644 (file)
@@ -3195,6 +3195,8 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
 
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
 SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
 
 -- Check with foreign modify
@@ -3226,19 +3228,28 @@ INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND
 SELECT * FROM join_tbl ORDER BY a1;
 DELETE FROM join_tbl;
 
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
+
 RESET enable_mergejoin;
 RESET enable_hashjoin;
 
+-- Check EXPLAIN ANALYZE for a query that scans empty partitions asynchronously
+DELETE FROM async_p1;
+DELETE FROM async_p2;
+DELETE FROM async_p3;
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM async_pt;
+
 -- Clean up
 DROP TABLE async_pt;
 DROP TABLE base_tbl1;
 DROP TABLE base_tbl2;
 DROP TABLE result_tbl;
-DROP TABLE local_tbl;
-DROP FOREIGN TABLE remote_tbl;
-DROP FOREIGN TABLE insert_tbl;
-DROP TABLE base_tbl3;
-DROP TABLE base_tbl4;
 DROP TABLE join_tbl;
 
 ALTER SERVER loopback OPTIONS (DROP async_capable);
index 8aa7edfe4af103a40c327a1a2ece76efe09b2b52..d1194def8200ddb172f78e057cedc64e1cad0197 100644 (file)
@@ -1597,7 +1597,7 @@ ForeignAsyncRequest(AsyncRequest *areq);
      <literal>areq-&gt;callback_pending</literal> to <literal>true</literal>
      for the <structname>ForeignScan</structname> node to get a callback from
      the callback functions described below.  If no more tuples are available,
-     set the slot to NULL, and the
+     set the slot to NULL or an empty slot, and the
      <literal>areq-&gt;request_complete</literal> flag to
      <literal>true</literal>.  It's recommended to use
      <function>ExecAsyncRequestDone</function> or
index f1985e658c4b74be446037174a6a688787df87ed..75108d36be20e6c5e24abda414cca812c4dca399 100644 (file)
@@ -15,6 +15,7 @@
 #include "postgres.h"
 
 #include "executor/execAsync.h"
+#include "executor/executor.h"
 #include "executor/nodeAppend.h"
 #include "executor/nodeForeignscan.h"
 
 void
 ExecAsyncRequest(AsyncRequest *areq)
 {
+   if (areq->requestee->chgParam != NULL)  /* something changed? */
+       ExecReScan(areq->requestee);        /* let ReScan handle this */
+
+   /* must provide our own instrumentation support */
+   if (areq->requestee->instrument)
+       InstrStartNode(areq->requestee->instrument);
+
    switch (nodeTag(areq->requestee))
    {
        case T_ForeignScanState:
@@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq)
    }
 
    ExecAsyncResponse(areq);
+
+   /* must provide our own instrumentation support */
+   if (areq->requestee->instrument)
+       InstrStopNode(areq->requestee->instrument,
+                     TupIsNull(areq->result) ? 0.0 : 1.0);
 }
 
 /*
@@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq)
 void
 ExecAsyncConfigureWait(AsyncRequest *areq)
 {
+   /* must provide our own instrumentation support */
+   if (areq->requestee->instrument)
+       InstrStartNode(areq->requestee->instrument);
+
    switch (nodeTag(areq->requestee))
    {
        case T_ForeignScanState:
@@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
            elog(ERROR, "unrecognized node type: %d",
                 (int) nodeTag(areq->requestee));
    }
+
+   /* must provide our own instrumentation support */
+   if (areq->requestee->instrument)
+       InstrStopNode(areq->requestee->instrument, 0.0);
 }
 
 /*
@@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq)
 void
 ExecAsyncNotify(AsyncRequest *areq)
 {
+   /* must provide our own instrumentation support */
+   if (areq->requestee->instrument)
+       InstrStartNode(areq->requestee->instrument);
+
    switch (nodeTag(areq->requestee))
    {
        case T_ForeignScanState:
@@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq)
    }
 
    ExecAsyncResponse(areq);
+
+   /* must provide our own instrumentation support */
+   if (areq->requestee->instrument)
+       InstrStopNode(areq->requestee->instrument,
+                     TupIsNull(areq->result) ? 0.0 : 1.0);
 }
 
 /*
index df3d7f9a8bced95f20dc10c045cbefe862ea9881..58b496873507be3394167d365011b7c8ed6101e9 100644 (file)
@@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
        resultRelInfo->ri_TrigWhenExprs = (ExprState **)
            palloc0(n * sizeof(ExprState *));
        if (instrument_options)
-           resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options);
+           resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false);
    }
    else
    {
index 9f8c7582e04eb60527fac4d0d6270acfb2702bd0..753f46863b723a8e0701adf2b1a01d51e9dcb48a 100644 (file)
@@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 
    /* Set up instrumentation for this node if requested */
    if (estate->es_instrument)
-       result->instrument = InstrAlloc(1, estate->es_instrument);
+       result->instrument = InstrAlloc(1, estate->es_instrument,
+                                       result->async_capable);
 
    return result;
 }
index 237e13361b5d08a682fb80d87e61713d46903975..2b106d8473ce557b23bca5892581d1253472e17d 100644 (file)
@@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add);
 
 /* Allocate new instrumentation structure(s) */
 Instrumentation *
-InstrAlloc(int n, int instrument_options)
+InstrAlloc(int n, int instrument_options, bool async_mode)
 {
    Instrumentation *instr;
 
@@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options)
            instr[i].need_bufusage = need_buffers;
            instr[i].need_walusage = need_wal;
            instr[i].need_timer = need_timer;
+           instr[i].async_mode = async_mode;
        }
    }
 
@@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr)
 void
 InstrStopNode(Instrumentation *instr, double nTuples)
 {
+   double      save_tuplecount = instr->tuplecount;
    instr_time  endtime;
 
    /* count the returned tuples */
@@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples)
        instr->running = true;
        instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
    }
+   else
+   {
+       /*
+        * In async mode, if the plan node hadn't emitted any tuples before,
+        * this might be the first tuple
+        */
+       if (instr->async_mode && save_tuplecount < 1.0)
+           instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter);
+   }
+}
+
+/* Update tuple count */
+void
+InstrUpdateTupleCount(Instrumentation *instr, double nTuples)
+{
+   /* count the returned tuples */
+   instr->tuplecount += nTuples;
 }
 
 /* Finish a run cycle for a plan node */
index 3c1f12adafb542a99d2eb47b9e65e95e467036e3..1558fafad1e5ea9bb4660b9203f6ed89041c6eaa 100644 (file)
@@ -362,9 +362,9 @@ ExecAppend(PlanState *pstate)
        }
 
        /*
-        * wait or poll async events if any. We do this before checking for
-        * the end of iteration, because it might drain the remaining async
-        * subplans.
+        * wait or poll for async events if any. We do this before checking
+        * for the end of iteration, because it might drain the remaining
+        * async subplans.
         */
        if (node->as_nasyncremain > 0)
            ExecAppendAsyncEventWait(node);
@@ -440,7 +440,7 @@ ExecReScanAppend(AppendState *node)
 
        /*
         * If chgParam of subnode is not null then plan will be re-scanned by
-        * first ExecProcNode.
+        * first ExecProcNode or by first ExecAsyncRequest.
         */
        if (subnode->chgParam == NULL)
            ExecReScan(subnode);
@@ -911,7 +911,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
    {
        CHECK_FOR_INTERRUPTS();
 
-       /* Wait or poll async events. */
+       /* Wait or poll for async events. */
        ExecAppendAsyncEventWait(node);
 
        /* Request a tuple asynchronously. */
@@ -1084,7 +1084,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq)
    /* Nothing to do if the request is pending. */
    if (!areq->request_complete)
    {
-       /* The request would have been pending for a callback */
+       /* The request would have been pending for a callback. */
        Assert(areq->callback_pending);
        return;
    }
index 898890fb08faf595d65eae46d6c8178998c24f36..9dc38d47ea78a7e75b4ede55ff9a957c0fe6b86b 100644 (file)
@@ -209,6 +209,13 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
    scanstate->fdw_recheck_quals =
        ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
 
+   /*
+    * Determine whether to scan the foreign relation asynchronously or not;
+    * this has to be kept in sync with the code in ExecInitAppend().
+    */
+   scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
+                                     estate->es_epq_active == NULL);
+
    /*
     * Initialize FDW-related state.
     */
index c25aa1b04c2c27da6fd7303fbedd36764ae061c8..fc87eed4fb237ec259eebccf5e066a54d2e7a03a 100644 (file)
@@ -55,6 +55,7 @@ typedef struct Instrumentation
    bool        need_timer;     /* true if we need timer data */
    bool        need_bufusage;  /* true if we need buffer usage data */
    bool        need_walusage;  /* true if we need WAL usage data */
+   bool        async_mode;     /* true if node is in async mode */
    /* Info about current plan cycle: */
    bool        running;        /* true if we've completed first tuple */
    instr_time  starttime;      /* start time of current iteration of node */
@@ -84,10 +85,12 @@ typedef struct WorkerInstrumentation
 extern PGDLLIMPORT BufferUsage pgBufferUsage;
 extern PGDLLIMPORT WalUsage pgWalUsage;
 
-extern Instrumentation *InstrAlloc(int n, int instrument_options);
+extern Instrumentation *InstrAlloc(int n, int instrument_options,
+                                  bool async_mode);
 extern void InstrInit(Instrumentation *instr, int instrument_options);
 extern void InstrStartNode(Instrumentation *instr);
 extern void InstrStopNode(Instrumentation *instr, double nTuples);
+extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples);
 extern void InstrEndLoop(Instrumentation *instr);
 extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
 extern void InstrStartParallelQuery(void);
index e7ae21c023c95e29526f59597774b43f44e411b2..91a1c3a780ee086334d93632fa96a97719800a70 100644 (file)
@@ -538,7 +538,8 @@ typedef struct AsyncRequest
    int         request_index;  /* Scratch space for requestor */
    bool        callback_pending;   /* Callback is needed */
    bool        request_complete;   /* Request complete, result valid */
-   TupleTableSlot *result;     /* Result (NULL if no more tuples) */
+   TupleTableSlot *result;     /* Result (NULL or an empty slot if no more
+                                * tuples) */
 } AsyncRequest;
 
 /* ----------------
@@ -1003,6 +1004,8 @@ typedef struct PlanState
    ExprContext *ps_ExprContext;    /* node's expression-evaluation context */
    ProjectionInfo *ps_ProjInfo;    /* info for doing tuple projection */
 
+   bool        async_capable;  /* true if node is async-capable */
+
    /*
     * Scanslot's descriptor if known. This is a bit of a hack, but otherwise
     * it's hard for expression compilation to optimize based on the