bool have_error; /* have any subxacts aborted in this xact? */
} ConnCacheEntry;
+struct PgFdwAsyncQuery
+{
+ PGconn *conn;
+ char *query;
+ PGresult *res;
+};
+
/*
* Connection cache (initialized on first use)
*/
return last_res;
}
+/*
+ * Begin an asynchronous query using a PGconn.
+ */
+PgFdwAsyncQuery *
+pgfdw_begin_query(PGconn *conn, const char *query)
+{
+ PgFdwAsyncQuery *aq;
+ MemoryContext oldcontext;
+
+ /*
+ * XXX. This should be added to a list someplace or to the connection cache
+ * so that we don't leak it if an error happens. But I haven't written
+ * code to do that yet, so right now we just leak.
+ *
+ * XXX. Also, we need to track the fact that the connection is busy so
+ * that we don't end up with multiple scans trying to use the same
+ * connection at the same time.
+ */
+ oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+ aq = palloc(sizeof(PgFdwAsyncQuery));
+ aq->conn = conn;
+ aq->query = pstrdup(query);
+ aq->res = NULL;
+ MemoryContextSwitchTo(oldcontext);
+
+ /* See comments in pgfdw_exec_query. */
+ if (!PQsendQuery(conn, query))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+ /* Return handle to caller. */
+ return aq;
+}
+
+/*
+ * See whether an asynchronous query using a PGconn has finished. Returns
+ * true if yes and false if no. In the former case, *result gets the final
+ * result of the query.
+ */
+bool
+pgfdw_finish_query(PgFdwAsyncQuery *aq, PGresult **result)
+{
+ for (;;)
+ {
+ PGresult *res;
+
+ if (!PQconsumeInput(aq->conn))
+ pgfdw_report_error(ERROR, NULL, aq->conn, false, aq->query);
+ if (PQisBusy(aq->conn))
+ return false;
+ res = PQgetResult(aq->conn);
+ if (res == NULL)
+ {
+ res = aq->res;
+ pfree(aq->query);
+ pfree(aq);
+ *result = res;
+ return true;
+ }
+ aq->res = res;
+ }
+}
+
/*
* Report an error we got from the remote server.
*
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
const char **param_values; /* textual values of query parameters */
+ PgFdwAsyncQuery *aq; /* async query */
/* for storing result tuples */
HeapTuple *tuples; /* array of currently-retrieved tuples */
pfree(buf.data);
}
+/*
+ * Convert result tuples to HeapTuples.
+ */
+static void
+convert_result_tuples(ForeignScanState *node, PGresult *res)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ int numrows;
+ int i;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
+
+ numrows = PQntuples(res);
+ fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->num_tuples = numrows;
+ fsstate->next_tuple = 0;
+
+ for (i = 0; i < numrows; i++)
+ {
+ Assert(IsA(node->ss.ps.plan, ForeignScan));
+
+ fsstate->tuples[i] =
+ make_tuple_from_result_row(res, i,
+ fsstate->rel,
+ fsstate->attinmeta,
+ fsstate->retrieved_attrs,
+ node,
+ fsstate->temp_cxt);
+ }
+
+ /* Update fetch_ct_2 */
+ if (fsstate->fetch_ct_2 < 2)
+ fsstate->fetch_ct_2++;
+
+ /* Must be EOF if we didn't get as many tuples as we asked for. */
+ fsstate->eof_reached = (numrows < fsstate->fetch_size);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
/*
* Fetch some more rows from the node's cursor.
*/
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
PGresult *volatile res = NULL;
- MemoryContext oldcontext;
/*
* We'll store the tuples in the batch_cxt. First, flush the previous
*/
fsstate->tuples = NULL;
MemoryContextReset(fsstate->batch_cxt);
- oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
/* PGresult must be released before leaving this function. */
PG_TRY();
{
PGconn *conn = fsstate->conn;
char sql[64];
- int numrows;
- int i;
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
- /* Convert the data into HeapTuples */
- numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- fsstate->num_tuples = numrows;
- fsstate->next_tuple = 0;
-
- for (i = 0; i < numrows; i++)
- {
- Assert(IsA(node->ss.ps.plan, ForeignScan));
-
- fsstate->tuples[i] =
- make_tuple_from_result_row(res, i,
- fsstate->rel,
- fsstate->attinmeta,
- fsstate->retrieved_attrs,
- node,
- fsstate->temp_cxt);
- }
-
- /* Update fetch_ct_2 */
- if (fsstate->fetch_ct_2 < 2)
- fsstate->fetch_ct_2++;
-
- /* Must be EOF if we didn't get as many tuples as we asked for. */
- fsstate->eof_reached = (numrows < fsstate->fetch_size);
+ convert_result_tuples(node, res);
PQclear(res);
res = NULL;
PG_RE_THROW();
}
PG_END_TRY();
+}
- MemoryContextSwitchTo(oldcontext);
+static void
+begin_async_fetch(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ char sql[64];
+
+ fsstate->tuples = NULL;
+ MemoryContextReset(fsstate->batch_cxt);
+
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ fsstate->aq = pgfdw_begin_query(fsstate->conn, sql);
+}
+
+static bool
+finish_async_fetch(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PGresult *res;
+
+ if (!pgfdw_finish_query(fsstate->aq, &res))
+ return false;
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, fsstate->conn, true, fsstate->query);
+ /* XXX. Error in convert_result_tuples could leak. */
+ convert_result_tuples(node, res);
+ PQclear(res);
+ fsstate->aq = NULL;
+ return true;
}
/*
static bool
postgresIsForeignPathAsyncCapable(ForeignPath *path)
{
+ /* XXX. Prolly not for direct modify... */
return true;
}
-/*
- * XXX. Just for testing purposes, let's run everything through the async
- * mechanism but return tuples synchronously.
- */
static void
postgresForeignAsyncRequest(EState *estate, PendingAsyncRequest *areq)
{
ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate;
TupleTableSlot *slot;
Assert(IsA(node, ForeignScanState));
- slot = postgresIterateForeignScan(node);
+ fsstate = (PgFdwScanState *) node->fdw_state;
+ Assert(fsstate->aq == NULL);
+ slot = node->ss.ss_ScanTupleSlot;
+
+ /*
+ * If this is the first call after Begin or ReScan, we need to create the
+ * cursor on the remote side.
+ */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ /*
+ * Get some more tuples, if we've run out.
+ */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ /* No point in another fetch if we already detected EOF, though. */
+ if (!fsstate->eof_reached)
+ {
+ begin_async_fetch(node);
+ ExecAsyncSetRequiredEvents(estate, areq, 1, false, false);
+ return;
+ }
+
+ /* If we didn't get any tuples, must be end of data. */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ ExecAsyncRequestDone(estate, areq, (Node *) ExecClearTuple(slot));
+ return;
+ }
+ }
+
+ /*
+ * Return the next tuple.
+ */
+ ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
+ slot,
+ InvalidBuffer,
+ false);
ExecAsyncRequestDone(estate, areq, (Node *) slot);
}
static void
postgresForeignAsyncNotify(EState *estate, PendingAsyncRequest *areq)
{
- elog(ERROR, "postgresForeignAsyncNotify");
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+
+ if (!finish_async_fetch(node))
+ return;
+
+ /* If we didn't get any tuples, must be end of data. */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ ExecAsyncRequestDone(estate, areq, (Node *) ExecClearTuple(slot));
+ return;
+ }
+
+ /*
+ * Return the next tuple.
+ */
+ ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
+ slot,
+ InvalidBuffer,
+ false);
+ ExecAsyncRequestDone(estate, areq, (Node *) slot);
}
/*