diff options
Diffstat (limited to 'contrib')
-rw-r--r-- | contrib/dblink/dblink.c | 270 | ||||
-rw-r--r-- | contrib/postgres_fdw/connection.c | 232 | ||||
-rw-r--r-- | contrib/postgres_fdw/option.c | 33 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 878 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.h | 4 |
5 files changed, 617 insertions, 800 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index de5bed282f3..f98805fb5f7 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -101,8 +101,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo, const char *conname, const char *sql, bool fail); -static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql); -static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first); +static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql); +static void storeRow(storeInfo *sinfo, PGresult *res, bool first); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static remoteConn *createNewConnection(const char *name); @@ -169,14 +169,6 @@ typedef struct remoteConnHashEnt /* initial number of connection hashes */ #define NUMCONN 16 -static char * -xpstrdup(const char *in) -{ - if (in == NULL) - return NULL; - return pstrdup(in); -} - pg_noreturn static void dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2) { @@ -870,131 +862,123 @@ static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res) { ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + bool is_sql_cmd; + int ntuples; + int nfields; /* prepTuplestoreResult must have been called previously */ Assert(rsinfo->returnMode == SFRM_Materialize); - PG_TRY(); + if (PQresultStatus(res) == PGRES_COMMAND_OK) { - TupleDesc tupdesc; - bool is_sql_cmd; - int ntuples; - int nfields; + is_sql_cmd = true; - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; + /* + * need a tuple descriptor representing one TEXT column to return the + * command status string as our result tuple + */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + ntuples = 1; + nfields = 1; + } + else + { + Assert(PQresultStatus(res) == PGRES_TUPLES_OK); - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + is_sql_cmd = false; - is_sql_cmd = false; + /* get a tuple descriptor for our result type */ + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + ntuples = PQntuples(res); + nfields = PQnfields(res); + } - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); - } + /* + * check result and tuple descriptor have the same number of columns + */ + if (nfields != tupdesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); + if (ntuples > 0) + { + AttInMetadata *attinmeta; + int nestlevel = -1; + Tuplestorestate *tupstore; + MemoryContext oldcontext; + int row; + char **values; - if (ntuples > 0) - { - AttInMetadata *attinmeta; - int nestlevel = -1; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; + attinmeta = TupleDescGetAttInMetadata(tupdesc); - attinmeta = TupleDescGetAttInMetadata(tupdesc); + /* Set GUCs to ensure we read GUC-sensitive data types correctly */ + if (!is_sql_cmd) + nestlevel = applyRemoteGucs(conn); - /* Set GUCs to ensure we read GUC-sensitive data types correctly */ - if (!is_sql_cmd) - nestlevel = applyRemoteGucs(conn); + oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); - oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + values = palloc_array(char *, nfields); - values = palloc_array(char *, nfields); + /* put all tuples into the tuplestore */ + for (row = 0; row < ntuples; row++) + { + HeapTuple tuple; - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) + if (!is_sql_cmd) { - HeapTuple tuple; + int i; - if (!is_sql_cmd) - { - int i; - - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else + for (i = 0; i < nfields; i++) { - values[0] = PQcmdStatus(res); + if (PQgetisnull(res, row, i)) + values[i] = NULL; + else + values[i] = PQgetvalue(res, row, i); } - - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); + } + else + { + values[0] = PQcmdStatus(res); } - /* clean up GUC settings, if we changed any */ - restoreLocalGucs(nestlevel); + /* build the tuple and put it into the tuplestore. */ + tuple = BuildTupleFromCStrings(attinmeta, values); + tuplestore_puttuple(tupstore, tuple); } + + /* clean up GUC settings, if we changed any */ + restoreLocalGucs(nestlevel); } - PG_FINALLY(); - { - /* be sure to release the libpq result */ - PQclear(res); - } - PG_END_TRY(); + + PQclear(res); } /* @@ -1013,16 +997,17 @@ materializeQueryResult(FunctionCallInfo fcinfo, bool fail) { ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - PGresult *volatile res = NULL; - volatile storeInfo sinfo = {0}; /* prepTuplestoreResult must have been called previously */ Assert(rsinfo->returnMode == SFRM_Materialize); - sinfo.fcinfo = fcinfo; - + /* Use a PG_TRY block to ensure we pump libpq dry of results */ PG_TRY(); { + storeInfo sinfo = {0}; + PGresult *res; + + sinfo.fcinfo = fcinfo; /* Create short-lived memory context for data conversions */ sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext, "dblink temporary context", @@ -1035,14 +1020,7 @@ materializeQueryResult(FunctionCallInfo fcinfo, (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { - /* - * dblink_res_error will clear the passed PGresult, so we need - * this ugly dance to avoid doing so twice during error exit - */ - PGresult *res1 = res; - - res = NULL; - dblink_res_error(conn, conname, res1, fail, + dblink_res_error(conn, conname, res, fail, "while executing query"); /* if fail isn't set, we'll return an empty query result */ } @@ -1081,7 +1059,6 @@ materializeQueryResult(FunctionCallInfo fcinfo, tuplestore_puttuple(tupstore, tuple); PQclear(res); - res = NULL; } else { @@ -1090,26 +1067,20 @@ materializeQueryResult(FunctionCallInfo fcinfo, Assert(rsinfo->setResult != NULL); PQclear(res); - res = NULL; } /* clean up data conversion short-lived memory context */ if (sinfo.tmpcontext != NULL) MemoryContextDelete(sinfo.tmpcontext); - sinfo.tmpcontext = NULL; PQclear(sinfo.last_res); - sinfo.last_res = NULL; PQclear(sinfo.cur_res); - sinfo.cur_res = NULL; } PG_CATCH(); { - /* be sure to release any libpq result we collected */ - PQclear(res); - PQclear(sinfo.last_res); - PQclear(sinfo.cur_res); - /* and clear out any pending data in libpq */ + PGresult *res; + + /* be sure to clear out any pending data in libpq */ while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) != NULL) PQclear(res); @@ -1122,7 +1093,7 @@ materializeQueryResult(FunctionCallInfo fcinfo, * Execute query, and send any result rows to sinfo->tuplestore. */ static PGresult * -storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql) +storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql) { bool first = true; int nestlevel = -1; @@ -1190,7 +1161,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql) * (in this case the PGresult might contain either zero or one row). */ static void -storeRow(volatile storeInfo *sinfo, PGresult *res, bool first) +storeRow(storeInfo *sinfo, PGresult *res, bool first) { int nfields = PQnfields(res); HeapTuple tuple; @@ -2795,10 +2766,13 @@ dblink_connstr_check(const char *connstr) /* * Report an error received from the remote server * - * res: the received error result (will be freed) + * res: the received error result * fail: true for ERROR ereport, false for NOTICE * fmt and following args: sprintf-style format and values for errcontext; * the resulting string should be worded like "while <some action>" + * + * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error, + * in which case memory context cleanup will clear it eventually). */ static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res, @@ -2806,15 +2780,11 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res, { int level; char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); - char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY); - char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL); - char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT); - char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT); + char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY); + char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL); + char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT); + char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT); int sqlstate; - char *message_primary; - char *message_detail; - char *message_hint; - char *message_context; va_list ap; char dblink_context_msg[512]; @@ -2832,11 +2802,6 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res, else sqlstate = ERRCODE_CONNECTION_FAILURE; - message_primary = xpstrdup(pg_diag_message_primary); - message_detail = xpstrdup(pg_diag_message_detail); - message_hint = xpstrdup(pg_diag_message_hint); - message_context = xpstrdup(pg_diag_context); - /* * If we don't get a message from the PGresult, try the PGconn. This is * needed because for connection-level failures, PQgetResult may just @@ -2846,14 +2811,6 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res, message_primary = pchomp(PQerrorMessage(conn)); /* - * Now that we've copied all the data we need out of the PGresult, it's - * safe to free it. We must do this to avoid PGresult leakage. We're - * leaking all the strings too, but those are in palloc'd memory that will - * get cleaned up eventually. - */ - PQclear(res); - - /* * Format the basic errcontext string. Below, we'll add on something * about the connection name. That's a violation of the translatability * guidelines about constructing error messages out of parts, but since @@ -2877,6 +2834,7 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res, dblink_context_msg, conname)) : (errcontext("%s on unnamed dblink connection", dblink_context_msg)))); + PQclear(res); } /* diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index c1ce6f33436..a33843fcf85 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -815,7 +815,7 @@ static void do_sql_command_begin(PGconn *conn, const char *sql) { if (!PQsendQuery(conn, sql)) - pgfdw_report_error(ERROR, NULL, conn, false, sql); + pgfdw_report_error(ERROR, NULL, conn, sql); } static void @@ -830,10 +830,10 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) * would be large compared to the overhead of PQconsumeInput.) */ if (consume_input && !PQconsumeInput(conn)) - pgfdw_report_error(ERROR, NULL, conn, false, sql); + pgfdw_report_error(ERROR, NULL, conn, sql); res = pgfdw_get_result(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, true, sql); + pgfdw_report_error(ERROR, res, conn, sql); PQclear(res); } @@ -967,62 +967,55 @@ pgfdw_get_result(PGconn *conn) * Report an error we got from the remote server. * * elevel: error level to use (typically ERROR, but might be less) - * res: PGresult containing the error + * res: PGresult containing the error (might be NULL) * conn: connection we did the query on - * clear: if true, PQclear the result (otherwise caller will handle it) * sql: NULL, or text of remote command we tried to execute * + * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error, + * in which case memory context cleanup will clear it eventually). + * * Note: callers that choose not to throw ERROR for a remote error are * responsible for making sure that the associated ConnCacheEntry gets * marked with have_error = true. */ void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, - bool clear, const char *sql) + const char *sql) { - /* If requested, PGresult must be released before leaving this function. */ - PG_TRY(); - { - char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); - char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY); - char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL); - char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT); - char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT); - int sqlstate; - - if (diag_sqlstate) - sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], - diag_sqlstate[1], - diag_sqlstate[2], - diag_sqlstate[3], - diag_sqlstate[4]); - else - sqlstate = ERRCODE_CONNECTION_FAILURE; + char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY); + char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL); + char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT); + char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT); + int sqlstate; + + if (diag_sqlstate) + sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); + else + sqlstate = ERRCODE_CONNECTION_FAILURE; - /* - * If we don't get a message from the PGresult, try the PGconn. This - * is needed because for connection-level failures, PQgetResult may - * just return NULL, not a PGresult at all. - */ - if (message_primary == NULL) - message_primary = pchomp(PQerrorMessage(conn)); - - ereport(elevel, - (errcode(sqlstate), - (message_primary != NULL && message_primary[0] != '\0') ? - errmsg_internal("%s", message_primary) : - errmsg("could not obtain message string for remote error"), - message_detail ? errdetail_internal("%s", message_detail) : 0, - message_hint ? errhint("%s", message_hint) : 0, - message_context ? errcontext("%s", message_context) : 0, - sql ? errcontext("remote SQL command: %s", sql) : 0)); - } - PG_FINALLY(); - { - if (clear) - PQclear(res); - } - PG_END_TRY(); + /* + * If we don't get a message from the PGresult, try the PGconn. This is + * needed because for connection-level failures, PQgetResult may just + * return NULL, not a PGresult at all. + */ + if (message_primary == NULL) + message_primary = pchomp(PQerrorMessage(conn)); + + ereport(elevel, + (errcode(sqlstate), + (message_primary != NULL && message_primary[0] != '\0') ? + errmsg_internal("%s", message_primary) : + errmsg("could not obtain message string for remote error"), + message_detail ? errdetail_internal("%s", message_detail) : 0, + message_hint ? errhint("%s", message_hint) : 0, + message_context ? errcontext("%s", message_context) : 0, + sql ? errcontext("remote SQL command: %s", sql) : 0)); + PQclear(res); } /* @@ -1545,7 +1538,7 @@ pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query) */ if (!PQsendQuery(conn, query)) { - pgfdw_report_error(WARNING, NULL, conn, false, query); + pgfdw_report_error(WARNING, NULL, conn, query); return false; } @@ -1570,7 +1563,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, */ if (consume_input && !PQconsumeInput(conn)) { - pgfdw_report_error(WARNING, NULL, conn, false, query); + pgfdw_report_error(WARNING, NULL, conn, query); return false; } @@ -1582,7 +1575,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, (errmsg("could not get query result due to timeout"), errcontext("remote SQL command: %s", query))); else - pgfdw_report_error(WARNING, NULL, conn, false, query); + pgfdw_report_error(WARNING, NULL, conn, query); return false; } @@ -1590,7 +1583,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query, /* Issue a warning if not successful. */ if (PQresultStatus(result) != PGRES_COMMAND_OK) { - pgfdw_report_error(WARNING, result, conn, true, query); + pgfdw_report_error(WARNING, result, conn, query); return ignore_errors; } PQclear(result); @@ -1618,103 +1611,90 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out) { - volatile bool failed = false; - PGresult *volatile last_res = NULL; + bool failed = false; + PGresult *last_res = NULL; + int canceldelta = RETRY_CANCEL_TIMEOUT * 2; *result = NULL; *timed_out = false; - - /* In what follows, do not leak any PGresults on an error. */ - PG_TRY(); + for (;;) { - int canceldelta = RETRY_CANCEL_TIMEOUT * 2; + PGresult *res; - for (;;) + while (PQisBusy(conn)) { - PGresult *res; + int wc; + TimestampTz now = GetCurrentTimestamp(); + long cur_timeout; - while (PQisBusy(conn)) + /* If timeout has expired, give up. */ + if (now >= endtime) { - int wc; - TimestampTz now = GetCurrentTimestamp(); - long cur_timeout; - - /* If timeout has expired, give up. */ - if (now >= endtime) - { - *timed_out = true; - failed = true; - goto exit; - } + *timed_out = true; + failed = true; + goto exit; + } - /* If we need to re-issue the cancel request, do that. */ - if (now >= retrycanceltime) - { - /* We ignore failure to issue the repeated request. */ - (void) libpqsrv_cancel(conn, endtime); + /* If we need to re-issue the cancel request, do that. */ + if (now >= retrycanceltime) + { + /* We ignore failure to issue the repeated request. */ + (void) libpqsrv_cancel(conn, endtime); - /* Recompute "now" in case that took measurable time. */ - now = GetCurrentTimestamp(); + /* Recompute "now" in case that took measurable time. */ + now = GetCurrentTimestamp(); - /* Adjust re-cancel timeout in increasing steps. */ - retrycanceltime = TimestampTzPlusMilliseconds(now, - canceldelta); - canceldelta += canceldelta; - } + /* Adjust re-cancel timeout in increasing steps. */ + retrycanceltime = TimestampTzPlusMilliseconds(now, + canceldelta); + canceldelta += canceldelta; + } - /* If timeout has expired, give up, else get sleep time. */ - cur_timeout = TimestampDifferenceMilliseconds(now, - Min(endtime, - retrycanceltime)); - if (cur_timeout <= 0) - { - *timed_out = true; - failed = true; - goto exit; - } + /* If timeout has expired, give up, else get sleep time. */ + cur_timeout = TimestampDifferenceMilliseconds(now, + Min(endtime, + retrycanceltime)); + if (cur_timeout <= 0) + { + *timed_out = true; + failed = true; + goto exit; + } - /* first time, allocate or get the custom wait event */ - if (pgfdw_we_cleanup_result == 0) - pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult"); + /* first time, allocate or get the custom wait event */ + if (pgfdw_we_cleanup_result == 0) + pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult"); - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - cur_timeout, pgfdw_we_cleanup_result); - ResetLatch(MyLatch); + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | + WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + PQsocket(conn), + cur_timeout, pgfdw_we_cleanup_result); + ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); + CHECK_FOR_INTERRUPTS(); - /* Data available in socket? */ - if (wc & WL_SOCKET_READABLE) + /* Data available in socket? */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) { - if (!PQconsumeInput(conn)) - { - /* connection trouble */ - failed = true; - goto exit; - } + /* connection trouble */ + failed = true; + goto exit; } } + } - res = PQgetResult(conn); - if (res == NULL) - break; /* query is complete */ + res = PQgetResult(conn); + if (res == NULL) + break; /* query is complete */ - PQclear(last_res); - last_res = res; - } -exit: ; - } - PG_CATCH(); - { PQclear(last_res); - PG_RE_THROW(); + last_res = res; } - PG_END_TRY(); - +exit: if (failed) PQclear(last_res); else diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index c2f936640bc..d6fa89bad93 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -21,6 +21,7 @@ #include "libpq/libpq-be.h" #include "postgres_fdw.h" #include "utils/guc.h" +#include "utils/memutils.h" #include "utils/varlena.h" /* @@ -40,12 +41,6 @@ typedef struct PgFdwOption static PgFdwOption *postgres_fdw_options; /* - * Valid options for libpq. - * Allocated and filled in InitPgFdwOptions. - */ -static PQconninfoOption *libpq_options; - -/* * GUC parameters */ char *pgfdw_application_name = NULL; @@ -239,6 +234,7 @@ static void InitPgFdwOptions(void) { int num_libpq_opts; + PQconninfoOption *libpq_options; PQconninfoOption *lopt; PgFdwOption *popt; @@ -307,8 +303,8 @@ InitPgFdwOptions(void) * Get list of valid libpq options. * * To avoid unnecessary work, we get the list once and use it throughout - * the lifetime of this backend process. We don't need to care about - * memory context issues, because PQconndefaults allocates with malloc. + * the lifetime of this backend process. Hence, we'll allocate it in + * TopMemoryContext. */ libpq_options = PQconndefaults(); if (!libpq_options) /* assume reason for failure is OOM */ @@ -325,19 +321,11 @@ InitPgFdwOptions(void) /* * Construct an array which consists of all valid options for * postgres_fdw, by appending FDW-specific options to libpq options. - * - * We use plain malloc here to allocate postgres_fdw_options because it - * lives as long as the backend process does. Besides, keeping - * libpq_options in memory allows us to avoid copying every keyword - * string. */ postgres_fdw_options = (PgFdwOption *) - malloc(sizeof(PgFdwOption) * num_libpq_opts + - sizeof(non_libpq_options)); - if (postgres_fdw_options == NULL) - ereport(ERROR, - (errcode(ERRCODE_FDW_OUT_OF_MEMORY), - errmsg("out of memory"))); + MemoryContextAlloc(TopMemoryContext, + sizeof(PgFdwOption) * num_libpq_opts + + sizeof(non_libpq_options)); popt = postgres_fdw_options; for (lopt = libpq_options; lopt->keyword; lopt++) @@ -355,8 +343,8 @@ InitPgFdwOptions(void) if (strncmp(lopt->keyword, "oauth_", strlen("oauth_")) == 0) continue; - /* We don't have to copy keyword string, as described above. */ - popt->keyword = lopt->keyword; + popt->keyword = MemoryContextStrdup(TopMemoryContext, + lopt->keyword); /* * "user" and any secret options are allowed only on user mappings. @@ -371,6 +359,9 @@ InitPgFdwOptions(void) popt++; } + /* Done with libpq's output structure. */ + PQconninfoFree(libpq_options); + /* Append FDW-specific options and dummy terminator. */ memcpy(popt, non_libpq_options, sizeof(non_libpq_options)); } diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index e0a34b27c7c..25b287be069 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -240,7 +240,6 @@ typedef struct PgFdwDirectModifyState PGresult *result; /* result for query */ int num_tuples; /* # of result tuples */ int next_tuple; /* index of next one to return */ - MemoryContextCallback result_cb; /* ensures result will get freed */ Relation resultRel; /* relcache entry for the target relation */ AttrNumber *attnoMap; /* array of attnums of input user columns */ AttrNumber ctidAttno; /* attnum of input ctid column */ @@ -1703,13 +1702,9 @@ postgresReScanForeignScan(ForeignScanState *node) return; } - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fsstate->conn, sql); PQclear(res); /* Now force a fresh FETCH. */ @@ -2672,17 +2667,6 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) node->fdw_state = dmstate; /* - * We use a memory context callback to ensure that the dmstate's PGresult - * (if any) will be released, even if the query fails somewhere that's - * outside our control. The callback is always armed for the duration of - * the query; this relies on PQclear(NULL) being a no-op. - */ - dmstate->result_cb.func = (MemoryContextCallbackFunction) PQclear; - dmstate->result_cb.arg = NULL; - MemoryContextRegisterResetCallback(CurrentMemoryContext, - &dmstate->result_cb); - - /* * Identify which user to do the remote access as. This should match what * ExecCheckPermissions() does. */ @@ -2829,13 +2813,7 @@ postgresEndDirectModify(ForeignScanState *node) return; /* Release PGresult */ - if (dmstate->result) - { - PQclear(dmstate->result); - dmstate->result = NULL; - /* ... and don't forget to disable the callback */ - dmstate->result_cb.arg = NULL; - } + PQclear(dmstate->result); /* Release remote connection */ ReleaseConnection(dmstate->conn); @@ -3626,41 +3604,32 @@ get_remote_estimate(const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost) { - PGresult *volatile res = NULL; - - /* PGresult must be released before leaving this function. */ - PG_TRY(); - { - char *line; - char *p; - int n; + PGresult *res; + char *line; + char *p; + int n; - /* - * Execute EXPLAIN remotely. - */ - res = pgfdw_exec_query(conn, sql, NULL); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, sql); + /* + * Execute EXPLAIN remotely. + */ + res = pgfdw_exec_query(conn, sql, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, sql); - /* - * Extract cost numbers for topmost plan node. Note we search for a - * left paren from the end of the line to avoid being confused by - * other uses of parentheses. - */ - line = PQgetvalue(res, 0, 0); - p = strrchr(line, '('); - if (p == NULL) - elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); - n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)", - startup_cost, total_cost, rows, width); - if (n != 4) - elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); - } - PG_FINALLY(); - { - PQclear(res); - } - PG_END_TRY(); + /* + * Extract cost numbers for topmost plan node. Note we search for a left + * paren from the end of the line to avoid being confused by other uses of + * parentheses. + */ + line = PQgetvalue(res, 0, 0); + p = strrchr(line, '('); + if (p == NULL) + elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); + n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)", + startup_cost, total_cost, rows, width); + if (n != 4) + elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); + PQclear(res); } /* @@ -3800,17 +3769,14 @@ create_cursor(ForeignScanState *node) */ if (!PQsendQueryParams(conn, buf.data, numParams, NULL, values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, conn, false, buf.data); + pgfdw_report_error(ERROR, NULL, conn, buf.data); /* * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. */ res = pgfdw_get_result(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, true, fsstate->query); + pgfdw_report_error(ERROR, res, conn, fsstate->query); PQclear(res); /* Mark the cursor as created, and show no tuples have been retrieved */ @@ -3832,7 +3798,10 @@ static void fetch_more_data(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - PGresult *volatile res = NULL; + PGconn *conn = fsstate->conn; + PGresult *res; + int numrows; + int i; MemoryContext oldcontext; /* @@ -3843,74 +3812,63 @@ fetch_more_data(ForeignScanState *node) MemoryContextReset(fsstate->batch_cxt); oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); - /* PGresult must be released before leaving this function. */ - PG_TRY(); + if (fsstate->async_capable) { - PGconn *conn = fsstate->conn; - int numrows; - int i; + Assert(fsstate->conn_state->pendingAreq); - if (fsstate->async_capable) - { - Assert(fsstate->conn_state->pendingAreq); + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = pgfdw_get_result(conn); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, fsstate->query); - /* - * The query was already sent by an earlier call to - * fetch_more_data_begin. So now we just fetch the result. - */ - res = pgfdw_get_result(conn); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + /* Reset per-connection state */ + fsstate->conn_state->pendingAreq = NULL; + } + else + { + char sql[64]; - /* Reset per-connection state */ - fsstate->conn_state->pendingAreq = NULL; - } - else - { - char sql[64]; + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); - /* This is a regular synchronous fetch. */ - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + res = pgfdw_exec_query(conn, sql, fsstate->conn_state); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, fsstate->query); + } - res = pgfdw_exec_query(conn, sql, fsstate->conn_state); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); - } + /* Convert the data into HeapTuples */ + numrows = PQntuples(res); + fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); + fsstate->num_tuples = numrows; + fsstate->next_tuple = 0; - /* Convert the data into HeapTuples */ - numrows = PQntuples(res); - fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - fsstate->num_tuples = numrows; - fsstate->next_tuple = 0; + for (i = 0; i < numrows; i++) + { + Assert(IsA(node->ss.ps.plan, ForeignScan)); - for (i = 0; i < numrows; i++) - { - Assert(IsA(node->ss.ps.plan, ForeignScan)); - - fsstate->tuples[i] = - make_tuple_from_result_row(res, i, - fsstate->rel, - fsstate->attinmeta, - fsstate->retrieved_attrs, - node, - fsstate->temp_cxt); - } + fsstate->tuples[i] = + make_tuple_from_result_row(res, i, + fsstate->rel, + fsstate->attinmeta, + fsstate->retrieved_attrs, + node, + fsstate->temp_cxt); + } - /* Update fetch_ct_2 */ - if (fsstate->fetch_ct_2 < 2) - fsstate->fetch_ct_2++; + /* Update fetch_ct_2 */ + if (fsstate->fetch_ct_2 < 2) + fsstate->fetch_ct_2++; - /* Must be EOF if we didn't get as many tuples as we asked for. */ - fsstate->eof_reached = (numrows < fsstate->fetch_size); - } - PG_FINALLY(); - { - PQclear(res); - } - PG_END_TRY(); + /* Must be EOF if we didn't get as many tuples as we asked for. */ + fsstate->eof_reached = (numrows < fsstate->fetch_size); + + PQclear(res); MemoryContextSwitchTo(oldcontext); } @@ -3984,14 +3942,9 @@ close_cursor(PGconn *conn, unsigned int cursor_number, PGresult *res; snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number); - - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ res = pgfdw_exec_query(conn, sql, conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, true, sql); + pgfdw_report_error(ERROR, res, conn, sql); PQclear(res); } @@ -4199,18 +4152,15 @@ execute_foreign_modify(EState *estate, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->conn, fmstate->query); /* * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. */ res = pgfdw_get_result(fmstate->conn); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->conn, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) @@ -4269,17 +4219,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) fmstate->query, 0, NULL)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + pgfdw_report_error(ERROR, NULL, fmstate->conn, fmstate->query); /* * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. */ res = pgfdw_get_result(fmstate->conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + pgfdw_report_error(ERROR, res, fmstate->conn, fmstate->query); PQclear(res); /* This action shows that the prepare has been done. */ @@ -4370,37 +4317,25 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate, /* * store_returning_result * Store the result of a RETURNING clause - * - * On error, be sure to release the PGresult on the way out. Callers do not - * have PG_TRY blocks to ensure this happens. */ static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res) { - PG_TRY(); - { - HeapTuple newtup; + HeapTuple newtup; - newtup = make_tuple_from_result_row(res, 0, - fmstate->rel, - fmstate->attinmeta, - fmstate->retrieved_attrs, - NULL, - fmstate->temp_cxt); + newtup = make_tuple_from_result_row(res, 0, + fmstate->rel, + fmstate->attinmeta, + fmstate->retrieved_attrs, + NULL, + fmstate->temp_cxt); - /* - * The returning slot will not necessarily be suitable to store - * heaptuples directly, so allow for conversion. - */ - ExecForceStoreHeapTuple(newtup, slot, true); - } - PG_CATCH(); - { - PQclear(res); - PG_RE_THROW(); - } - PG_END_TRY(); + /* + * The returning slot will not necessarily be suitable to store heaptuples + * directly, so allow for conversion. + */ + ExecForceStoreHeapTuple(newtup, slot, true); } /* @@ -4436,14 +4371,9 @@ deallocate_query(PgFdwModifyState *fmstate) return; snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); - - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + pgfdw_report_error(ERROR, res, fmstate->conn, sql); PQclear(res); pfree(fmstate->p_name); fmstate->p_name = NULL; @@ -4611,24 +4541,24 @@ execute_dml_stmt(ForeignScanState *node) */ if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, NULL, values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); + pgfdw_report_error(ERROR, NULL, dmstate->conn, dmstate->query); /* * Get the result, and check for success. - * - * We use a memory context callback to ensure that the PGresult will be - * released, even if the query fails somewhere that's outside our control. - * The callback is already registered, just need to fill in its arg. */ - Assert(dmstate->result == NULL); dmstate->result = pgfdw_get_result(dmstate->conn); - dmstate->result_cb.arg = dmstate->result; - if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, false, + pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, dmstate->query); + /* + * The result potentially needs to survive across multiple executor row + * cycles, so move it to the context where the dmstate is. + */ + dmstate->result = libpqsrv_PGresultSetParent(dmstate->result, + GetMemoryChunkContext(dmstate)); + /* Get the number of rows affected. */ if (dmstate->has_returning) dmstate->num_tuples = PQntuples(dmstate->result); @@ -4965,7 +4895,7 @@ postgresAnalyzeForeignTable(Relation relation, UserMapping *user; PGconn *conn; StringInfoData sql; - PGresult *volatile res = NULL; + PGresult *res; /* Return the row-analysis function pointer */ *func = postgresAcquireSampleRowsFunc; @@ -4991,22 +4921,14 @@ postgresAnalyzeForeignTable(Relation relation, initStringInfo(&sql); deparseAnalyzeSizeSql(&sql, relation); - /* In what follows, do not risk leaking any PGresults. */ - PG_TRY(); - { - res = pgfdw_exec_query(conn, sql.data, NULL); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, sql.data); - if (PQntuples(res) != 1 || PQnfields(res) != 1) - elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); - *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); - } - PG_FINALLY(); - { - PQclear(res); - } - PG_END_TRY(); + if (PQntuples(res) != 1 || PQnfields(res) != 1) + elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); + *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); + PQclear(res); ReleaseConnection(conn); @@ -5027,9 +4949,9 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample) UserMapping *user; PGconn *conn; StringInfoData sql; - PGresult *volatile res = NULL; - volatile double reltuples = -1; - volatile char relkind = 0; + PGresult *res; + double reltuples; + char relkind; /* assume the remote relation does not support TABLESAMPLE */ *can_tablesample = false; @@ -5048,24 +4970,15 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample) initStringInfo(&sql); deparseAnalyzeInfoSql(&sql, relation); - /* In what follows, do not risk leaking any PGresults. */ - PG_TRY(); - { - res = pgfdw_exec_query(conn, sql.data, NULL); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, sql.data); - if (PQntuples(res) != 1 || PQnfields(res) != 2) - elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query"); - reltuples = strtod(PQgetvalue(res, 0, 0), NULL); - relkind = *(PQgetvalue(res, 0, 1)); - } - PG_FINALLY(); - { - if (res) - PQclear(res); - } - PG_END_TRY(); + if (PQntuples(res) != 1 || PQnfields(res) != 2) + elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query"); + reltuples = strtod(PQgetvalue(res, 0, 0), NULL); + relkind = *(PQgetvalue(res, 0, 1)); + PQclear(res); ReleaseConnection(conn); @@ -5108,7 +5021,9 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, double reltuples; unsigned int cursor_number; StringInfoData sql; - PGresult *volatile res = NULL; + PGresult *res; + char fetch_sql[64]; + int fetch_size; ListCell *lc; /* Initialize workspace state */ @@ -5285,91 +5200,76 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs); - /* In what follows, do not risk leaking any PGresults. */ - PG_TRY(); - { - char fetch_sql[64]; - int fetch_size; - - res = pgfdw_exec_query(conn, sql.data, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, false, sql.data); - PQclear(res); - res = NULL; + res = pgfdw_exec_query(conn, sql.data, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, sql.data); + PQclear(res); - /* - * Determine the fetch size. The default is arbitrary, but shouldn't - * be enormous. - */ - fetch_size = 100; - foreach(lc, server->options) - { - DefElem *def = (DefElem *) lfirst(lc); + /* + * Determine the fetch size. The default is arbitrary, but shouldn't be + * enormous. + */ + fetch_size = 100; + foreach(lc, server->options) + { + DefElem *def = (DefElem *) lfirst(lc); - if (strcmp(def->defname, "fetch_size") == 0) - { - (void) parse_int(defGetString(def), &fetch_size, 0, NULL); - break; - } - } - foreach(lc, table->options) + if (strcmp(def->defname, "fetch_size") == 0) { - DefElem *def = (DefElem *) lfirst(lc); - - if (strcmp(def->defname, "fetch_size") == 0) - { - (void) parse_int(defGetString(def), &fetch_size, 0, NULL); - break; - } + (void) parse_int(defGetString(def), &fetch_size, 0, NULL); + break; } + } + foreach(lc, table->options) + { + DefElem *def = (DefElem *) lfirst(lc); - /* Construct command to fetch rows from remote. */ - snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", - fetch_size, cursor_number); - - /* Retrieve and process rows a batch at a time. */ - for (;;) + if (strcmp(def->defname, "fetch_size") == 0) { - int numrows; - int i; + (void) parse_int(defGetString(def), &fetch_size, 0, NULL); + break; + } + } - /* Allow users to cancel long query */ - CHECK_FOR_INTERRUPTS(); + /* Construct command to fetch rows from remote. */ + snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", + fetch_size, cursor_number); - /* - * XXX possible future improvement: if rowstoskip is large, we - * could issue a MOVE rather than physically fetching the rows, - * then just adjust rowstoskip and samplerows appropriately. - */ + /* Retrieve and process rows a batch at a time. */ + for (;;) + { + int numrows; + int i; - /* Fetch some rows */ - res = pgfdw_exec_query(conn, fetch_sql, NULL); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, sql.data); + /* Allow users to cancel long query */ + CHECK_FOR_INTERRUPTS(); - /* Process whatever we got. */ - numrows = PQntuples(res); - for (i = 0; i < numrows; i++) - analyze_row_processor(res, i, &astate); + /* + * XXX possible future improvement: if rowstoskip is large, we could + * issue a MOVE rather than physically fetching the rows, then just + * adjust rowstoskip and samplerows appropriately. + */ - PQclear(res); - res = NULL; + /* Fetch some rows */ + res = pgfdw_exec_query(conn, fetch_sql, NULL); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, sql.data); - /* Must be EOF if we didn't get all the rows requested. */ - if (numrows < fetch_size) - break; - } + /* Process whatever we got. */ + numrows = PQntuples(res); + for (i = 0; i < numrows; i++) + analyze_row_processor(res, i, &astate); - /* Close the cursor, just to be tidy. */ - close_cursor(conn, cursor_number, NULL); - } - PG_CATCH(); - { PQclear(res); - PG_RE_THROW(); + + /* Must be EOF if we didn't get all the rows requested. */ + if (numrows < fetch_size) + break; } - PG_END_TRY(); + + /* Close the cursor, just to be tidy. */ + close_cursor(conn, cursor_number, NULL); ReleaseConnection(conn); @@ -5481,7 +5381,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) UserMapping *mapping; PGconn *conn; StringInfoData buf; - PGresult *volatile res = NULL; + PGresult *res; int numrows, i; ListCell *lc; @@ -5520,243 +5420,231 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) /* Create workspace for strings */ initStringInfo(&buf); - /* In what follows, do not risk leaking any PGresults. */ - PG_TRY(); - { - /* Check that the schema really exists */ - appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); - deparseStringLiteral(&buf, stmt->remote_schema); + /* Check that the schema really exists */ + appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); + deparseStringLiteral(&buf, stmt->remote_schema); - res = pgfdw_exec_query(conn, buf.data, NULL); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, buf.data); - if (PQntuples(res) != 1) - ereport(ERROR, - (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND), - errmsg("schema \"%s\" is not present on foreign server \"%s\"", - stmt->remote_schema, server->servername))); + if (PQntuples(res) != 1) + ereport(ERROR, + (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND), + errmsg("schema \"%s\" is not present on foreign server \"%s\"", + stmt->remote_schema, server->servername))); - PQclear(res); - res = NULL; - resetStringInfo(&buf); + PQclear(res); + resetStringInfo(&buf); - /* - * Fetch all table data from this schema, possibly restricted by - * EXCEPT or LIMIT TO. (We don't actually need to pay any attention - * to EXCEPT/LIMIT TO here, because the core code will filter the - * statements we return according to those lists anyway. But it - * should save a few cycles to not process excluded tables in the - * first place.) - * - * Import table data for partitions only when they are explicitly - * specified in LIMIT TO clause. Otherwise ignore them and only - * include the definitions of the root partitioned tables to allow - * access to the complete remote data set locally in the schema - * imported. - * - * Note: because we run the connection with search_path restricted to - * pg_catalog, the format_type() and pg_get_expr() outputs will always - * include a schema name for types/functions in other schemas, which - * is what we want. - */ + /* + * Fetch all table data from this schema, possibly restricted by EXCEPT or + * LIMIT TO. (We don't actually need to pay any attention to EXCEPT/LIMIT + * TO here, because the core code will filter the statements we return + * according to those lists anyway. But it should save a few cycles to + * not process excluded tables in the first place.) + * + * Import table data for partitions only when they are explicitly + * specified in LIMIT TO clause. Otherwise ignore them and only include + * the definitions of the root partitioned tables to allow access to the + * complete remote data set locally in the schema imported. + * + * Note: because we run the connection with search_path restricted to + * pg_catalog, the format_type() and pg_get_expr() outputs will always + * include a schema name for types/functions in other schemas, which is + * what we want. + */ + appendStringInfoString(&buf, + "SELECT relname, " + " attname, " + " format_type(atttypid, atttypmod), " + " attnotnull, " + " pg_get_expr(adbin, adrelid), "); + + /* Generated columns are supported since Postgres 12 */ + if (PQserverVersion(conn) >= 120000) appendStringInfoString(&buf, - "SELECT relname, " - " attname, " - " format_type(atttypid, atttypmod), " - " attnotnull, " - " pg_get_expr(adbin, adrelid), "); - - /* Generated columns are supported since Postgres 12 */ - if (PQserverVersion(conn) >= 120000) - appendStringInfoString(&buf, - " attgenerated, "); - else - appendStringInfoString(&buf, - " NULL, "); - - if (import_collate) - appendStringInfoString(&buf, - " collname, " - " collnsp.nspname "); - else - appendStringInfoString(&buf, - " NULL, NULL "); - + " attgenerated, "); + else appendStringInfoString(&buf, - "FROM pg_class c " - " JOIN pg_namespace n ON " - " relnamespace = n.oid " - " LEFT JOIN pg_attribute a ON " - " attrelid = c.oid AND attnum > 0 " - " AND NOT attisdropped " - " LEFT JOIN pg_attrdef ad ON " - " adrelid = c.oid AND adnum = attnum "); - - if (import_collate) - appendStringInfoString(&buf, - " LEFT JOIN pg_collation coll ON " - " coll.oid = attcollation " - " LEFT JOIN pg_namespace collnsp ON " - " collnsp.oid = collnamespace "); + " NULL, "); + if (import_collate) appendStringInfoString(&buf, - "WHERE c.relkind IN (" - CppAsString2(RELKIND_RELATION) "," - CppAsString2(RELKIND_VIEW) "," - CppAsString2(RELKIND_FOREIGN_TABLE) "," - CppAsString2(RELKIND_MATVIEW) "," - CppAsString2(RELKIND_PARTITIONED_TABLE) ") " - " AND n.nspname = "); - deparseStringLiteral(&buf, stmt->remote_schema); - - /* Partitions are supported since Postgres 10 */ - if (PQserverVersion(conn) >= 100000 && - stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO) - appendStringInfoString(&buf, " AND NOT c.relispartition "); - - /* Apply restrictions for LIMIT TO and EXCEPT */ - if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO || - stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + " collname, " + " collnsp.nspname "); + else + appendStringInfoString(&buf, + " NULL, NULL "); + + appendStringInfoString(&buf, + "FROM pg_class c " + " JOIN pg_namespace n ON " + " relnamespace = n.oid " + " LEFT JOIN pg_attribute a ON " + " attrelid = c.oid AND attnum > 0 " + " AND NOT attisdropped " + " LEFT JOIN pg_attrdef ad ON " + " adrelid = c.oid AND adnum = attnum "); + + if (import_collate) + appendStringInfoString(&buf, + " LEFT JOIN pg_collation coll ON " + " coll.oid = attcollation " + " LEFT JOIN pg_namespace collnsp ON " + " collnsp.oid = collnamespace "); + + appendStringInfoString(&buf, + "WHERE c.relkind IN (" + CppAsString2(RELKIND_RELATION) "," + CppAsString2(RELKIND_VIEW) "," + CppAsString2(RELKIND_FOREIGN_TABLE) "," + CppAsString2(RELKIND_MATVIEW) "," + CppAsString2(RELKIND_PARTITIONED_TABLE) ") " + " AND n.nspname = "); + deparseStringLiteral(&buf, stmt->remote_schema); + + /* Partitions are supported since Postgres 10 */ + if (PQserverVersion(conn) >= 100000 && + stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO) + appendStringInfoString(&buf, " AND NOT c.relispartition "); + + /* Apply restrictions for LIMIT TO and EXCEPT */ + if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO || + stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + { + bool first_item = true; + + appendStringInfoString(&buf, " AND c.relname "); + if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + appendStringInfoString(&buf, "NOT "); + appendStringInfoString(&buf, "IN ("); + + /* Append list of table names within IN clause */ + foreach(lc, stmt->table_list) { - bool first_item = true; + RangeVar *rv = (RangeVar *) lfirst(lc); - appendStringInfoString(&buf, " AND c.relname "); - if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) - appendStringInfoString(&buf, "NOT "); - appendStringInfoString(&buf, "IN ("); + if (first_item) + first_item = false; + else + appendStringInfoString(&buf, ", "); + deparseStringLiteral(&buf, rv->relname); + } + appendStringInfoChar(&buf, ')'); + } - /* Append list of table names within IN clause */ - foreach(lc, stmt->table_list) - { - RangeVar *rv = (RangeVar *) lfirst(lc); + /* Append ORDER BY at the end of query to ensure output ordering */ + appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); - if (first_item) - first_item = false; - else - appendStringInfoString(&buf, ", "); - deparseStringLiteral(&buf, rv->relname); - } - appendStringInfoChar(&buf, ')'); - } + /* Fetch the data */ + res = pgfdw_exec_query(conn, buf.data, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, buf.data); - /* Append ORDER BY at the end of query to ensure output ordering */ - appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); + /* Process results */ + numrows = PQntuples(res); + /* note: incrementation of i happens in inner loop's while() test */ + for (i = 0; i < numrows;) + { + char *tablename = PQgetvalue(res, i, 0); + bool first_item = true; - /* Fetch the data */ - res = pgfdw_exec_query(conn, buf.data, NULL); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, buf.data); + resetStringInfo(&buf); + appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n", + quote_identifier(tablename)); - /* Process results */ - numrows = PQntuples(res); - /* note: incrementation of i happens in inner loop's while() test */ - for (i = 0; i < numrows;) + /* Scan all rows for this table */ + do { - char *tablename = PQgetvalue(res, i, 0); - bool first_item = true; + char *attname; + char *typename; + char *attnotnull; + char *attgenerated; + char *attdefault; + char *collname; + char *collnamespace; + + /* If table has no columns, we'll see nulls here */ + if (PQgetisnull(res, i, 1)) + continue; - resetStringInfo(&buf); - appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n", - quote_identifier(tablename)); + attname = PQgetvalue(res, i, 1); + typename = PQgetvalue(res, i, 2); + attnotnull = PQgetvalue(res, i, 3); + attdefault = PQgetisnull(res, i, 4) ? NULL : + PQgetvalue(res, i, 4); + attgenerated = PQgetisnull(res, i, 5) ? NULL : + PQgetvalue(res, i, 5); + collname = PQgetisnull(res, i, 6) ? NULL : + PQgetvalue(res, i, 6); + collnamespace = PQgetisnull(res, i, 7) ? NULL : + PQgetvalue(res, i, 7); + + if (first_item) + first_item = false; + else + appendStringInfoString(&buf, ",\n"); - /* Scan all rows for this table */ - do - { - char *attname; - char *typename; - char *attnotnull; - char *attgenerated; - char *attdefault; - char *collname; - char *collnamespace; - - /* If table has no columns, we'll see nulls here */ - if (PQgetisnull(res, i, 1)) - continue; + /* Print column name and type */ + appendStringInfo(&buf, " %s %s", + quote_identifier(attname), + typename); - attname = PQgetvalue(res, i, 1); - typename = PQgetvalue(res, i, 2); - attnotnull = PQgetvalue(res, i, 3); - attdefault = PQgetisnull(res, i, 4) ? NULL : - PQgetvalue(res, i, 4); - attgenerated = PQgetisnull(res, i, 5) ? NULL : - PQgetvalue(res, i, 5); - collname = PQgetisnull(res, i, 6) ? NULL : - PQgetvalue(res, i, 6); - collnamespace = PQgetisnull(res, i, 7) ? NULL : - PQgetvalue(res, i, 7); - - if (first_item) - first_item = false; - else - appendStringInfoString(&buf, ",\n"); + /* + * Add column_name option so that renaming the foreign table's + * column doesn't break the association to the underlying column. + */ + appendStringInfoString(&buf, " OPTIONS (column_name "); + deparseStringLiteral(&buf, attname); + appendStringInfoChar(&buf, ')'); - /* Print column name and type */ - appendStringInfo(&buf, " %s %s", - quote_identifier(attname), - typename); + /* Add COLLATE if needed */ + if (import_collate && collname != NULL && collnamespace != NULL) + appendStringInfo(&buf, " COLLATE %s.%s", + quote_identifier(collnamespace), + quote_identifier(collname)); - /* - * Add column_name option so that renaming the foreign table's - * column doesn't break the association to the underlying - * column. - */ - appendStringInfoString(&buf, " OPTIONS (column_name "); - deparseStringLiteral(&buf, attname); - appendStringInfoChar(&buf, ')'); - - /* Add COLLATE if needed */ - if (import_collate && collname != NULL && collnamespace != NULL) - appendStringInfo(&buf, " COLLATE %s.%s", - quote_identifier(collnamespace), - quote_identifier(collname)); - - /* Add DEFAULT if needed */ - if (import_default && attdefault != NULL && - (!attgenerated || !attgenerated[0])) - appendStringInfo(&buf, " DEFAULT %s", attdefault); - - /* Add GENERATED if needed */ - if (import_generated && attgenerated != NULL && - attgenerated[0] == ATTRIBUTE_GENERATED_STORED) - { - Assert(attdefault != NULL); - appendStringInfo(&buf, - " GENERATED ALWAYS AS (%s) STORED", - attdefault); - } + /* Add DEFAULT if needed */ + if (import_default && attdefault != NULL && + (!attgenerated || !attgenerated[0])) + appendStringInfo(&buf, " DEFAULT %s", attdefault); - /* Add NOT NULL if needed */ - if (import_not_null && attnotnull[0] == 't') - appendStringInfoString(&buf, " NOT NULL"); + /* Add GENERATED if needed */ + if (import_generated && attgenerated != NULL && + attgenerated[0] == ATTRIBUTE_GENERATED_STORED) + { + Assert(attdefault != NULL); + appendStringInfo(&buf, + " GENERATED ALWAYS AS (%s) STORED", + attdefault); } - while (++i < numrows && - strcmp(PQgetvalue(res, i, 0), tablename) == 0); - /* - * Add server name and table-level options. We specify remote - * schema and table name as options (the latter to ensure that - * renaming the foreign table doesn't break the association). - */ - appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (", - quote_identifier(server->servername)); + /* Add NOT NULL if needed */ + if (import_not_null && attnotnull[0] == 't') + appendStringInfoString(&buf, " NOT NULL"); + } + while (++i < numrows && + strcmp(PQgetvalue(res, i, 0), tablename) == 0); - appendStringInfoString(&buf, "schema_name "); - deparseStringLiteral(&buf, stmt->remote_schema); - appendStringInfoString(&buf, ", table_name "); - deparseStringLiteral(&buf, tablename); + /* + * Add server name and table-level options. We specify remote schema + * and table name as options (the latter to ensure that renaming the + * foreign table doesn't break the association). + */ + appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (", + quote_identifier(server->servername)); - appendStringInfoString(&buf, ");"); + appendStringInfoString(&buf, "schema_name "); + deparseStringLiteral(&buf, stmt->remote_schema); + appendStringInfoString(&buf, ", table_name "); + deparseStringLiteral(&buf, tablename); - commands = lappend(commands, pstrdup(buf.data)); - } - } - PG_FINALLY(); - { - PQclear(res); + appendStringInfoString(&buf, ");"); + + commands = lappend(commands, pstrdup(buf.data)); } - PG_END_TRY(); + PQclear(res); ReleaseConnection(conn); @@ -7424,7 +7312,7 @@ postgresForeignAsyncNotify(AsyncRequest *areq) /* On error, report the original query, not the FETCH. */ if (!PQconsumeInput(fsstate->conn)) - pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + pgfdw_report_error(ERROR, NULL, fsstate->conn, fsstate->query); fetch_more_data(node); @@ -7523,7 +7411,7 @@ fetch_more_data_begin(AsyncRequest *areq) fsstate->fetch_size, fsstate->cursor_number); if (!PQsendQuery(fsstate->conn, sql)) - pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + pgfdw_report_error(ERROR, NULL, fsstate->conn, fsstate->query); /* Remember that the request is in process */ fsstate->conn_state->pendingAreq = areq; diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 81358f3bde7..38e1a885941 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -15,7 +15,7 @@ #include "foreign/foreign.h" #include "lib/stringinfo.h" -#include "libpq-fe.h" +#include "libpq/libpq-be-fe.h" #include "nodes/execnodes.h" #include "nodes/pathnodes.h" #include "utils/relcache.h" @@ -167,7 +167,7 @@ extern PGresult *pgfdw_get_result(PGconn *conn); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, - bool clear, const char *sql); + const char *sql); /* in option.c */ extern int ExtractConnectionOptions(List *defelems, |