beginnings of true async attempt, but doesn't work async2
authorRobert Haas <rhaas@postgresql.org>
Fri, 23 Sep 2016 21:34:03 +0000 (17:34 -0400)
committerRobert Haas <rhaas@postgresql.org>
Fri, 23 Sep 2016 21:36:25 +0000 (17:36 -0400)
ERROR:  another command is already in progress

CONTEXT:  Remote SQL command: DECLARE c2 CURSOR FOR
SELECT a, b FROM t.foo2
STATEMENT:  select * from foo;

and if we got past that we can't configure a wait

and there are various other problems

contrib/postgres_fdw/connection.c
contrib/postgres_fdw/postgres_fdw.c
contrib/postgres_fdw/postgres_fdw.h

index 8ca1c1c89869e52e8261b52ab5403ca5547b84fa..0a29c63105bc6d42f56ad78e1d49fdcfa3bd6952 100644 (file)
@@ -50,6 +50,13 @@ typedef struct ConnCacheEntry
        bool            have_error;             /* have any subxacts aborted in this xact? */
 } ConnCacheEntry;
 
+struct PgFdwAsyncQuery
+{
+       PGconn     *conn;
+       char       *query;
+       PGresult   *res;
+};
+
 /*
  * Connection cache (initialized on first use)
  */
@@ -520,6 +527,68 @@ pgfdw_get_result(PGconn *conn, const char *query)
        return last_res;
 }
 
+/*
+ * Begin an asynchronous query using a PGconn.
+ */
+PgFdwAsyncQuery *
+pgfdw_begin_query(PGconn *conn, const char *query)
+{
+       PgFdwAsyncQuery *aq;
+       MemoryContext   oldcontext;
+
+       /*
+        * XXX. This should be added to a list someplace or to the connection cache
+        * so that we don't leak it if an error happens.  But I haven't written
+        * code to do that yet, so right now we just leak.
+        *
+        * XXX.  Also, we need to track the fact that the connection is busy so
+        * that we don't end up with multiple scans trying to use the same
+        * connection at the same time.
+        */
+       oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+       aq = palloc(sizeof(PgFdwAsyncQuery));
+       aq->conn = conn;
+       aq->query = pstrdup(query);
+       aq->res = NULL;
+       MemoryContextSwitchTo(oldcontext);
+
+       /* See comments in pgfdw_exec_query. */
+       if (!PQsendQuery(conn, query))
+               pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+       /* Return handle to caller. */
+       return aq;
+}
+
+/*
+ * See whether an asynchronous query using a PGconn has finished.  Returns
+ * true if yes and false if no.  In the former case, *result gets the final
+ * result of the query.
+ */
+bool
+pgfdw_finish_query(PgFdwAsyncQuery *aq, PGresult **result)
+{
+       for (;;)
+       {
+               PGresult   *res;
+
+               if (!PQconsumeInput(aq->conn))
+                               pgfdw_report_error(ERROR, NULL, aq->conn, false, aq->query);
+               if (PQisBusy(aq->conn))
+                       return false;
+               res = PQgetResult(aq->conn);
+               if (res == NULL)
+               {
+                       res = aq->res;
+                       pfree(aq->query);
+                       pfree(aq);
+                       *result = res;
+                       return true;
+               }
+               aq->res = res;
+       }
+}
+
 /*
  * Report an error we got from the remote server.
  *
index ab69aa330cbb232100b3c0f9dd93edef1253501e..7dab97297c4bd35a8f517a55afdf00afd5be416d 100644 (file)
@@ -141,6 +141,7 @@ typedef struct PgFdwScanState
        FmgrInfo   *param_flinfo;       /* output conversion functions for them */
        List       *param_exprs;        /* executable expressions for param values */
        const char **param_values;      /* textual values of query parameters */
+       PgFdwAsyncQuery *aq;            /* async query */
 
        /* for storing result tuples */
        HeapTuple  *tuples;                     /* array of currently-retrieved tuples */
@@ -2891,6 +2892,47 @@ create_cursor(ForeignScanState *node)
        pfree(buf.data);
 }
 
+/*
+ * Convert result tuples to HeapTuples.
+ */
+static void
+convert_result_tuples(ForeignScanState *node, PGresult *res)
+{
+       PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+       int                     numrows;
+       int                     i;
+       MemoryContext oldcontext;
+
+       oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
+
+       numrows = PQntuples(res);
+       fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+       fsstate->num_tuples = numrows;
+       fsstate->next_tuple = 0;
+
+       for (i = 0; i < numrows; i++)
+       {
+               Assert(IsA(node->ss.ps.plan, ForeignScan));
+
+               fsstate->tuples[i] =
+                       make_tuple_from_result_row(res, i,
+                                                                          fsstate->rel,
+                                                                          fsstate->attinmeta,
+                                                                          fsstate->retrieved_attrs,
+                                                                          node,
+                                                                          fsstate->temp_cxt);
+       }
+
+       /* Update fetch_ct_2 */
+       if (fsstate->fetch_ct_2 < 2)
+               fsstate->fetch_ct_2++;
+
+       /* Must be EOF if we didn't get as many tuples as we asked for. */
+       fsstate->eof_reached = (numrows < fsstate->fetch_size);
+
+       MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Fetch some more rows from the node's cursor.
  */
@@ -2899,7 +2941,6 @@ fetch_more_data(ForeignScanState *node)
 {
        PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
        PGresult   *volatile res = NULL;
-       MemoryContext oldcontext;
 
        /*
         * We'll store the tuples in the batch_cxt.  First, flush the previous
@@ -2907,15 +2948,12 @@ fetch_more_data(ForeignScanState *node)
         */
        fsstate->tuples = NULL;
        MemoryContextReset(fsstate->batch_cxt);
-       oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
        /* PGresult must be released before leaving this function. */
        PG_TRY();
        {
                PGconn     *conn = fsstate->conn;
                char            sql[64];
-               int                     numrows;
-               int                     i;
 
                snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                                 fsstate->fetch_size, fsstate->cursor_number);
@@ -2925,31 +2963,7 @@ fetch_more_data(ForeignScanState *node)
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
                        pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
-               /* Convert the data into HeapTuples */
-               numrows = PQntuples(res);
-               fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-               fsstate->num_tuples = numrows;
-               fsstate->next_tuple = 0;
-
-               for (i = 0; i < numrows; i++)
-               {
-                       Assert(IsA(node->ss.ps.plan, ForeignScan));
-
-                       fsstate->tuples[i] =
-                               make_tuple_from_result_row(res, i,
-                                                                                  fsstate->rel,
-                                                                                  fsstate->attinmeta,
-                                                                                  fsstate->retrieved_attrs,
-                                                                                  node,
-                                                                                  fsstate->temp_cxt);
-               }
-
-               /* Update fetch_ct_2 */
-               if (fsstate->fetch_ct_2 < 2)
-                       fsstate->fetch_ct_2++;
-
-               /* Must be EOF if we didn't get as many tuples as we asked for. */
-               fsstate->eof_reached = (numrows < fsstate->fetch_size);
+               convert_result_tuples(node, res);
 
                PQclear(res);
                res = NULL;
@@ -2961,8 +2975,38 @@ fetch_more_data(ForeignScanState *node)
                PG_RE_THROW();
        }
        PG_END_TRY();
+}
 
-       MemoryContextSwitchTo(oldcontext);
+static void
+begin_async_fetch(ForeignScanState *node)
+{
+       PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+       char            sql[64];
+
+       fsstate->tuples = NULL;
+       MemoryContextReset(fsstate->batch_cxt);
+
+       snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+                        fsstate->fetch_size, fsstate->cursor_number);
+
+       fsstate->aq = pgfdw_begin_query(fsstate->conn, sql);
+}
+
+static bool
+finish_async_fetch(ForeignScanState *node)
+{
+       PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+       PGresult *res;
+
+       if (!pgfdw_finish_query(fsstate->aq, &res))
+               return false;
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pgfdw_report_error(ERROR, res, fsstate->conn, true, fsstate->query);
+       /* XXX. Error in convert_result_tuples could leak. */
+       convert_result_tuples(node, res);
+       PQclear(res);
+       fsstate->aq = NULL;
+       return true;
 }
 
 /*
@@ -4360,21 +4404,57 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
 static bool
 postgresIsForeignPathAsyncCapable(ForeignPath *path)
 {
+       /* XXX. Prolly not for direct modify... */
        return true;
 }
 
-/*
- * XXX. Just for testing purposes, let's run everything through the async
- * mechanism but return tuples synchronously.
- */
 static void
 postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
 {
        ForeignScanState *node = (ForeignScanState *) areq->requestee;
+       PgFdwScanState *fsstate;
        TupleTableSlot *slot;
 
        Assert(IsA(node, ForeignScanState));
-       slot = postgresIterateForeignScan(node);
+       fsstate = (PgFdwScanState *) node->fdw_state;
+       Assert(fsstate->aq == NULL);
+       slot = node->ss.ss_ScanTupleSlot;
+
+       /*
+        * If this is the first call after Begin or ReScan, we need to create the
+        * cursor on the remote side.
+        */
+       if (!fsstate->cursor_exists)
+               create_cursor(node);
+
+       /*
+        * Get some more tuples, if we've run out.
+        */
+       if (fsstate->next_tuple >= fsstate->num_tuples)
+       {
+               /* No point in another fetch if we already detected EOF, though. */
+               if (!fsstate->eof_reached)
+               {
+                       begin_async_fetch(node);
+                       ExecAsyncSetRequiredEvents(estate, areq, 1, false, false);
+                       return;
+               }
+
+               /* If we didn't get any tuples, must be end of data. */
+               if (fsstate->next_tuple >= fsstate->num_tuples)
+               {
+                       ExecAsyncRequestDone(estate, areq, (Node *) ExecClearTuple(slot));
+                       return;
+               }
+       }
+
+       /*
+        * Return the next tuple.
+        */
+       ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
+                                  slot,
+                                  InvalidBuffer,
+                                  false);
        ExecAsyncRequestDone(estate, areq, (Node *) slot);
 }
 
@@ -4388,7 +4468,28 @@ postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq,
 static void
 postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq)
 {
-       elog(ERROR, "postgresForeignAsyncNotify");
+       ForeignScanState *node = (ForeignScanState *) areq->requestee;
+       PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+       TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+
+       if (!finish_async_fetch(node))
+               return;
+
+       /* If we didn't get any tuples, must be end of data. */
+       if (fsstate->next_tuple >= fsstate->num_tuples)
+       {
+               ExecAsyncRequestDone(estate, areq, (Node *) ExecClearTuple(slot));
+               return;
+       }
+
+       /*
+        * Return the next tuple.
+        */
+       ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
+                                  slot,
+                                  InvalidBuffer,
+                                  false);
+       ExecAsyncRequestDone(estate, areq, (Node *) slot);
 }
 
 /*
index 67126bc421f69c3af17fa8e7ba56b01e583610b7..9c193ff86bce21a32c9cf673d20713fbc16ca15b 100644 (file)
@@ -20,6 +20,9 @@
 
 #include "libpq-fe.h"
 
+struct PgFdwAsyncQuery;
+typedef struct PgFdwAsyncQuery PgFdwAsyncQuery;
+
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
  * foreign table.  This information is collected by postgresGetForeignRelSize.
@@ -107,6 +110,8 @@ extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
 extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
                                   bool clear, const char *sql);
+extern PgFdwAsyncQuery *pgfdw_begin_query(PGconn *conn, const char *query);
+extern bool pgfdw_finish_query(PgFdwAsyncQuery *aq, PGresult **result);
 
 /* in option.c */
 extern int ExtractConnectionOptions(List *defelems,