diff options
author | Robert Haas | 2016-09-23 21:34:03 +0000 |
---|---|---|
committer | Robert Haas | 2016-09-23 21:36:25 +0000 |
commit | 51ca0665e81a92b72b398b5ac1182c1231129ef4 (patch) | |
tree | 191be101a559ea50b1d95a8eacc54f1888aa7473 | |
parent | f07555cd8ebca78dec856c73a9518dfbbbdc631f (diff) |
beginnings of true async attempt, but doesn't workasync2
ERROR: another command is already in progress
CONTEXT: Remote SQL command: DECLARE c2 CURSOR FOR
SELECT a, b FROM t.foo2
STATEMENT: select * from foo;
and if we got past that we can't configure a wait
and there are various other problems
-rw-r--r-- | contrib/postgres_fdw/connection.c | 69 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 173 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.h | 5 |
3 files changed, 211 insertions, 36 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 8ca1c1c898..0a29c63105 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -50,6 +50,13 @@ typedef struct ConnCacheEntry bool have_error; /* have any subxacts aborted in this xact? */ } ConnCacheEntry; +struct PgFdwAsyncQuery +{ + PGconn *conn; + char *query; + PGresult *res; +}; + /* * Connection cache (initialized on first use) */ @@ -521,6 +528,68 @@ pgfdw_get_result(PGconn *conn, const char *query) } /* + * Begin an asynchronous query using a PGconn. + */ +PgFdwAsyncQuery * +pgfdw_begin_query(PGconn *conn, const char *query) +{ + PgFdwAsyncQuery *aq; + MemoryContext oldcontext; + + /* + * XXX. This should be added to a list someplace or to the connection cache + * so that we don't leak it if an error happens. But I haven't written + * code to do that yet, so right now we just leak. + * + * XXX. Also, we need to track the fact that the connection is busy so + * that we don't end up with multiple scans trying to use the same + * connection at the same time. + */ + oldcontext = MemoryContextSwitchTo(CacheMemoryContext); + aq = palloc(sizeof(PgFdwAsyncQuery)); + aq->conn = conn; + aq->query = pstrdup(query); + aq->res = NULL; + MemoryContextSwitchTo(oldcontext); + + /* See comments in pgfdw_exec_query. */ + if (!PQsendQuery(conn, query)) + pgfdw_report_error(ERROR, NULL, conn, false, query); + + /* Return handle to caller. */ + return aq; +} + +/* + * See whether an asynchronous query using a PGconn has finished. Returns + * true if yes and false if no. In the former case, *result gets the final + * result of the query. + */ +bool +pgfdw_finish_query(PgFdwAsyncQuery *aq, PGresult **result) +{ + for (;;) + { + PGresult *res; + + if (!PQconsumeInput(aq->conn)) + pgfdw_report_error(ERROR, NULL, aq->conn, false, aq->query); + if (PQisBusy(aq->conn)) + return false; + res = PQgetResult(aq->conn); + if (res == NULL) + { + res = aq->res; + pfree(aq->query); + pfree(aq); + *result = res; + return true; + } + aq->res = res; + } +} + +/* * Report an error we got from the remote server. * * elevel: error level to use (typically ERROR, but might be less) diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index ab69aa330c..7dab97297c 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -141,6 +141,7 @@ typedef struct PgFdwScanState FmgrInfo *param_flinfo; /* output conversion functions for them */ List *param_exprs; /* executable expressions for param values */ const char **param_values; /* textual values of query parameters */ + PgFdwAsyncQuery *aq; /* async query */ /* for storing result tuples */ HeapTuple *tuples; /* array of currently-retrieved tuples */ @@ -2892,6 +2893,47 @@ create_cursor(ForeignScanState *node) } /* + * Convert result tuples to HeapTuples. + */ +static void +convert_result_tuples(ForeignScanState *node, PGresult *res) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + int numrows; + int i; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); + + numrows = PQntuples(res); + fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); + fsstate->num_tuples = numrows; + fsstate->next_tuple = 0; + + for (i = 0; i < numrows; i++) + { + Assert(IsA(node->ss.ps.plan, ForeignScan)); + + fsstate->tuples[i] = + make_tuple_from_result_row(res, i, + fsstate->rel, + fsstate->attinmeta, + fsstate->retrieved_attrs, + node, + fsstate->temp_cxt); + } + + /* Update fetch_ct_2 */ + if (fsstate->fetch_ct_2 < 2) + fsstate->fetch_ct_2++; + + /* Must be EOF if we didn't get as many tuples as we asked for. */ + fsstate->eof_reached = (numrows < fsstate->fetch_size); + + MemoryContextSwitchTo(oldcontext); +} + +/* * Fetch some more rows from the node's cursor. */ static void @@ -2899,7 +2941,6 @@ fetch_more_data(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PGresult *volatile res = NULL; - MemoryContext oldcontext; /* * We'll store the tuples in the batch_cxt. First, flush the previous @@ -2907,15 +2948,12 @@ fetch_more_data(ForeignScanState *node) */ fsstate->tuples = NULL; MemoryContextReset(fsstate->batch_cxt); - oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { PGconn *conn = fsstate->conn; char sql[64]; - int numrows; - int i; snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fsstate->fetch_size, fsstate->cursor_number); @@ -2925,31 +2963,7 @@ fetch_more_data(ForeignScanState *node) if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); - /* Convert the data into HeapTuples */ - numrows = PQntuples(res); - fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - fsstate->num_tuples = numrows; - fsstate->next_tuple = 0; - - for (i = 0; i < numrows; i++) - { - Assert(IsA(node->ss.ps.plan, ForeignScan)); - - fsstate->tuples[i] = - make_tuple_from_result_row(res, i, - fsstate->rel, - fsstate->attinmeta, - fsstate->retrieved_attrs, - node, - fsstate->temp_cxt); - } - - /* Update fetch_ct_2 */ - if (fsstate->fetch_ct_2 < 2) - fsstate->fetch_ct_2++; - - /* Must be EOF if we didn't get as many tuples as we asked for. */ - fsstate->eof_reached = (numrows < fsstate->fetch_size); + convert_result_tuples(node, res); PQclear(res); res = NULL; @@ -2961,8 +2975,38 @@ fetch_more_data(ForeignScanState *node) PG_RE_THROW(); } PG_END_TRY(); +} - MemoryContextSwitchTo(oldcontext); +static void +begin_async_fetch(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + char sql[64]; + + fsstate->tuples = NULL; + MemoryContextReset(fsstate->batch_cxt); + + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + fsstate->aq = pgfdw_begin_query(fsstate->conn, sql); +} + +static bool +finish_async_fetch(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PGresult *res; + + if (!pgfdw_finish_query(fsstate->aq, &res)) + return false; + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, fsstate->conn, true, fsstate->query); + /* XXX. Error in convert_result_tuples could leak. */ + convert_result_tuples(node, res); + PQclear(res); + fsstate->aq = NULL; + return true; } /* @@ -4360,21 +4404,57 @@ postgresGetForeignJoinPaths(PlannerInfo *root, static bool postgresIsForeignPathAsyncCapable(ForeignPath *path) { + /* XXX. Prolly not for direct modify... */ return true; } -/* - * XXX. Just for testing purposes, let's run everything through the async - * mechanism but return tuples synchronously. - */ static void postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq) { ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate; TupleTableSlot *slot; Assert(IsA(node, ForeignScanState)); - slot = postgresIterateForeignScan(node); + fsstate = (PgFdwScanState *) node->fdw_state; + Assert(fsstate->aq == NULL); + slot = node->ss.ss_ScanTupleSlot; + + /* + * If this is the first call after Begin or ReScan, we need to create the + * cursor on the remote side. + */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* + * Get some more tuples, if we've run out. + */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* No point in another fetch if we already detected EOF, though. */ + if (!fsstate->eof_reached) + { + begin_async_fetch(node); + ExecAsyncSetRequiredEvents(estate, areq, 1, false, false); + return; + } + + /* If we didn't get any tuples, must be end of data. */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + ExecAsyncRequestDone(estate, areq, (Node *) ExecClearTuple(slot)); + return; + } + } + + /* + * Return the next tuple. + */ + ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], + slot, + InvalidBuffer, + false); ExecAsyncRequestDone(estate, areq, (Node *) slot); } @@ -4388,7 +4468,28 @@ postgresForeignAsyncConfigureWait(EState *estate, PendingAsyncRequest *areq, static void postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq) { - elog(ERROR, "postgresForeignAsyncNotify"); + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + + if (!finish_async_fetch(node)) + return; + + /* If we didn't get any tuples, must be end of data. */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + ExecAsyncRequestDone(estate, areq, (Node *) ExecClearTuple(slot)); + return; + } + + /* + * Return the next tuple. + */ + ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], + slot, + InvalidBuffer, + false); + ExecAsyncRequestDone(estate, areq, (Node *) slot); } /* diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 67126bc421..9c193ff86b 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -20,6 +20,9 @@ #include "libpq-fe.h" +struct PgFdwAsyncQuery; +typedef struct PgFdwAsyncQuery PgFdwAsyncQuery; + /* * FDW-specific planner information kept in RelOptInfo.fdw_private for a * foreign table. This information is collected by postgresGetForeignRelSize. @@ -107,6 +110,8 @@ extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); +extern PgFdwAsyncQuery *pgfdw_begin_query(PGconn *conn, const char *query); +extern bool pgfdw_finish_query(PgFdwAsyncQuery *aq, PGresult **result); /* in option.c */ extern int ExtractConnectionOptions(List *defelems, |