static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
static bool command_no_begin(const char *query);
static bool is_select_command(const char *query);
-static int SendQueryAndProcessResults(const char *query, double *pelapsed_msec, bool is_watch);
/*
* Returns true for valid result, false for error state.
*/
static bool
-AcceptResult(const PGresult *result, bool show_error)
+AcceptResult(const PGresult *result)
{
bool OK;
break;
}
- if (!OK && show_error)
+ if (!OK)
{
const char *error = PQerrorMessage(pset.db);
}
}
-/*
- * Consume all results
- */
-static void
-ClearOrSaveAllResults()
-{
- PGresult *result;
-
- while ((result = PQgetResult(pset.db)) != NULL)
- ClearOrSaveResult(result);
-}
-
/*
* Print microtiming output. Always print raw milliseconds; if the interval
ResetCancelConn();
- if (!AcceptResult(res, true))
+ if (!AcceptResult(res))
{
ClearOrSaveResult(res);
res = NULL;
int
PSQLexecWatch(const char *query, const printQueryOpt *opt)
{
+ PGresult *res;
double elapsed_msec = 0;
- int res;
+ instr_time before;
+ instr_time after;
if (!pset.db)
{
}
SetCancelConn(pset.db);
- res = SendQueryAndProcessResults(query, &elapsed_msec, true);
+
+ if (pset.timing)
+ INSTR_TIME_SET_CURRENT(before);
+
+ res = PQexec(pset.db, query);
+
ResetCancelConn();
+ if (!AcceptResult(res))
+ {
+ ClearOrSaveResult(res);
+ return 0;
+ }
+
+ if (pset.timing)
+ {
+ INSTR_TIME_SET_CURRENT(after);
+ INSTR_TIME_SUBTRACT(after, before);
+ elapsed_msec = INSTR_TIME_GET_MILLISEC(after);
+ }
+
+ /*
+ * If SIGINT is sent while the query is processing, the interrupt will be
+ * consumed. The user's intention, though, is to cancel the entire watch
+ * process, so detect a sent cancellation request and exit in this case.
+ */
+ if (cancel_pressed)
+ {
+ PQclear(res);
+ return 0;
+ }
+
+ switch (PQresultStatus(res))
+ {
+ case PGRES_TUPLES_OK:
+ printQuery(res, opt, pset.queryFout, false, pset.logfile);
+ break;
+
+ case PGRES_COMMAND_OK:
+ fprintf(pset.queryFout, "%s\n%s\n\n", opt->title, PQcmdStatus(res));
+ break;
+
+ case PGRES_EMPTY_QUERY:
+ pg_log_error("\\watch cannot be used with an empty query");
+ PQclear(res);
+ return -1;
+
+ case PGRES_COPY_OUT:
+ case PGRES_COPY_IN:
+ case PGRES_COPY_BOTH:
+ pg_log_error("\\watch cannot be used with COPY");
+ PQclear(res);
+ return -1;
+
+ default:
+ pg_log_error("unexpected result status for \\watch");
+ PQclear(res);
+ return -1;
+ }
+
+ PQclear(res);
+
fflush(pset.queryFout);
/* Possible microtiming output */
if (pset.timing)
PrintTiming(elapsed_msec);
- return res;
+ return 1;
}
/*
- * Marshal the COPY data. Either subroutine will get the
- * connection out of its COPY state, then call PQresultStatus()
- * once and report any error. Return whether all was ok.
+ * ProcessResult: utility function for use by SendQuery() only
+ *
+ * When our command string contained a COPY FROM STDIN or COPY TO STDOUT,
+ * PQexec() has stopped at the PGresult associated with the first such
+ * command. In that event, we'll marshal data for the COPY and then cycle
+ * through any subsequent PGresult objects.
*
- * For COPY OUT, direct the output to pset.copyStream if it's set,
- * otherwise to pset.gfname if it's set, otherwise to queryFout.
- * For COPY IN, use pset.copyStream as data source if it's set,
- * otherwise cur_cmd_source.
+ * When the command string contained no such COPY command, this function
+ * degenerates to an AcceptResult() call.
*
- * Update result if further processing is necessary, or NULL otherwise.
- * Return a result when queryFout can safely output a result status:
- * on COPY IN, or on COPY OUT if written to something other than pset.queryFout.
- * Returning NULL prevents the command status from being printed, which
- * we want if the status line doesn't get taken as part of the COPY data.
+ * Changes its argument to point to the last PGresult of the command string,
+ * or NULL if that result was for a COPY TO STDOUT. (Returning NULL prevents
+ * the command status from being printed, which we want in that case so that
+ * the status line doesn't get taken as part of the COPY data.)
+ *
+ * Returns true on complete success, false otherwise. Possible failure modes
+ * include purely client-side problems; check the transaction status for the
+ * server-side opinion.
*/
static bool
-HandleCopyResult(PGresult **result)
+ProcessResult(PGresult **results)
{
- bool success;
- FILE *copystream;
- PGresult *copy_result;
- ExecStatusType result_status = PQresultStatus(*result);
-
- Assert(result_status == PGRES_COPY_OUT ||
- result_status == PGRES_COPY_IN);
-
- SetCancelConn(pset.db);
+ bool success = true;
+ bool first_cycle = true;
- if (result_status == PGRES_COPY_OUT)
+ for (;;)
{
- bool need_close = false;
- bool is_pipe = false;
+ ExecStatusType result_status;
+ bool is_copy;
+ PGresult *next_result;
- if (pset.copyStream)
- {
- /* invoked by \copy */
- copystream = pset.copyStream;
- }
- else if (pset.gfname)
+ if (!AcceptResult(*results))
{
- /* invoked by \g */
- if (openQueryOutputFile(pset.gfname,
- ©stream, &is_pipe))
- {
- need_close = true;
- if (is_pipe)
- disable_sigpipe_trap();
- }
- else
- copystream = NULL; /* discard COPY data entirely */
+ /*
+ * Failure at this point is always a server-side failure or a
+ * failure to submit the command string. Either way, we're
+ * finished with this command string.
+ */
+ success = false;
+ break;
}
- else
+
+ result_status = PQresultStatus(*results);
+ switch (result_status)
{
- /* fall back to the generic query output stream */
- copystream = pset.queryFout;
- }
+ case PGRES_EMPTY_QUERY:
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ is_copy = false;
+ break;
- success = handleCopyOut(pset.db,
- copystream,
- ©_result)
- && (copystream != NULL);
+ case PGRES_COPY_OUT:
+ case PGRES_COPY_IN:
+ is_copy = true;
+ break;
- /*
- * Suppress status printing if the report would go to the same
- * place as the COPY data just went. Note this doesn't
- * prevent error reporting, since handleCopyOut did that.
- */
- if (copystream == pset.queryFout)
- {
- PQclear(copy_result);
- copy_result = NULL;
+ default:
+ /* AcceptResult() should have caught anything else. */
+ is_copy = false;
+ pg_log_error("unexpected PQresultStatus: %d", result_status);
+ break;
}
- if (need_close)
+ if (is_copy)
{
- /* close \g argument file/pipe */
- if (is_pipe)
+ /*
+ * Marshal the COPY data. Either subroutine will get the
+ * connection out of its COPY state, then call PQresultStatus()
+ * once and report any error.
+ *
+ * For COPY OUT, direct the output to pset.copyStream if it's set,
+ * otherwise to pset.gfname if it's set, otherwise to queryFout.
+ * For COPY IN, use pset.copyStream as data source if it's set,
+ * otherwise cur_cmd_source.
+ */
+ FILE *copystream;
+ PGresult *copy_result;
+
+ SetCancelConn(pset.db);
+ if (result_status == PGRES_COPY_OUT)
{
- pclose(copystream);
- restore_sigpipe_trap();
+ bool need_close = false;
+ bool is_pipe = false;
+
+ if (pset.copyStream)
+ {
+ /* invoked by \copy */
+ copystream = pset.copyStream;
+ }
+ else if (pset.gfname)
+ {
+ /* invoked by \g */
+ if (openQueryOutputFile(pset.gfname,
+ ©stream, &is_pipe))
+ {
+ need_close = true;
+ if (is_pipe)
+ disable_sigpipe_trap();
+ }
+ else
+ copystream = NULL; /* discard COPY data entirely */
+ }
+ else
+ {
+ /* fall back to the generic query output stream */
+ copystream = pset.queryFout;
+ }
+
+ success = handleCopyOut(pset.db,
+ copystream,
+ ©_result)
+ && success
+ && (copystream != NULL);
+
+ /*
+ * Suppress status printing if the report would go to the same
+ * place as the COPY data just went. Note this doesn't
+ * prevent error reporting, since handleCopyOut did that.
+ */
+ if (copystream == pset.queryFout)
+ {
+ PQclear(copy_result);
+ copy_result = NULL;
+ }
+
+ if (need_close)
+ {
+ /* close \g argument file/pipe */
+ if (is_pipe)
+ {
+ pclose(copystream);
+ restore_sigpipe_trap();
+ }
+ else
+ {
+ fclose(copystream);
+ }
+ }
}
else
{
- fclose(copystream);
+ /* COPY IN */
+ copystream = pset.copyStream ? pset.copyStream : pset.cur_cmd_source;
+ success = handleCopyIn(pset.db,
+ copystream,
+ PQbinaryTuples(*results),
+ ©_result) && success;
}
+ ResetCancelConn();
+
+ /*
+ * Replace the PGRES_COPY_OUT/IN result with COPY command's exit
+ * status, or with NULL if we want to suppress printing anything.
+ */
+ PQclear(*results);
+ *results = copy_result;
}
- }
- else
- {
- /* COPY IN */
- copystream = pset.copyStream ? pset.copyStream : pset.cur_cmd_source;
- success = handleCopyIn(pset.db,
- copystream,
- PQbinaryTuples(*result),
- ©_result);
+ else if (first_cycle)
+ {
+ /* fast path: no COPY commands; PQexec visited all results */
+ break;
+ }
+
+ /*
+ * Check PQgetResult() again. In the typical case of a single-command
+ * string, it will return NULL. Otherwise, we'll have other results
+ * to process that may include other COPYs. We keep the last result.
+ */
+ next_result = PQgetResult(pset.db);
+ if (!next_result)
+ break;
+
+ PQclear(*results);
+ *results = next_result;
+ first_cycle = false;
}
- ResetCancelConn();
- PQclear(*result);
- *result = copy_result;
+ SetResultVariables(*results, success);
+
+ /* may need this to recover from conn loss during COPY */
+ if (!first_cycle && !CheckConnection())
+ return false;
return success;
}
+
/*
* PrintQueryStatus: report command status as required
*
- * Note: Utility function for use by HandleQueryResult() only.
+ * Note: Utility function for use by PrintQueryResults() only.
*/
static void
PrintQueryStatus(PGresult *results)
/*
- * HandleQueryResult: print out, store or execute one query result
- * as required.
+ * PrintQueryResults: print out (or store or execute) query results as required
+ *
+ * Note: Utility function for use by SendQuery() only.
*
* Returns true if the query executed successfully, false otherwise.
*/
static bool
-HandleQueryResult(PGresult *result, bool last)
+PrintQueryResults(PGresult *results)
{
bool success;
const char *cmdstatus;
- if (result == NULL)
+ if (!results)
return false;
- switch (PQresultStatus(result))
+ switch (PQresultStatus(results))
{
case PGRES_TUPLES_OK:
/* store or execute or print the data ... */
- if (last && pset.gset_prefix)
- success = StoreQueryTuple(result);
- else if (last && pset.gexec_flag)
- success = ExecQueryTuples(result);
- else if (last && pset.crosstab_flag)
- success = PrintResultsInCrosstab(result);
- else if (last || pset.show_all_results)
- success = PrintQueryTuples(result);
+ if (pset.gset_prefix)
+ success = StoreQueryTuple(results);
+ else if (pset.gexec_flag)
+ success = ExecQueryTuples(results);
+ else if (pset.crosstab_flag)
+ success = PrintResultsInCrosstab(results);
else
- success = true;
-
+ success = PrintQueryTuples(results);
/* if it's INSERT/UPDATE/DELETE RETURNING, also print status */
- if (last || pset.show_all_results)
- {
- cmdstatus = PQcmdStatus(result);
- if (strncmp(cmdstatus, "INSERT", 6) == 0 ||
- strncmp(cmdstatus, "UPDATE", 6) == 0 ||
- strncmp(cmdstatus, "DELETE", 6) == 0)
- PrintQueryStatus(result);
- }
-
+ cmdstatus = PQcmdStatus(results);
+ if (strncmp(cmdstatus, "INSERT", 6) == 0 ||
+ strncmp(cmdstatus, "UPDATE", 6) == 0 ||
+ strncmp(cmdstatus, "DELETE", 6) == 0)
+ PrintQueryStatus(results);
break;
case PGRES_COMMAND_OK:
- if (last || pset.show_all_results)
- PrintQueryStatus(result);
+ PrintQueryStatus(results);
success = true;
break;
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
- /* nothing to do here: already processed */
+ /* nothing to do here */
success = true;
break;
default:
success = false;
pg_log_error("unexpected PQresultStatus: %d",
- PQresultStatus(result));
+ PQresultStatus(results));
break;
}
return success;
}
-/*
- * Data structure and functions to record notices while they are
- * emitted, so that they can be shown later.
- *
- * We need to know which result is last, which requires to extract
- * one result in advance, hence two buffers are needed.
- */
-typedef struct {
- bool in_flip;
- PQExpBufferData flip;
- PQExpBufferData flop;
-} t_notice_messages;
-
-/*
- * Store notices in appropriate buffer, for later display.
- */
-static void
-AppendNoticeMessage(void *arg, const char *msg)
-{
- t_notice_messages *notes = (t_notice_messages*) arg;
- appendPQExpBufferStr(notes->in_flip ? ¬es->flip : ¬es->flop, msg);
-}
-
-/*
- * Show notices stored in buffer, which is then reset.
- */
-static void
-ShowNoticeMessage(t_notice_messages *notes)
-{
- PQExpBufferData *current = notes->in_flip ? ¬es->flip : ¬es->flop;
- if (current->data != NULL && *current->data != '\0')
- pg_log_info("%s", current->data);
- resetPQExpBuffer(current);
-}
-
-/*
- * SendQueryAndProcessResults: utility function for use by SendQuery()
- * and PSQLexecWatch().
- *
- * Sends query and cycles through PGresult objects.
- *
- * When not under \watch and if our command string contained a COPY FROM STDIN
- * or COPY TO STDOUT, the PGresult associated with these commands must be
- * processed by providing an input or output stream. In that event, we'll
- * marshal data for the COPY.
- *
- * For other commands, the results are processed normally, depending on their
- * status.
- *
- * Returns 1 on complete success, 0 on interrupt and -1 or errors. Possible
- * failure modes include purely client-side problems; check the transaction
- * status for the server-side opinion.
- *
- * Note that on a combined query, failure does not mean that nothing was
- * committed.
- */
-static int
-SendQueryAndProcessResults(const char *query, double *pelapsed_msec, bool is_watch)
-{
- bool success;
- instr_time before;
- PGresult *result;
- t_notice_messages notes;
-
- if (pset.timing)
- INSTR_TIME_SET_CURRENT(before);
-
- success = PQsendQuery(pset.db, query);
- ResetCancelConn();
-
- if (!success)
- {
- const char *error = PQerrorMessage(pset.db);
-
- if (strlen(error))
- pg_log_info("%s", error);
-
- CheckConnection();
-
- return -1;
- }
-
- /*
- * If SIGINT is sent while the query is processing, the interrupt will be
- * consumed. The user's intention, though, is to cancel the entire watch
- * process, so detect a sent cancellation request and exit in this case.
- */
- if (is_watch && cancel_pressed)
- {
- ClearOrSaveAllResults();
- return 0;
- }
-
- /* intercept notices */
- notes.in_flip = true;
- initPQExpBuffer(¬es.flip);
- initPQExpBuffer(¬es.flop);
- PQsetNoticeProcessor(pset.db, AppendNoticeMessage, ¬es);
-
- /* first result */
- result = PQgetResult(pset.db);
-
- while (result != NULL)
- {
- ExecStatusType result_status;
- PGresult *next_result;
- bool last;
-
- if (!AcceptResult(result, false))
- {
- /*
- * Some error occured, either a server-side failure or
- * a failure to submit the command string. Record that.
- */
- const char *error = PQerrorMessage(pset.db);
-
- ShowNoticeMessage(¬es);
- if (strlen(error))
- pg_log_info("%s", error);
- CheckConnection();
- if (!is_watch)
- SetResultVariables(result, false);
- ClearOrSaveResult(result);
- success = false;
-
- /* and switch to next result */
- result = PQgetResult(pset.db);
- continue;
- }
-
- /* must handle COPY before changing the current result */
- result_status = PQresultStatus(result);
- Assert(result_status != PGRES_COPY_BOTH);
- if (result_status == PGRES_COPY_IN ||
- result_status == PGRES_COPY_OUT)
- {
- ShowNoticeMessage(¬es);
-
- if (is_watch)
- {
- ClearOrSaveAllResults();
- pg_log_error("\\watch cannot be used with COPY");
- return -1;
- }
-
- /* use normal notice processor during COPY */
- PQsetNoticeProcessor(pset.db, NoticeProcessor, NULL);
-
- success &= HandleCopyResult(&result);
-
- PQsetNoticeProcessor(pset.db, AppendNoticeMessage, ¬es);
- }
-
- /*
- * Check PQgetResult() again. In the typical case of a single-command
- * string, it will return NULL. Otherwise, we'll have other results
- * to process.
- */
- notes.in_flip = !notes.in_flip;
- next_result = PQgetResult(pset.db);
- notes.in_flip = !notes.in_flip;
- last = (next_result == NULL);
-
- /*
- * Get timing measure before printing the last result.
- *
- * It will include the display of previous results, if any.
- * This cannot be helped because the server goes on processing
- * further queries anyway while the previous ones are being displayed.
- * The parallel execution of the client display hides the server time
- * when it is shorter.
- *
- * With combined queries, timing must be understood as an upper bound
- * of the time spent processing them.
- */
- if (last && pset.timing)
- {
- instr_time now;
- INSTR_TIME_SET_CURRENT(now);
- INSTR_TIME_SUBTRACT(now, before);
- *pelapsed_msec = INSTR_TIME_GET_MILLISEC(now);
- }
-
- /* notices already shown above for copy */
- ShowNoticeMessage(¬es);
-
- /* this may or may not print something depending on settings */
- if (result != NULL)
- success &= HandleQueryResult(result, last);
-
- /* set variables on last result if all went well */
- if (!is_watch && last && success)
- SetResultVariables(result, true);
-
- ClearOrSaveResult(result);
- notes.in_flip = !notes.in_flip;
- result = next_result;
- }
-
- /* reset notice hook */
- PQsetNoticeProcessor(pset.db, NoticeProcessor, NULL);
- termPQExpBuffer(¬es.flip);
- termPQExpBuffer(¬es.flop);
-
- /* may need this to recover from conn loss during COPY */
- if (!CheckConnection())
- return -1;
-
- return success ? 1 : -1;
-}
-
/*
* SendQuery: send the query string to the backend
pset.crosstab_flag || !is_select_command(query))
{
/* Default fetch-it-all-and-print mode */
- int res = SendQueryAndProcessResults(query, &elapsed_msec, false);
- OK = (res >= 0);
- results = NULL;
+ instr_time before,
+ after;
+
+ if (pset.timing)
+ INSTR_TIME_SET_CURRENT(before);
+
+ results = PQexec(pset.db, query);
+
+ /* these operations are included in the timing result: */
+ ResetCancelConn();
+ OK = ProcessResult(&results);
+
+ if (pset.timing)
+ {
+ INSTR_TIME_SET_CURRENT(after);
+ INSTR_TIME_SUBTRACT(after, before);
+ elapsed_msec = INSTR_TIME_GET_MILLISEC(after);
+ }
+
+ /* but printing results isn't: */
+ if (OK && results)
+ OK = PrintQueryResults(results);
}
else
{
PQclear(results);
results = PQdescribePrepared(pset.db, "");
- OK = AcceptResult(results, true) &&
+ OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
if (OK && results)
{
PQclear(results);
results = PQexec(pset.db, buf.data);
- OK = AcceptResult(results, true);
+ OK = AcceptResult(results);
if (pset.timing)
{
}
if (OK && results)
- OK = HandleQueryResult(results, true);
+ OK = PrintQueryResults(results);
termPQExpBuffer(&buf);
}
if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
{
results = PQexec(pset.db, "BEGIN");
- OK = AcceptResult(results, true) &&
+ OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
ClearOrSaveResult(results);
if (!OK)
query);
results = PQexec(pset.db, buf.data);
- OK = AcceptResult(results, true) &&
+ OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
if (!OK)
SetResultVariables(results, OK);
is_pager = false;
}
- OK = AcceptResult(results, true);
+ OK = AcceptResult(results);
Assert(!OK);
SetResultVariables(results, OK);
ClearOrSaveResult(results);
results = PQexec(pset.db, "CLOSE _psql_cursor");
if (OK)
{
- OK = AcceptResult(results, true) &&
+ OK = AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
ClearOrSaveResult(results);
}
if (started_txn)
{
results = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
- OK &= AcceptResult(results, true) &&
+ OK &= AcceptResult(results) &&
(PQresultStatus(results) == PGRES_COMMAND_OK);
ClearOrSaveResult(results);
}