summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.git-blame-ignore-revs3
-rw-r--r--contrib/dblink/dblink.c270
-rw-r--r--contrib/postgres_fdw/connection.c232
-rw-r--r--contrib/postgres_fdw/option.c33
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c878
-rw-r--r--contrib/postgres_fdw/postgres_fdw.h4
-rw-r--r--doc/src/sgml/pgbuffercache.sgml12
-rw-r--r--src/backend/access/heap/visibilitymap.c2
-rw-r--r--src/backend/access/index/amapi.c13
-rw-r--r--src/backend/commands/foreigncmds.c1
-rw-r--r--src/backend/commands/schemacmds.c1
-rw-r--r--src/backend/executor/execParallel.c1
-rw-r--r--src/backend/libpq/auth.c12
-rw-r--r--src/backend/optimizer/plan/planner.c1
-rw-r--r--src/backend/parser/gram.y124
-rw-r--r--src/backend/postmaster/bgworker.c1
-rw-r--r--src/backend/postmaster/checkpointer.c155
-rw-r--r--src/backend/postmaster/postmaster.c7
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c31
-rw-r--r--src/backend/storage/ipc/latch.c8
-rw-r--r--src/backend/tcop/postgres.c1
-rw-r--r--src/backend/tcop/utility.c2
-rw-r--r--src/backend/utils/cache/plancache.c8
-rw-r--r--src/backend/utils/hash/dynahash.c10
-rw-r--r--src/backend/utils/mmgr/mcxt.c39
-rw-r--r--src/bin/pg_basebackup/pg_createsubscriber.c11
-rw-r--r--src/include/libpq/libpq-be-fe-helpers.h74
-rw-r--r--src/include/libpq/libpq-be-fe.h259
-rw-r--r--src/include/nodes/plannodes.h17
-rw-r--r--src/include/port/solaris.h9
-rw-r--r--src/include/utils/palloc.h2
-rw-r--r--src/include/utils/pgstat_kind.h6
-rw-r--r--src/test/modules/injection_points/injection_stats.c2
-rw-r--r--src/test/modules/injection_points/injection_stats_fixed.c2
-rw-r--r--src/test/subscription/t/035_conflicts.pl10
-rw-r--r--src/tools/pgindent/typedefs.list2
36 files changed, 1243 insertions, 1000 deletions
diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
index 1ee1dee0111..f8526d4d1a9 100644
--- a/.git-blame-ignore-revs
+++ b/.git-blame-ignore-revs
@@ -14,6 +14,9 @@
#
# $ git log --pretty=format:"%H # %cd%n# %s" $PGINDENTGITHASH -1 --date=iso
+73873805fb3627cb23937c750fa83ffd8f16fc6c # 2025-07-25 16:36:44 -0400
+# Run pgindent on the changes of the previous patch.
+
9e345415bcd3c4358350b89edfd710469b8bfaf9 # 2025-07-01 15:23:07 +0200
# Fix indentation in pg_numa code
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index de5bed282f3..f98805fb5f7 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -101,8 +101,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
const char *conname,
const char *sql,
bool fail);
-static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
-static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
+static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
+static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static remoteConn *createNewConnection(const char *name);
@@ -169,14 +169,6 @@ typedef struct remoteConnHashEnt
/* initial number of connection hashes */
#define NUMCONN 16
-static char *
-xpstrdup(const char *in)
-{
- if (in == NULL)
- return NULL;
- return pstrdup(in);
-}
-
pg_noreturn static void
dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
{
@@ -870,131 +862,123 @@ static void
materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ bool is_sql_cmd;
+ int ntuples;
+ int nfields;
/* prepTuplestoreResult must have been called previously */
Assert(rsinfo->returnMode == SFRM_Materialize);
- PG_TRY();
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
- TupleDesc tupdesc;
- bool is_sql_cmd;
- int ntuples;
- int nfields;
+ is_sql_cmd = true;
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- is_sql_cmd = true;
+ /*
+ * need a tuple descriptor representing one TEXT column to return the
+ * command status string as our result tuple
+ */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0);
+ ntuples = 1;
+ nfields = 1;
+ }
+ else
+ {
+ Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
- /*
- * need a tuple descriptor representing one TEXT column to return
- * the command status string as our result tuple
- */
- tupdesc = CreateTemplateTupleDesc(1);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
- TEXTOID, -1, 0);
- ntuples = 1;
- nfields = 1;
- }
- else
- {
- Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+ is_sql_cmd = false;
- is_sql_cmd = false;
+ /* get a tuple descriptor for our result type */
+ switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ {
+ case TYPEFUNC_COMPOSITE:
+ /* success */
+ break;
+ case TYPEFUNC_RECORD:
+ /* failed to determine actual type of RECORD */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record")));
+ break;
+ default:
+ /* result type isn't composite */
+ elog(ERROR, "return type must be a row type");
+ break;
+ }
- /* get a tuple descriptor for our result type */
- switch (get_call_result_type(fcinfo, NULL, &tupdesc))
- {
- case TYPEFUNC_COMPOSITE:
- /* success */
- break;
- case TYPEFUNC_RECORD:
- /* failed to determine actual type of RECORD */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("function returning record called in context "
- "that cannot accept type record")));
- break;
- default:
- /* result type isn't composite */
- elog(ERROR, "return type must be a row type");
- break;
- }
+ /* make sure we have a persistent copy of the tupdesc */
+ tupdesc = CreateTupleDescCopy(tupdesc);
+ ntuples = PQntuples(res);
+ nfields = PQnfields(res);
+ }
- /* make sure we have a persistent copy of the tupdesc */
- tupdesc = CreateTupleDescCopy(tupdesc);
- ntuples = PQntuples(res);
- nfields = PQnfields(res);
- }
+ /*
+ * check result and tuple descriptor have the same number of columns
+ */
+ if (nfields != tupdesc->natts)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
- /*
- * check result and tuple descriptor have the same number of columns
- */
- if (nfields != tupdesc->natts)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
+ if (ntuples > 0)
+ {
+ AttInMetadata *attinmeta;
+ int nestlevel = -1;
+ Tuplestorestate *tupstore;
+ MemoryContext oldcontext;
+ int row;
+ char **values;
- if (ntuples > 0)
- {
- AttInMetadata *attinmeta;
- int nestlevel = -1;
- Tuplestorestate *tupstore;
- MemoryContext oldcontext;
- int row;
- char **values;
+ attinmeta = TupleDescGetAttInMetadata(tupdesc);
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ /* Set GUCs to ensure we read GUC-sensitive data types correctly */
+ if (!is_sql_cmd)
+ nestlevel = applyRemoteGucs(conn);
- /* Set GUCs to ensure we read GUC-sensitive data types correctly */
- if (!is_sql_cmd)
- nestlevel = applyRemoteGucs(conn);
+ oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+ MemoryContextSwitchTo(oldcontext);
- oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
- tupstore = tuplestore_begin_heap(true, false, work_mem);
- rsinfo->setResult = tupstore;
- rsinfo->setDesc = tupdesc;
- MemoryContextSwitchTo(oldcontext);
+ values = palloc_array(char *, nfields);
- values = palloc_array(char *, nfields);
+ /* put all tuples into the tuplestore */
+ for (row = 0; row < ntuples; row++)
+ {
+ HeapTuple tuple;
- /* put all tuples into the tuplestore */
- for (row = 0; row < ntuples; row++)
+ if (!is_sql_cmd)
{
- HeapTuple tuple;
+ int i;
- if (!is_sql_cmd)
- {
- int i;
-
- for (i = 0; i < nfields; i++)
- {
- if (PQgetisnull(res, row, i))
- values[i] = NULL;
- else
- values[i] = PQgetvalue(res, row, i);
- }
- }
- else
+ for (i = 0; i < nfields; i++)
{
- values[0] = PQcmdStatus(res);
+ if (PQgetisnull(res, row, i))
+ values[i] = NULL;
+ else
+ values[i] = PQgetvalue(res, row, i);
}
-
- /* build the tuple and put it into the tuplestore. */
- tuple = BuildTupleFromCStrings(attinmeta, values);
- tuplestore_puttuple(tupstore, tuple);
+ }
+ else
+ {
+ values[0] = PQcmdStatus(res);
}
- /* clean up GUC settings, if we changed any */
- restoreLocalGucs(nestlevel);
+ /* build the tuple and put it into the tuplestore. */
+ tuple = BuildTupleFromCStrings(attinmeta, values);
+ tuplestore_puttuple(tupstore, tuple);
}
+
+ /* clean up GUC settings, if we changed any */
+ restoreLocalGucs(nestlevel);
}
- PG_FINALLY();
- {
- /* be sure to release the libpq result */
- PQclear(res);
- }
- PG_END_TRY();
+
+ PQclear(res);
}
/*
@@ -1013,16 +997,17 @@ materializeQueryResult(FunctionCallInfo fcinfo,
bool fail)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
- PGresult *volatile res = NULL;
- volatile storeInfo sinfo = {0};
/* prepTuplestoreResult must have been called previously */
Assert(rsinfo->returnMode == SFRM_Materialize);
- sinfo.fcinfo = fcinfo;
-
+ /* Use a PG_TRY block to ensure we pump libpq dry of results */
PG_TRY();
{
+ storeInfo sinfo = {0};
+ PGresult *res;
+
+ sinfo.fcinfo = fcinfo;
/* Create short-lived memory context for data conversions */
sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
"dblink temporary context",
@@ -1035,14 +1020,7 @@ materializeQueryResult(FunctionCallInfo fcinfo,
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
- /*
- * dblink_res_error will clear the passed PGresult, so we need
- * this ugly dance to avoid doing so twice during error exit
- */
- PGresult *res1 = res;
-
- res = NULL;
- dblink_res_error(conn, conname, res1, fail,
+ dblink_res_error(conn, conname, res, fail,
"while executing query");
/* if fail isn't set, we'll return an empty query result */
}
@@ -1081,7 +1059,6 @@ materializeQueryResult(FunctionCallInfo fcinfo,
tuplestore_puttuple(tupstore, tuple);
PQclear(res);
- res = NULL;
}
else
{
@@ -1090,26 +1067,20 @@ materializeQueryResult(FunctionCallInfo fcinfo,
Assert(rsinfo->setResult != NULL);
PQclear(res);
- res = NULL;
}
/* clean up data conversion short-lived memory context */
if (sinfo.tmpcontext != NULL)
MemoryContextDelete(sinfo.tmpcontext);
- sinfo.tmpcontext = NULL;
PQclear(sinfo.last_res);
- sinfo.last_res = NULL;
PQclear(sinfo.cur_res);
- sinfo.cur_res = NULL;
}
PG_CATCH();
{
- /* be sure to release any libpq result we collected */
- PQclear(res);
- PQclear(sinfo.last_res);
- PQclear(sinfo.cur_res);
- /* and clear out any pending data in libpq */
+ PGresult *res;
+
+ /* be sure to clear out any pending data in libpq */
while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
NULL)
PQclear(res);
@@ -1122,7 +1093,7 @@ materializeQueryResult(FunctionCallInfo fcinfo,
* Execute query, and send any result rows to sinfo->tuplestore.
*/
static PGresult *
-storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
+storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
{
bool first = true;
int nestlevel = -1;
@@ -1190,7 +1161,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
* (in this case the PGresult might contain either zero or one row).
*/
static void
-storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
+storeRow(storeInfo *sinfo, PGresult *res, bool first)
{
int nfields = PQnfields(res);
HeapTuple tuple;
@@ -2795,10 +2766,13 @@ dblink_connstr_check(const char *connstr)
/*
* Report an error received from the remote server
*
- * res: the received error result (will be freed)
+ * res: the received error result
* fail: true for ERROR ereport, false for NOTICE
* fmt and following args: sprintf-style format and values for errcontext;
* the resulting string should be worded like "while <some action>"
+ *
+ * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
+ * in which case memory context cleanup will clear it eventually).
*/
static void
dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
@@ -2806,15 +2780,11 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
{
int level;
char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
- char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
- char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
- char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
- char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
+ char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+ char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+ char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+ char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
int sqlstate;
- char *message_primary;
- char *message_detail;
- char *message_hint;
- char *message_context;
va_list ap;
char dblink_context_msg[512];
@@ -2832,11 +2802,6 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
else
sqlstate = ERRCODE_CONNECTION_FAILURE;
- message_primary = xpstrdup(pg_diag_message_primary);
- message_detail = xpstrdup(pg_diag_message_detail);
- message_hint = xpstrdup(pg_diag_message_hint);
- message_context = xpstrdup(pg_diag_context);
-
/*
* If we don't get a message from the PGresult, try the PGconn. This is
* needed because for connection-level failures, PQgetResult may just
@@ -2846,14 +2811,6 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
message_primary = pchomp(PQerrorMessage(conn));
/*
- * Now that we've copied all the data we need out of the PGresult, it's
- * safe to free it. We must do this to avoid PGresult leakage. We're
- * leaking all the strings too, but those are in palloc'd memory that will
- * get cleaned up eventually.
- */
- PQclear(res);
-
- /*
* Format the basic errcontext string. Below, we'll add on something
* about the connection name. That's a violation of the translatability
* guidelines about constructing error messages out of parts, but since
@@ -2877,6 +2834,7 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
dblink_context_msg, conname)) :
(errcontext("%s on unnamed dblink connection",
dblink_context_msg))));
+ PQclear(res);
}
/*
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index c1ce6f33436..a33843fcf85 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -815,7 +815,7 @@ static void
do_sql_command_begin(PGconn *conn, const char *sql)
{
if (!PQsendQuery(conn, sql))
- pgfdw_report_error(ERROR, NULL, conn, false, sql);
+ pgfdw_report_error(ERROR, NULL, conn, sql);
}
static void
@@ -830,10 +830,10 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
* would be large compared to the overhead of PQconsumeInput.)
*/
if (consume_input && !PQconsumeInput(conn))
- pgfdw_report_error(ERROR, NULL, conn, false, sql);
+ pgfdw_report_error(ERROR, NULL, conn, sql);
res = pgfdw_get_result(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, conn, true, sql);
+ pgfdw_report_error(ERROR, res, conn, sql);
PQclear(res);
}
@@ -967,62 +967,55 @@ pgfdw_get_result(PGconn *conn)
* Report an error we got from the remote server.
*
* elevel: error level to use (typically ERROR, but might be less)
- * res: PGresult containing the error
+ * res: PGresult containing the error (might be NULL)
* conn: connection we did the query on
- * clear: if true, PQclear the result (otherwise caller will handle it)
* sql: NULL, or text of remote command we tried to execute
*
+ * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
+ * in which case memory context cleanup will clear it eventually).
+ *
* Note: callers that choose not to throw ERROR for a remote error are
* responsible for making sure that the associated ConnCacheEntry gets
* marked with have_error = true.
*/
void
pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
- bool clear, const char *sql)
+ const char *sql)
{
- /* If requested, PGresult must be released before leaving this function. */
- PG_TRY();
- {
- char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
- char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
- char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
- char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
- char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
- int sqlstate;
-
- if (diag_sqlstate)
- sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
- diag_sqlstate[1],
- diag_sqlstate[2],
- diag_sqlstate[3],
- diag_sqlstate[4]);
- else
- sqlstate = ERRCODE_CONNECTION_FAILURE;
+ char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+ char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
+ char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
+ char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
+ char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
+ int sqlstate;
+
+ if (diag_sqlstate)
+ sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
+ diag_sqlstate[1],
+ diag_sqlstate[2],
+ diag_sqlstate[3],
+ diag_sqlstate[4]);
+ else
+ sqlstate = ERRCODE_CONNECTION_FAILURE;
- /*
- * If we don't get a message from the PGresult, try the PGconn. This
- * is needed because for connection-level failures, PQgetResult may
- * just return NULL, not a PGresult at all.
- */
- if (message_primary == NULL)
- message_primary = pchomp(PQerrorMessage(conn));
-
- ereport(elevel,
- (errcode(sqlstate),
- (message_primary != NULL && message_primary[0] != '\0') ?
- errmsg_internal("%s", message_primary) :
- errmsg("could not obtain message string for remote error"),
- message_detail ? errdetail_internal("%s", message_detail) : 0,
- message_hint ? errhint("%s", message_hint) : 0,
- message_context ? errcontext("%s", message_context) : 0,
- sql ? errcontext("remote SQL command: %s", sql) : 0));
- }
- PG_FINALLY();
- {
- if (clear)
- PQclear(res);
- }
- PG_END_TRY();
+ /*
+ * If we don't get a message from the PGresult, try the PGconn. This is
+ * needed because for connection-level failures, PQgetResult may just
+ * return NULL, not a PGresult at all.
+ */
+ if (message_primary == NULL)
+ message_primary = pchomp(PQerrorMessage(conn));
+
+ ereport(elevel,
+ (errcode(sqlstate),
+ (message_primary != NULL && message_primary[0] != '\0') ?
+ errmsg_internal("%s", message_primary) :
+ errmsg("could not obtain message string for remote error"),
+ message_detail ? errdetail_internal("%s", message_detail) : 0,
+ message_hint ? errhint("%s", message_hint) : 0,
+ message_context ? errcontext("%s", message_context) : 0,
+ sql ? errcontext("remote SQL command: %s", sql) : 0));
+ PQclear(res);
}
/*
@@ -1545,7 +1538,7 @@ pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
*/
if (!PQsendQuery(conn, query))
{
- pgfdw_report_error(WARNING, NULL, conn, false, query);
+ pgfdw_report_error(WARNING, NULL, conn, query);
return false;
}
@@ -1570,7 +1563,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
*/
if (consume_input && !PQconsumeInput(conn))
{
- pgfdw_report_error(WARNING, NULL, conn, false, query);
+ pgfdw_report_error(WARNING, NULL, conn, query);
return false;
}
@@ -1582,7 +1575,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
(errmsg("could not get query result due to timeout"),
errcontext("remote SQL command: %s", query)));
else
- pgfdw_report_error(WARNING, NULL, conn, false, query);
+ pgfdw_report_error(WARNING, NULL, conn, query);
return false;
}
@@ -1590,7 +1583,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
/* Issue a warning if not successful. */
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
- pgfdw_report_error(WARNING, result, conn, true, query);
+ pgfdw_report_error(WARNING, result, conn, query);
return ignore_errors;
}
PQclear(result);
@@ -1618,103 +1611,90 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result,
bool *timed_out)
{
- volatile bool failed = false;
- PGresult *volatile last_res = NULL;
+ bool failed = false;
+ PGresult *last_res = NULL;
+ int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
*result = NULL;
*timed_out = false;
-
- /* In what follows, do not leak any PGresults on an error. */
- PG_TRY();
+ for (;;)
{
- int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
+ PGresult *res;
- for (;;)
+ while (PQisBusy(conn))
{
- PGresult *res;
+ int wc;
+ TimestampTz now = GetCurrentTimestamp();
+ long cur_timeout;
- while (PQisBusy(conn))
+ /* If timeout has expired, give up. */
+ if (now >= endtime)
{
- int wc;
- TimestampTz now = GetCurrentTimestamp();
- long cur_timeout;
-
- /* If timeout has expired, give up. */
- if (now >= endtime)
- {
- *timed_out = true;
- failed = true;
- goto exit;
- }
+ *timed_out = true;
+ failed = true;
+ goto exit;
+ }
- /* If we need to re-issue the cancel request, do that. */
- if (now >= retrycanceltime)
- {
- /* We ignore failure to issue the repeated request. */
- (void) libpqsrv_cancel(conn, endtime);
+ /* If we need to re-issue the cancel request, do that. */
+ if (now >= retrycanceltime)
+ {
+ /* We ignore failure to issue the repeated request. */
+ (void) libpqsrv_cancel(conn, endtime);
- /* Recompute "now" in case that took measurable time. */
- now = GetCurrentTimestamp();
+ /* Recompute "now" in case that took measurable time. */
+ now = GetCurrentTimestamp();
- /* Adjust re-cancel timeout in increasing steps. */
- retrycanceltime = TimestampTzPlusMilliseconds(now,
- canceldelta);
- canceldelta += canceldelta;
- }
+ /* Adjust re-cancel timeout in increasing steps. */
+ retrycanceltime = TimestampTzPlusMilliseconds(now,
+ canceldelta);
+ canceldelta += canceldelta;
+ }
- /* If timeout has expired, give up, else get sleep time. */
- cur_timeout = TimestampDifferenceMilliseconds(now,
- Min(endtime,
- retrycanceltime));
- if (cur_timeout <= 0)
- {
- *timed_out = true;
- failed = true;
- goto exit;
- }
+ /* If timeout has expired, give up, else get sleep time. */
+ cur_timeout = TimestampDifferenceMilliseconds(now,
+ Min(endtime,
+ retrycanceltime));
+ if (cur_timeout <= 0)
+ {
+ *timed_out = true;
+ failed = true;
+ goto exit;
+ }
- /* first time, allocate or get the custom wait event */
- if (pgfdw_we_cleanup_result == 0)
- pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
+ /* first time, allocate or get the custom wait event */
+ if (pgfdw_we_cleanup_result == 0)
+ pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
- /* Sleep until there's something to do */
- wc = WaitLatchOrSocket(MyLatch,
- WL_LATCH_SET | WL_SOCKET_READABLE |
- WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- PQsocket(conn),
- cur_timeout, pgfdw_we_cleanup_result);
- ResetLatch(MyLatch);
+ /* Sleep until there's something to do */
+ wc = WaitLatchOrSocket(MyLatch,
+ WL_LATCH_SET | WL_SOCKET_READABLE |
+ WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ PQsocket(conn),
+ cur_timeout, pgfdw_we_cleanup_result);
+ ResetLatch(MyLatch);
- CHECK_FOR_INTERRUPTS();
+ CHECK_FOR_INTERRUPTS();
- /* Data available in socket? */
- if (wc & WL_SOCKET_READABLE)
+ /* Data available in socket? */
+ if (wc & WL_SOCKET_READABLE)
+ {
+ if (!PQconsumeInput(conn))
{
- if (!PQconsumeInput(conn))
- {
- /* connection trouble */
- failed = true;
- goto exit;
- }
+ /* connection trouble */
+ failed = true;
+ goto exit;
}
}
+ }
- res = PQgetResult(conn);
- if (res == NULL)
- break; /* query is complete */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ break; /* query is complete */
- PQclear(last_res);
- last_res = res;
- }
-exit: ;
- }
- PG_CATCH();
- {
PQclear(last_res);
- PG_RE_THROW();
+ last_res = res;
}
- PG_END_TRY();
-
+exit:
if (failed)
PQclear(last_res);
else
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index c2f936640bc..d6fa89bad93 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -21,6 +21,7 @@
#include "libpq/libpq-be.h"
#include "postgres_fdw.h"
#include "utils/guc.h"
+#include "utils/memutils.h"
#include "utils/varlena.h"
/*
@@ -40,12 +41,6 @@ typedef struct PgFdwOption
static PgFdwOption *postgres_fdw_options;
/*
- * Valid options for libpq.
- * Allocated and filled in InitPgFdwOptions.
- */
-static PQconninfoOption *libpq_options;
-
-/*
* GUC parameters
*/
char *pgfdw_application_name = NULL;
@@ -239,6 +234,7 @@ static void
InitPgFdwOptions(void)
{
int num_libpq_opts;
+ PQconninfoOption *libpq_options;
PQconninfoOption *lopt;
PgFdwOption *popt;
@@ -307,8 +303,8 @@ InitPgFdwOptions(void)
* Get list of valid libpq options.
*
* To avoid unnecessary work, we get the list once and use it throughout
- * the lifetime of this backend process. We don't need to care about
- * memory context issues, because PQconndefaults allocates with malloc.
+ * the lifetime of this backend process. Hence, we'll allocate it in
+ * TopMemoryContext.
*/
libpq_options = PQconndefaults();
if (!libpq_options) /* assume reason for failure is OOM */
@@ -325,19 +321,11 @@ InitPgFdwOptions(void)
/*
* Construct an array which consists of all valid options for
* postgres_fdw, by appending FDW-specific options to libpq options.
- *
- * We use plain malloc here to allocate postgres_fdw_options because it
- * lives as long as the backend process does. Besides, keeping
- * libpq_options in memory allows us to avoid copying every keyword
- * string.
*/
postgres_fdw_options = (PgFdwOption *)
- malloc(sizeof(PgFdwOption) * num_libpq_opts +
- sizeof(non_libpq_options));
- if (postgres_fdw_options == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
- errmsg("out of memory")));
+ MemoryContextAlloc(TopMemoryContext,
+ sizeof(PgFdwOption) * num_libpq_opts +
+ sizeof(non_libpq_options));
popt = postgres_fdw_options;
for (lopt = libpq_options; lopt->keyword; lopt++)
@@ -355,8 +343,8 @@ InitPgFdwOptions(void)
if (strncmp(lopt->keyword, "oauth_", strlen("oauth_")) == 0)
continue;
- /* We don't have to copy keyword string, as described above. */
- popt->keyword = lopt->keyword;
+ popt->keyword = MemoryContextStrdup(TopMemoryContext,
+ lopt->keyword);
/*
* "user" and any secret options are allowed only on user mappings.
@@ -371,6 +359,9 @@ InitPgFdwOptions(void)
popt++;
}
+ /* Done with libpq's output structure. */
+ PQconninfoFree(libpq_options);
+
/* Append FDW-specific options and dummy terminator. */
memcpy(popt, non_libpq_options, sizeof(non_libpq_options));
}
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index e0a34b27c7c..25b287be069 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -240,7 +240,6 @@ typedef struct PgFdwDirectModifyState
PGresult *result; /* result for query */
int num_tuples; /* # of result tuples */
int next_tuple; /* index of next one to return */
- MemoryContextCallback result_cb; /* ensures result will get freed */
Relation resultRel; /* relcache entry for the target relation */
AttrNumber *attnoMap; /* array of attnums of input user columns */
AttrNumber ctidAttno; /* attnum of input ctid column */
@@ -1703,13 +1702,9 @@ postgresReScanForeignScan(ForeignScanState *node)
return;
}
- /*
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
- */
res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fsstate->conn, sql);
PQclear(res);
/* Now force a fresh FETCH. */
@@ -2672,17 +2667,6 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
node->fdw_state = dmstate;
/*
- * We use a memory context callback to ensure that the dmstate's PGresult
- * (if any) will be released, even if the query fails somewhere that's
- * outside our control. The callback is always armed for the duration of
- * the query; this relies on PQclear(NULL) being a no-op.
- */
- dmstate->result_cb.func = (MemoryContextCallbackFunction) PQclear;
- dmstate->result_cb.arg = NULL;
- MemoryContextRegisterResetCallback(CurrentMemoryContext,
- &dmstate->result_cb);
-
- /*
* Identify which user to do the remote access as. This should match what
* ExecCheckPermissions() does.
*/
@@ -2829,13 +2813,7 @@ postgresEndDirectModify(ForeignScanState *node)
return;
/* Release PGresult */
- if (dmstate->result)
- {
- PQclear(dmstate->result);
- dmstate->result = NULL;
- /* ... and don't forget to disable the callback */
- dmstate->result_cb.arg = NULL;
- }
+ PQclear(dmstate->result);
/* Release remote connection */
ReleaseConnection(dmstate->conn);
@@ -3626,41 +3604,32 @@ get_remote_estimate(const char *sql, PGconn *conn,
double *rows, int *width,
Cost *startup_cost, Cost *total_cost)
{
- PGresult *volatile res = NULL;
-
- /* PGresult must be released before leaving this function. */
- PG_TRY();
- {
- char *line;
- char *p;
- int n;
+ PGresult *res;
+ char *line;
+ char *p;
+ int n;
- /*
- * Execute EXPLAIN remotely.
- */
- res = pgfdw_exec_query(conn, sql, NULL);
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, sql);
+ /*
+ * Execute EXPLAIN remotely.
+ */
+ res = pgfdw_exec_query(conn, sql, NULL);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, sql);
- /*
- * Extract cost numbers for topmost plan node. Note we search for a
- * left paren from the end of the line to avoid being confused by
- * other uses of parentheses.
- */
- line = PQgetvalue(res, 0, 0);
- p = strrchr(line, '(');
- if (p == NULL)
- elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
- n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
- startup_cost, total_cost, rows, width);
- if (n != 4)
- elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
- }
- PG_FINALLY();
- {
- PQclear(res);
- }
- PG_END_TRY();
+ /*
+ * Extract cost numbers for topmost plan node. Note we search for a left
+ * paren from the end of the line to avoid being confused by other uses of
+ * parentheses.
+ */
+ line = PQgetvalue(res, 0, 0);
+ p = strrchr(line, '(');
+ if (p == NULL)
+ elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
+ n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
+ startup_cost, total_cost, rows, width);
+ if (n != 4)
+ elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
+ PQclear(res);
}
/*
@@ -3800,17 +3769,14 @@ create_cursor(ForeignScanState *node)
*/
if (!PQsendQueryParams(conn, buf.data, numParams,
NULL, values, NULL, NULL, 0))
- pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+ pgfdw_report_error(ERROR, NULL, conn, buf.data);
/*
* Get the result, and check for success.
- *
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
*/
res = pgfdw_get_result(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
+ pgfdw_report_error(ERROR, res, conn, fsstate->query);
PQclear(res);
/* Mark the cursor as created, and show no tuples have been retrieved */
@@ -3832,7 +3798,10 @@ static void
fetch_more_data(ForeignScanState *node)
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
- PGresult *volatile res = NULL;
+ PGconn *conn = fsstate->conn;
+ PGresult *res;
+ int numrows;
+ int i;
MemoryContext oldcontext;
/*
@@ -3843,74 +3812,63 @@ fetch_more_data(ForeignScanState *node)
MemoryContextReset(fsstate->batch_cxt);
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
- /* PGresult must be released before leaving this function. */
- PG_TRY();
+ if (fsstate->async_capable)
{
- PGconn *conn = fsstate->conn;
- int numrows;
- int i;
+ Assert(fsstate->conn_state->pendingAreq);
- if (fsstate->async_capable)
- {
- Assert(fsstate->conn_state->pendingAreq);
+ /*
+ * The query was already sent by an earlier call to
+ * fetch_more_data_begin. So now we just fetch the result.
+ */
+ res = pgfdw_get_result(conn);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, fsstate->query);
- /*
- * The query was already sent by an earlier call to
- * fetch_more_data_begin. So now we just fetch the result.
- */
- res = pgfdw_get_result(conn);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ /* Reset per-connection state */
+ fsstate->conn_state->pendingAreq = NULL;
+ }
+ else
+ {
+ char sql[64];
- /* Reset per-connection state */
- fsstate->conn_state->pendingAreq = NULL;
- }
- else
- {
- char sql[64];
+ /* This is a regular synchronous fetch. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
- /* This is a regular synchronous fetch. */
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
+ res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, fsstate->query);
+ }
- res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
- }
+ /* Convert the data into HeapTuples */
+ numrows = PQntuples(res);
+ fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->num_tuples = numrows;
+ fsstate->next_tuple = 0;
- /* Convert the data into HeapTuples */
- numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- fsstate->num_tuples = numrows;
- fsstate->next_tuple = 0;
+ for (i = 0; i < numrows; i++)
+ {
+ Assert(IsA(node->ss.ps.plan, ForeignScan));
- for (i = 0; i < numrows; i++)
- {
- Assert(IsA(node->ss.ps.plan, ForeignScan));
-
- fsstate->tuples[i] =
- make_tuple_from_result_row(res, i,
- fsstate->rel,
- fsstate->attinmeta,
- fsstate->retrieved_attrs,
- node,
- fsstate->temp_cxt);
- }
+ fsstate->tuples[i] =
+ make_tuple_from_result_row(res, i,
+ fsstate->rel,
+ fsstate->attinmeta,
+ fsstate->retrieved_attrs,
+ node,
+ fsstate->temp_cxt);
+ }
- /* Update fetch_ct_2 */
- if (fsstate->fetch_ct_2 < 2)
- fsstate->fetch_ct_2++;
+ /* Update fetch_ct_2 */
+ if (fsstate->fetch_ct_2 < 2)
+ fsstate->fetch_ct_2++;
- /* Must be EOF if we didn't get as many tuples as we asked for. */
- fsstate->eof_reached = (numrows < fsstate->fetch_size);
- }
- PG_FINALLY();
- {
- PQclear(res);
- }
- PG_END_TRY();
+ /* Must be EOF if we didn't get as many tuples as we asked for. */
+ fsstate->eof_reached = (numrows < fsstate->fetch_size);
+
+ PQclear(res);
MemoryContextSwitchTo(oldcontext);
}
@@ -3984,14 +3942,9 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
PGresult *res;
snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
-
- /*
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
- */
res = pgfdw_exec_query(conn, sql, conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, conn, true, sql);
+ pgfdw_report_error(ERROR, res, conn, sql);
PQclear(res);
}
@@ -4199,18 +4152,15 @@ execute_foreign_modify(EState *estate,
NULL,
NULL,
0))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, fmstate->query);
/*
* Get the result, and check for success.
- *
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
*/
res = pgfdw_get_result(fmstate->conn);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->conn, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
@@ -4269,17 +4219,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
fmstate->query,
0,
NULL))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, fmstate->query);
/*
* Get the result, and check for success.
- *
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
*/
res = pgfdw_get_result(fmstate->conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->conn, fmstate->query);
PQclear(res);
/* This action shows that the prepare has been done. */
@@ -4370,37 +4317,25 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
/*
* store_returning_result
* Store the result of a RETURNING clause
- *
- * On error, be sure to release the PGresult on the way out. Callers do not
- * have PG_TRY blocks to ensure this happens.
*/
static void
store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res)
{
- PG_TRY();
- {
- HeapTuple newtup;
+ HeapTuple newtup;
- newtup = make_tuple_from_result_row(res, 0,
- fmstate->rel,
- fmstate->attinmeta,
- fmstate->retrieved_attrs,
- NULL,
- fmstate->temp_cxt);
+ newtup = make_tuple_from_result_row(res, 0,
+ fmstate->rel,
+ fmstate->attinmeta,
+ fmstate->retrieved_attrs,
+ NULL,
+ fmstate->temp_cxt);
- /*
- * The returning slot will not necessarily be suitable to store
- * heaptuples directly, so allow for conversion.
- */
- ExecForceStoreHeapTuple(newtup, slot, true);
- }
- PG_CATCH();
- {
- PQclear(res);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ /*
+ * The returning slot will not necessarily be suitable to store heaptuples
+ * directly, so allow for conversion.
+ */
+ ExecForceStoreHeapTuple(newtup, slot, true);
}
/*
@@ -4436,14 +4371,9 @@ deallocate_query(PgFdwModifyState *fmstate)
return;
snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
- /*
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
- */
res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fmstate->conn, sql);
PQclear(res);
pfree(fmstate->p_name);
fmstate->p_name = NULL;
@@ -4611,24 +4541,24 @@ execute_dml_stmt(ForeignScanState *node)
*/
if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
NULL, values, NULL, NULL, 0))
- pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+ pgfdw_report_error(ERROR, NULL, dmstate->conn, dmstate->query);
/*
* Get the result, and check for success.
- *
- * We use a memory context callback to ensure that the PGresult will be
- * released, even if the query fails somewhere that's outside our control.
- * The callback is already registered, just need to fill in its arg.
*/
- Assert(dmstate->result == NULL);
dmstate->result = pgfdw_get_result(dmstate->conn);
- dmstate->result_cb.arg = dmstate->result;
-
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, false,
+ pgfdw_report_error(ERROR, dmstate->result, dmstate->conn,
dmstate->query);
+ /*
+ * The result potentially needs to survive across multiple executor row
+ * cycles, so move it to the context where the dmstate is.
+ */
+ dmstate->result = libpqsrv_PGresultSetParent(dmstate->result,
+ GetMemoryChunkContext(dmstate));
+
/* Get the number of rows affected. */
if (dmstate->has_returning)
dmstate->num_tuples = PQntuples(dmstate->result);
@@ -4965,7 +4895,7 @@ postgresAnalyzeForeignTable(Relation relation,
UserMapping *user;
PGconn *conn;
StringInfoData sql;
- PGresult *volatile res = NULL;
+ PGresult *res;
/* Return the row-analysis function pointer */
*func = postgresAcquireSampleRowsFunc;
@@ -4991,22 +4921,14 @@ postgresAnalyzeForeignTable(Relation relation,
initStringInfo(&sql);
deparseAnalyzeSizeSql(&sql, relation);
- /* In what follows, do not risk leaking any PGresults. */
- PG_TRY();
- {
- res = pgfdw_exec_query(conn, sql.data, NULL);
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, sql.data);
+ res = pgfdw_exec_query(conn, sql.data, NULL);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, sql.data);
- if (PQntuples(res) != 1 || PQnfields(res) != 1)
- elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
- *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
- }
- PG_FINALLY();
- {
- PQclear(res);
- }
- PG_END_TRY();
+ if (PQntuples(res) != 1 || PQnfields(res) != 1)
+ elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
+ *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
+ PQclear(res);
ReleaseConnection(conn);
@@ -5027,9 +4949,9 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
UserMapping *user;
PGconn *conn;
StringInfoData sql;
- PGresult *volatile res = NULL;
- volatile double reltuples = -1;
- volatile char relkind = 0;
+ PGresult *res;
+ double reltuples;
+ char relkind;
/* assume the remote relation does not support TABLESAMPLE */
*can_tablesample = false;
@@ -5048,24 +4970,15 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
initStringInfo(&sql);
deparseAnalyzeInfoSql(&sql, relation);
- /* In what follows, do not risk leaking any PGresults. */
- PG_TRY();
- {
- res = pgfdw_exec_query(conn, sql.data, NULL);
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, sql.data);
+ res = pgfdw_exec_query(conn, sql.data, NULL);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, sql.data);
- if (PQntuples(res) != 1 || PQnfields(res) != 2)
- elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
- reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
- relkind = *(PQgetvalue(res, 0, 1));
- }
- PG_FINALLY();
- {
- if (res)
- PQclear(res);
- }
- PG_END_TRY();
+ if (PQntuples(res) != 1 || PQnfields(res) != 2)
+ elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
+ reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
+ relkind = *(PQgetvalue(res, 0, 1));
+ PQclear(res);
ReleaseConnection(conn);
@@ -5108,7 +5021,9 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
double reltuples;
unsigned int cursor_number;
StringInfoData sql;
- PGresult *volatile res = NULL;
+ PGresult *res;
+ char fetch_sql[64];
+ int fetch_size;
ListCell *lc;
/* Initialize workspace state */
@@ -5285,91 +5200,76 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
- /* In what follows, do not risk leaking any PGresults. */
- PG_TRY();
- {
- char fetch_sql[64];
- int fetch_size;
-
- res = pgfdw_exec_query(conn, sql.data, NULL);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, conn, false, sql.data);
- PQclear(res);
- res = NULL;
+ res = pgfdw_exec_query(conn, sql.data, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, conn, sql.data);
+ PQclear(res);
- /*
- * Determine the fetch size. The default is arbitrary, but shouldn't
- * be enormous.
- */
- fetch_size = 100;
- foreach(lc, server->options)
- {
- DefElem *def = (DefElem *) lfirst(lc);
+ /*
+ * Determine the fetch size. The default is arbitrary, but shouldn't be
+ * enormous.
+ */
+ fetch_size = 100;
+ foreach(lc, server->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
- if (strcmp(def->defname, "fetch_size") == 0)
- {
- (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
- break;
- }
- }
- foreach(lc, table->options)
+ if (strcmp(def->defname, "fetch_size") == 0)
{
- DefElem *def = (DefElem *) lfirst(lc);
-
- if (strcmp(def->defname, "fetch_size") == 0)
- {
- (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
- break;
- }
+ (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
+ break;
}
+ }
+ foreach(lc, table->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
- /* Construct command to fetch rows from remote. */
- snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
- fetch_size, cursor_number);
-
- /* Retrieve and process rows a batch at a time. */
- for (;;)
+ if (strcmp(def->defname, "fetch_size") == 0)
{
- int numrows;
- int i;
+ (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
+ break;
+ }
+ }
- /* Allow users to cancel long query */
- CHECK_FOR_INTERRUPTS();
+ /* Construct command to fetch rows from remote. */
+ snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
+ fetch_size, cursor_number);
- /*
- * XXX possible future improvement: if rowstoskip is large, we
- * could issue a MOVE rather than physically fetching the rows,
- * then just adjust rowstoskip and samplerows appropriately.
- */
+ /* Retrieve and process rows a batch at a time. */
+ for (;;)
+ {
+ int numrows;
+ int i;
- /* Fetch some rows */
- res = pgfdw_exec_query(conn, fetch_sql, NULL);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, sql.data);
+ /* Allow users to cancel long query */
+ CHECK_FOR_INTERRUPTS();
- /* Process whatever we got. */
- numrows = PQntuples(res);
- for (i = 0; i < numrows; i++)
- analyze_row_processor(res, i, &astate);
+ /*
+ * XXX possible future improvement: if rowstoskip is large, we could
+ * issue a MOVE rather than physically fetching the rows, then just
+ * adjust rowstoskip and samplerows appropriately.
+ */
- PQclear(res);
- res = NULL;
+ /* Fetch some rows */
+ res = pgfdw_exec_query(conn, fetch_sql, NULL);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, sql.data);
- /* Must be EOF if we didn't get all the rows requested. */
- if (numrows < fetch_size)
- break;
- }
+ /* Process whatever we got. */
+ numrows = PQntuples(res);
+ for (i = 0; i < numrows; i++)
+ analyze_row_processor(res, i, &astate);
- /* Close the cursor, just to be tidy. */
- close_cursor(conn, cursor_number, NULL);
- }
- PG_CATCH();
- {
PQclear(res);
- PG_RE_THROW();
+
+ /* Must be EOF if we didn't get all the rows requested. */
+ if (numrows < fetch_size)
+ break;
}
- PG_END_TRY();
+
+ /* Close the cursor, just to be tidy. */
+ close_cursor(conn, cursor_number, NULL);
ReleaseConnection(conn);
@@ -5481,7 +5381,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
UserMapping *mapping;
PGconn *conn;
StringInfoData buf;
- PGresult *volatile res = NULL;
+ PGresult *res;
int numrows,
i;
ListCell *lc;
@@ -5520,243 +5420,231 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
/* Create workspace for strings */
initStringInfo(&buf);
- /* In what follows, do not risk leaking any PGresults. */
- PG_TRY();
- {
- /* Check that the schema really exists */
- appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
- deparseStringLiteral(&buf, stmt->remote_schema);
+ /* Check that the schema really exists */
+ appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
+ deparseStringLiteral(&buf, stmt->remote_schema);
- res = pgfdw_exec_query(conn, buf.data, NULL);
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, buf.data);
+ res = pgfdw_exec_query(conn, buf.data, NULL);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, buf.data);
- if (PQntuples(res) != 1)
- ereport(ERROR,
- (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
- errmsg("schema \"%s\" is not present on foreign server \"%s\"",
- stmt->remote_schema, server->servername)));
+ if (PQntuples(res) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
+ errmsg("schema \"%s\" is not present on foreign server \"%s\"",
+ stmt->remote_schema, server->servername)));
- PQclear(res);
- res = NULL;
- resetStringInfo(&buf);
+ PQclear(res);
+ resetStringInfo(&buf);
- /*
- * Fetch all table data from this schema, possibly restricted by
- * EXCEPT or LIMIT TO. (We don't actually need to pay any attention
- * to EXCEPT/LIMIT TO here, because the core code will filter the
- * statements we return according to those lists anyway. But it
- * should save a few cycles to not process excluded tables in the
- * first place.)
- *
- * Import table data for partitions only when they are explicitly
- * specified in LIMIT TO clause. Otherwise ignore them and only
- * include the definitions of the root partitioned tables to allow
- * access to the complete remote data set locally in the schema
- * imported.
- *
- * Note: because we run the connection with search_path restricted to
- * pg_catalog, the format_type() and pg_get_expr() outputs will always
- * include a schema name for types/functions in other schemas, which
- * is what we want.
- */
+ /*
+ * Fetch all table data from this schema, possibly restricted by EXCEPT or
+ * LIMIT TO. (We don't actually need to pay any attention to EXCEPT/LIMIT
+ * TO here, because the core code will filter the statements we return
+ * according to those lists anyway. But it should save a few cycles to
+ * not process excluded tables in the first place.)
+ *
+ * Import table data for partitions only when they are explicitly
+ * specified in LIMIT TO clause. Otherwise ignore them and only include
+ * the definitions of the root partitioned tables to allow access to the
+ * complete remote data set locally in the schema imported.
+ *
+ * Note: because we run the connection with search_path restricted to
+ * pg_catalog, the format_type() and pg_get_expr() outputs will always
+ * include a schema name for types/functions in other schemas, which is
+ * what we want.
+ */
+ appendStringInfoString(&buf,
+ "SELECT relname, "
+ " attname, "
+ " format_type(atttypid, atttypmod), "
+ " attnotnull, "
+ " pg_get_expr(adbin, adrelid), ");
+
+ /* Generated columns are supported since Postgres 12 */
+ if (PQserverVersion(conn) >= 120000)
appendStringInfoString(&buf,
- "SELECT relname, "
- " attname, "
- " format_type(atttypid, atttypmod), "
- " attnotnull, "
- " pg_get_expr(adbin, adrelid), ");
-
- /* Generated columns are supported since Postgres 12 */
- if (PQserverVersion(conn) >= 120000)
- appendStringInfoString(&buf,
- " attgenerated, ");
- else
- appendStringInfoString(&buf,
- " NULL, ");
-
- if (import_collate)
- appendStringInfoString(&buf,
- " collname, "
- " collnsp.nspname ");
- else
- appendStringInfoString(&buf,
- " NULL, NULL ");
-
+ " attgenerated, ");
+ else
appendStringInfoString(&buf,
- "FROM pg_class c "
- " JOIN pg_namespace n ON "
- " relnamespace = n.oid "
- " LEFT JOIN pg_attribute a ON "
- " attrelid = c.oid AND attnum > 0 "
- " AND NOT attisdropped "
- " LEFT JOIN pg_attrdef ad ON "
- " adrelid = c.oid AND adnum = attnum ");
-
- if (import_collate)
- appendStringInfoString(&buf,
- " LEFT JOIN pg_collation coll ON "
- " coll.oid = attcollation "
- " LEFT JOIN pg_namespace collnsp ON "
- " collnsp.oid = collnamespace ");
+ " NULL, ");
+ if (import_collate)
appendStringInfoString(&buf,
- "WHERE c.relkind IN ("
- CppAsString2(RELKIND_RELATION) ","
- CppAsString2(RELKIND_VIEW) ","
- CppAsString2(RELKIND_FOREIGN_TABLE) ","
- CppAsString2(RELKIND_MATVIEW) ","
- CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
- " AND n.nspname = ");
- deparseStringLiteral(&buf, stmt->remote_schema);
-
- /* Partitions are supported since Postgres 10 */
- if (PQserverVersion(conn) >= 100000 &&
- stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
- appendStringInfoString(&buf, " AND NOT c.relispartition ");
-
- /* Apply restrictions for LIMIT TO and EXCEPT */
- if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
- stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
+ " collname, "
+ " collnsp.nspname ");
+ else
+ appendStringInfoString(&buf,
+ " NULL, NULL ");
+
+ appendStringInfoString(&buf,
+ "FROM pg_class c "
+ " JOIN pg_namespace n ON "
+ " relnamespace = n.oid "
+ " LEFT JOIN pg_attribute a ON "
+ " attrelid = c.oid AND attnum > 0 "
+ " AND NOT attisdropped "
+ " LEFT JOIN pg_attrdef ad ON "
+ " adrelid = c.oid AND adnum = attnum ");
+
+ if (import_collate)
+ appendStringInfoString(&buf,
+ " LEFT JOIN pg_collation coll ON "
+ " coll.oid = attcollation "
+ " LEFT JOIN pg_namespace collnsp ON "
+ " collnsp.oid = collnamespace ");
+
+ appendStringInfoString(&buf,
+ "WHERE c.relkind IN ("
+ CppAsString2(RELKIND_RELATION) ","
+ CppAsString2(RELKIND_VIEW) ","
+ CppAsString2(RELKIND_FOREIGN_TABLE) ","
+ CppAsString2(RELKIND_MATVIEW) ","
+ CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
+ " AND n.nspname = ");
+ deparseStringLiteral(&buf, stmt->remote_schema);
+
+ /* Partitions are supported since Postgres 10 */
+ if (PQserverVersion(conn) >= 100000 &&
+ stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
+ appendStringInfoString(&buf, " AND NOT c.relispartition ");
+
+ /* Apply restrictions for LIMIT TO and EXCEPT */
+ if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
+ stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
+ {
+ bool first_item = true;
+
+ appendStringInfoString(&buf, " AND c.relname ");
+ if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
+ appendStringInfoString(&buf, "NOT ");
+ appendStringInfoString(&buf, "IN (");
+
+ /* Append list of table names within IN clause */
+ foreach(lc, stmt->table_list)
{
- bool first_item = true;
+ RangeVar *rv = (RangeVar *) lfirst(lc);
- appendStringInfoString(&buf, " AND c.relname ");
- if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
- appendStringInfoString(&buf, "NOT ");
- appendStringInfoString(&buf, "IN (");
+ if (first_item)
+ first_item = false;
+ else
+ appendStringInfoString(&buf, ", ");
+ deparseStringLiteral(&buf, rv->relname);
+ }
+ appendStringInfoChar(&buf, ')');
+ }
- /* Append list of table names within IN clause */
- foreach(lc, stmt->table_list)
- {
- RangeVar *rv = (RangeVar *) lfirst(lc);
+ /* Append ORDER BY at the end of query to ensure output ordering */
+ appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
- if (first_item)
- first_item = false;
- else
- appendStringInfoString(&buf, ", ");
- deparseStringLiteral(&buf, rv->relname);
- }
- appendStringInfoChar(&buf, ')');
- }
+ /* Fetch the data */
+ res = pgfdw_exec_query(conn, buf.data, NULL);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, buf.data);
- /* Append ORDER BY at the end of query to ensure output ordering */
- appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
+ /* Process results */
+ numrows = PQntuples(res);
+ /* note: incrementation of i happens in inner loop's while() test */
+ for (i = 0; i < numrows;)
+ {
+ char *tablename = PQgetvalue(res, i, 0);
+ bool first_item = true;
- /* Fetch the data */
- res = pgfdw_exec_query(conn, buf.data, NULL);
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, buf.data);
+ resetStringInfo(&buf);
+ appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
+ quote_identifier(tablename));
- /* Process results */
- numrows = PQntuples(res);
- /* note: incrementation of i happens in inner loop's while() test */
- for (i = 0; i < numrows;)
+ /* Scan all rows for this table */
+ do
{
- char *tablename = PQgetvalue(res, i, 0);
- bool first_item = true;
+ char *attname;
+ char *typename;
+ char *attnotnull;
+ char *attgenerated;
+ char *attdefault;
+ char *collname;
+ char *collnamespace;
+
+ /* If table has no columns, we'll see nulls here */
+ if (PQgetisnull(res, i, 1))
+ continue;
- resetStringInfo(&buf);
- appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
- quote_identifier(tablename));
+ attname = PQgetvalue(res, i, 1);
+ typename = PQgetvalue(res, i, 2);
+ attnotnull = PQgetvalue(res, i, 3);
+ attdefault = PQgetisnull(res, i, 4) ? NULL :
+ PQgetvalue(res, i, 4);
+ attgenerated = PQgetisnull(res, i, 5) ? NULL :
+ PQgetvalue(res, i, 5);
+ collname = PQgetisnull(res, i, 6) ? NULL :
+ PQgetvalue(res, i, 6);
+ collnamespace = PQgetisnull(res, i, 7) ? NULL :
+ PQgetvalue(res, i, 7);
+
+ if (first_item)
+ first_item = false;
+ else
+ appendStringInfoString(&buf, ",\n");
- /* Scan all rows for this table */
- do
- {
- char *attname;
- char *typename;
- char *attnotnull;
- char *attgenerated;
- char *attdefault;
- char *collname;
- char *collnamespace;
-
- /* If table has no columns, we'll see nulls here */
- if (PQgetisnull(res, i, 1))
- continue;
+ /* Print column name and type */
+ appendStringInfo(&buf, " %s %s",
+ quote_identifier(attname),
+ typename);
- attname = PQgetvalue(res, i, 1);
- typename = PQgetvalue(res, i, 2);
- attnotnull = PQgetvalue(res, i, 3);
- attdefault = PQgetisnull(res, i, 4) ? NULL :
- PQgetvalue(res, i, 4);
- attgenerated = PQgetisnull(res, i, 5) ? NULL :
- PQgetvalue(res, i, 5);
- collname = PQgetisnull(res, i, 6) ? NULL :
- PQgetvalue(res, i, 6);
- collnamespace = PQgetisnull(res, i, 7) ? NULL :
- PQgetvalue(res, i, 7);
-
- if (first_item)
- first_item = false;
- else
- appendStringInfoString(&buf, ",\n");
+ /*
+ * Add column_name option so that renaming the foreign table's
+ * column doesn't break the association to the underlying column.
+ */
+ appendStringInfoString(&buf, " OPTIONS (column_name ");
+ deparseStringLiteral(&buf, attname);
+ appendStringInfoChar(&buf, ')');
- /* Print column name and type */
- appendStringInfo(&buf, " %s %s",
- quote_identifier(attname),
- typename);
+ /* Add COLLATE if needed */
+ if (import_collate && collname != NULL && collnamespace != NULL)
+ appendStringInfo(&buf, " COLLATE %s.%s",
+ quote_identifier(collnamespace),
+ quote_identifier(collname));
- /*
- * Add column_name option so that renaming the foreign table's
- * column doesn't break the association to the underlying
- * column.
- */
- appendStringInfoString(&buf, " OPTIONS (column_name ");
- deparseStringLiteral(&buf, attname);
- appendStringInfoChar(&buf, ')');
-
- /* Add COLLATE if needed */
- if (import_collate && collname != NULL && collnamespace != NULL)
- appendStringInfo(&buf, " COLLATE %s.%s",
- quote_identifier(collnamespace),
- quote_identifier(collname));
-
- /* Add DEFAULT if needed */
- if (import_default && attdefault != NULL &&
- (!attgenerated || !attgenerated[0]))
- appendStringInfo(&buf, " DEFAULT %s", attdefault);
-
- /* Add GENERATED if needed */
- if (import_generated && attgenerated != NULL &&
- attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
- {
- Assert(attdefault != NULL);
- appendStringInfo(&buf,
- " GENERATED ALWAYS AS (%s) STORED",
- attdefault);
- }
+ /* Add DEFAULT if needed */
+ if (import_default && attdefault != NULL &&
+ (!attgenerated || !attgenerated[0]))
+ appendStringInfo(&buf, " DEFAULT %s", attdefault);
- /* Add NOT NULL if needed */
- if (import_not_null && attnotnull[0] == 't')
- appendStringInfoString(&buf, " NOT NULL");
+ /* Add GENERATED if needed */
+ if (import_generated && attgenerated != NULL &&
+ attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
+ {
+ Assert(attdefault != NULL);
+ appendStringInfo(&buf,
+ " GENERATED ALWAYS AS (%s) STORED",
+ attdefault);
}
- while (++i < numrows &&
- strcmp(PQgetvalue(res, i, 0), tablename) == 0);
- /*
- * Add server name and table-level options. We specify remote
- * schema and table name as options (the latter to ensure that
- * renaming the foreign table doesn't break the association).
- */
- appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
- quote_identifier(server->servername));
+ /* Add NOT NULL if needed */
+ if (import_not_null && attnotnull[0] == 't')
+ appendStringInfoString(&buf, " NOT NULL");
+ }
+ while (++i < numrows &&
+ strcmp(PQgetvalue(res, i, 0), tablename) == 0);
- appendStringInfoString(&buf, "schema_name ");
- deparseStringLiteral(&buf, stmt->remote_schema);
- appendStringInfoString(&buf, ", table_name ");
- deparseStringLiteral(&buf, tablename);
+ /*
+ * Add server name and table-level options. We specify remote schema
+ * and table name as options (the latter to ensure that renaming the
+ * foreign table doesn't break the association).
+ */
+ appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
+ quote_identifier(server->servername));
- appendStringInfoString(&buf, ");");
+ appendStringInfoString(&buf, "schema_name ");
+ deparseStringLiteral(&buf, stmt->remote_schema);
+ appendStringInfoString(&buf, ", table_name ");
+ deparseStringLiteral(&buf, tablename);
- commands = lappend(commands, pstrdup(buf.data));
- }
- }
- PG_FINALLY();
- {
- PQclear(res);
+ appendStringInfoString(&buf, ");");
+
+ commands = lappend(commands, pstrdup(buf.data));
}
- PG_END_TRY();
+ PQclear(res);
ReleaseConnection(conn);
@@ -7424,7 +7312,7 @@ postgresForeignAsyncNotify(AsyncRequest *areq)
/* On error, report the original query, not the FETCH. */
if (!PQconsumeInput(fsstate->conn))
- pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+ pgfdw_report_error(ERROR, NULL, fsstate->conn, fsstate->query);
fetch_more_data(node);
@@ -7523,7 +7411,7 @@ fetch_more_data_begin(AsyncRequest *areq)
fsstate->fetch_size, fsstate->cursor_number);
if (!PQsendQuery(fsstate->conn, sql))
- pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+ pgfdw_report_error(ERROR, NULL, fsstate->conn, fsstate->query);
/* Remember that the request is in process */
fsstate->conn_state->pendingAreq = areq;
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 81358f3bde7..38e1a885941 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -15,7 +15,7 @@
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
-#include "libpq-fe.h"
+#include "libpq/libpq-be-fe.h"
#include "nodes/execnodes.h"
#include "nodes/pathnodes.h"
#include "utils/relcache.h"
@@ -167,7 +167,7 @@ extern PGresult *pgfdw_get_result(PGconn *conn);
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
PgFdwConnState *state);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
- bool clear, const char *sql);
+ const char *sql);
/* in option.c */
extern int ExtractConnectionOptions(List *defelems,
diff --git a/doc/src/sgml/pgbuffercache.sgml b/doc/src/sgml/pgbuffercache.sgml
index 546ace8369e..eeb85a0e049 100644
--- a/doc/src/sgml/pgbuffercache.sgml
+++ b/doc/src/sgml/pgbuffercache.sgml
@@ -20,10 +20,18 @@
</indexterm>
<indexterm>
+ <primary>pg_buffercache_numa</primary>
+ </indexterm>
+
+ <indexterm>
<primary>pg_buffercache_summary</primary>
</indexterm>
<indexterm>
+ <primary>pg_buffercache_usage_counts</primary>
+ </indexterm>
+
+ <indexterm>
<primary>pg_buffercache_evict</primary>
</indexterm>
@@ -489,7 +497,7 @@
</sect2>
<sect2 id="pgbuffercache-pg-buffercache-evict-relation">
- <title>The <structname>pg_buffercache_evict_relation</structname> Function</title>
+ <title>The <structname>pg_buffercache_evict_relation()</structname> Function</title>
<para>
The <function>pg_buffercache_evict_relation()</function> function is very
similar to the <function>pg_buffercache_evict()</function> function. The
@@ -507,7 +515,7 @@
</sect2>
<sect2 id="pgbuffercache-pg-buffercache-evict-all">
- <title>The <structname>pg_buffercache_evict_all</structname> Function</title>
+ <title>The <structname>pg_buffercache_evict_all()</structname> Function</title>
<para>
The <function>pg_buffercache_evict_all()</function> function is very
similar to the <function>pg_buffercache_evict()</function> function. The
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 745a04ef26e..8f918e00af7 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -364,7 +364,7 @@ visibilitymap_get_status(Relation rel, BlockNumber heapBlk, Buffer *vmbuf)
{
*vmbuf = vm_readbuf(rel, mapBlock, false);
if (!BufferIsValid(*vmbuf))
- return false;
+ return (uint8) 0;
}
map = PageGetContents(BufferGetPage(*vmbuf));
diff --git a/src/backend/access/index/amapi.c b/src/backend/access/index/amapi.c
index f0f4f974bce..60684c53422 100644
--- a/src/backend/access/index/amapi.c
+++ b/src/backend/access/index/amapi.c
@@ -42,6 +42,19 @@ GetIndexAmRoutine(Oid amhandler)
elog(ERROR, "index access method handler function %u did not return an IndexAmRoutine struct",
amhandler);
+ /* Assert that all required callbacks are present. */
+ Assert(routine->ambuild != NULL);
+ Assert(routine->ambuildempty != NULL);
+ Assert(routine->aminsert != NULL);
+ Assert(routine->ambulkdelete != NULL);
+ Assert(routine->amvacuumcleanup != NULL);
+ Assert(routine->amcostestimate != NULL);
+ Assert(routine->amoptions != NULL);
+ Assert(routine->amvalidate != NULL);
+ Assert(routine->ambeginscan != NULL);
+ Assert(routine->amrescan != NULL);
+ Assert(routine->amendscan != NULL);
+
return routine;
}
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index 8d2d7431544..fcd5fcd8915 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -1588,6 +1588,7 @@ ImportForeignSchema(ImportForeignSchemaStmt *stmt)
pstmt->utilityStmt = (Node *) cstmt;
pstmt->stmt_location = rs->stmt_location;
pstmt->stmt_len = rs->stmt_len;
+ pstmt->cached_plan_type = PLAN_CACHE_NONE;
/* Execute statement */
ProcessUtility(pstmt, cmd, false,
diff --git a/src/backend/commands/schemacmds.c b/src/backend/commands/schemacmds.c
index 546160f0941..c00f1a11384 100644
--- a/src/backend/commands/schemacmds.c
+++ b/src/backend/commands/schemacmds.c
@@ -215,6 +215,7 @@ CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString,
wrapper->utilityStmt = stmt;
wrapper->stmt_location = stmt_location;
wrapper->stmt_len = stmt_len;
+ wrapper->cached_plan_type = PLAN_CACHE_NONE;
/* do this step */
ProcessUtility(wrapper,
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f3e77bda279..fc76f22fb82 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -189,6 +189,7 @@ ExecSerializePlan(Plan *plan, EState *estate)
pstmt->permInfos = estate->es_rteperminfos;
pstmt->resultRelations = NIL;
pstmt->appendRelations = NIL;
+ pstmt->cached_plan_type = PLAN_CACHE_NONE;
/*
* Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
diff --git a/src/backend/libpq/auth.c b/src/backend/libpq/auth.c
index 9f4d05ffbd4..4da46666439 100644
--- a/src/backend/libpq/auth.c
+++ b/src/backend/libpq/auth.c
@@ -94,8 +94,16 @@ static int auth_peer(hbaPort *port);
#define PGSQL_PAM_SERVICE "postgresql" /* Service name passed to PAM */
+/* Work around original Solaris' lack of "const" in the conv_proc signature */
+#ifdef _PAM_LEGACY_NONCONST
+#define PG_PAM_CONST
+#else
+#define PG_PAM_CONST const
+#endif
+
static int CheckPAMAuth(Port *port, const char *user, const char *password);
-static int pam_passwd_conv_proc(int num_msg, const struct pam_message **msg,
+static int pam_passwd_conv_proc(int num_msg,
+ PG_PAM_CONST struct pam_message **msg,
struct pam_response **resp, void *appdata_ptr);
static struct pam_conv pam_passw_conv = {
@@ -1917,7 +1925,7 @@ auth_peer(hbaPort *port)
*/
static int
-pam_passwd_conv_proc(int num_msg, const struct pam_message **msg,
+pam_passwd_conv_proc(int num_msg, PG_PAM_CONST struct pam_message **msg,
struct pam_response **resp, void *appdata_ptr)
{
const char *passwd;
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index c989e72cac5..a77b2147e95 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -582,6 +582,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
result->utilityStmt = parse->utilityStmt;
result->stmt_location = parse->stmt_location;
result->stmt_len = parse->stmt_len;
+ result->cached_plan_type = PLAN_CACHE_NONE;
result->jitFlags = PGJIT_NONE;
if (jit_enabled && jit_above_cost >= 0 &&
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 73345bb3c70..db43034b9db 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -318,6 +318,11 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> opt_qualified_name
%type <boolean> opt_concurrently
%type <dbehavior> opt_drop_behavior
+%type <list> opt_utility_option_list
+%type <list> utility_option_list
+%type <defelt> utility_option_elem
+%type <str> utility_option_name
+%type <node> utility_option_arg
%type <node> alter_column_default opclass_item opclass_drop alter_using
%type <ival> add_drop opt_asc_desc opt_nulls_order
@@ -338,10 +343,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
create_extension_opt_item alter_extension_opt_item
%type <ival> opt_lock lock_type cast_context
-%type <str> utility_option_name
-%type <defelt> utility_option_elem
-%type <list> utility_option_list
-%type <node> utility_option_arg
%type <defelt> drop_option
%type <boolean> opt_or_replace opt_no
opt_grant_grant_option
@@ -556,7 +557,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> generic_option_list alter_generic_option_list
%type <ival> reindex_target_relation reindex_target_all
-%type <list> opt_reindex_option_list
%type <node> copy_generic_opt_arg copy_generic_opt_arg_list_item
%type <defelt> copy_generic_opt_elem
@@ -1141,6 +1141,41 @@ opt_drop_behavior:
| /* EMPTY */ { $$ = DROP_RESTRICT; /* default */ }
;
+opt_utility_option_list:
+ '(' utility_option_list ')' { $$ = $2; }
+ | /* EMPTY */ { $$ = NULL; }
+ ;
+
+utility_option_list:
+ utility_option_elem
+ {
+ $$ = list_make1($1);
+ }
+ | utility_option_list ',' utility_option_elem
+ {
+ $$ = lappend($1, $3);
+ }
+ ;
+
+utility_option_elem:
+ utility_option_name utility_option_arg
+ {
+ $$ = makeDefElem($1, $2, @1);
+ }
+ ;
+
+utility_option_name:
+ NonReservedWord { $$ = $1; }
+ | analyze_keyword { $$ = "analyze"; }
+ | FORMAT_LA { $$ = "format"; }
+ ;
+
+utility_option_arg:
+ opt_boolean_or_string { $$ = (Node *) makeString($1); }
+ | NumericOnly { $$ = (Node *) $1; }
+ | /* EMPTY */ { $$ = NULL; }
+ ;
+
/*****************************************************************************
*
* CALL statement
@@ -2028,18 +2063,12 @@ constraints_set_mode:
* Checkpoint statement
*/
CheckPointStmt:
- CHECKPOINT
+ CHECKPOINT opt_utility_option_list
{
CheckPointStmt *n = makeNode(CheckPointStmt);
$$ = (Node *) n;
- }
- | CHECKPOINT '(' utility_option_list ')'
- {
- CheckPointStmt *n = makeNode(CheckPointStmt);
-
- $$ = (Node *) n;
- n->options = $3;
+ n->options = $2;
}
;
@@ -9354,7 +9383,7 @@ DropTransformStmt: DROP TRANSFORM opt_if_exists FOR Typename LANGUAGE name opt_d
*****************************************************************************/
ReindexStmt:
- REINDEX opt_reindex_option_list reindex_target_relation opt_concurrently qualified_name
+ REINDEX opt_utility_option_list reindex_target_relation opt_concurrently qualified_name
{
ReindexStmt *n = makeNode(ReindexStmt);
@@ -9367,7 +9396,7 @@ ReindexStmt:
makeDefElem("concurrently", NULL, @4));
$$ = (Node *) n;
}
- | REINDEX opt_reindex_option_list SCHEMA opt_concurrently name
+ | REINDEX opt_utility_option_list SCHEMA opt_concurrently name
{
ReindexStmt *n = makeNode(ReindexStmt);
@@ -9380,7 +9409,7 @@ ReindexStmt:
makeDefElem("concurrently", NULL, @4));
$$ = (Node *) n;
}
- | REINDEX opt_reindex_option_list reindex_target_all opt_concurrently opt_single_name
+ | REINDEX opt_utility_option_list reindex_target_all opt_concurrently opt_single_name
{
ReindexStmt *n = makeNode(ReindexStmt);
@@ -9402,10 +9431,6 @@ reindex_target_all:
SYSTEM_P { $$ = REINDEX_OBJECT_SYSTEM; }
| DATABASE { $$ = REINDEX_OBJECT_DATABASE; }
;
-opt_reindex_option_list:
- '(' utility_option_list ')' { $$ = $2; }
- | /* EMPTY */ { $$ = NULL; }
- ;
/*****************************************************************************
*
@@ -11903,13 +11928,13 @@ ClusterStmt:
n->params = $3;
$$ = (Node *) n;
}
- | CLUSTER '(' utility_option_list ')'
+ | CLUSTER opt_utility_option_list
{
ClusterStmt *n = makeNode(ClusterStmt);
n->relation = NULL;
n->indexname = NULL;
- n->params = $3;
+ n->params = $2;
$$ = (Node *) n;
}
/* unparenthesized VERBOSE kept for pre-14 compatibility */
@@ -11919,21 +11944,18 @@ ClusterStmt:
n->relation = $3;
n->indexname = $4;
- n->params = NIL;
if ($2)
- n->params = lappend(n->params, makeDefElem("verbose", NULL, @2));
+ n->params = list_make1(makeDefElem("verbose", NULL, @2));
$$ = (Node *) n;
}
/* unparenthesized VERBOSE kept for pre-17 compatibility */
- | CLUSTER opt_verbose
+ | CLUSTER VERBOSE
{
ClusterStmt *n = makeNode(ClusterStmt);
n->relation = NULL;
n->indexname = NULL;
- n->params = NIL;
- if ($2)
- n->params = lappend(n->params, makeDefElem("verbose", NULL, @2));
+ n->params = list_make1(makeDefElem("verbose", NULL, @2));
$$ = (Node *) n;
}
/* kept for pre-8.3 compatibility */
@@ -11943,9 +11965,8 @@ ClusterStmt:
n->relation = $5;
n->indexname = $3;
- n->params = NIL;
if ($2)
- n->params = lappend(n->params, makeDefElem("verbose", NULL, @2));
+ n->params = list_make1(makeDefElem("verbose", NULL, @2));
$$ = (Node *) n;
}
;
@@ -11996,64 +12017,31 @@ VacuumStmt: VACUUM opt_full opt_freeze opt_verbose opt_analyze opt_vacuum_relati
}
;
-AnalyzeStmt: analyze_keyword opt_verbose opt_vacuum_relation_list
+AnalyzeStmt: analyze_keyword opt_utility_option_list opt_vacuum_relation_list
{
VacuumStmt *n = makeNode(VacuumStmt);
- n->options = NIL;
- if ($2)
- n->options = lappend(n->options,
- makeDefElem("verbose", NULL, @2));
+ n->options = $2;
n->rels = $3;
n->is_vacuumcmd = false;
$$ = (Node *) n;
}
- | analyze_keyword '(' utility_option_list ')' opt_vacuum_relation_list
+ | analyze_keyword VERBOSE opt_vacuum_relation_list
{
VacuumStmt *n = makeNode(VacuumStmt);
- n->options = $3;
- n->rels = $5;
+ n->options = list_make1(makeDefElem("verbose", NULL, @2));
+ n->rels = $3;
n->is_vacuumcmd = false;
$$ = (Node *) n;
}
;
-utility_option_list:
- utility_option_elem
- {
- $$ = list_make1($1);
- }
- | utility_option_list ',' utility_option_elem
- {
- $$ = lappend($1, $3);
- }
- ;
-
analyze_keyword:
ANALYZE
| ANALYSE /* British */
;
-utility_option_elem:
- utility_option_name utility_option_arg
- {
- $$ = makeDefElem($1, $2, @1);
- }
- ;
-
-utility_option_name:
- NonReservedWord { $$ = $1; }
- | analyze_keyword { $$ = "analyze"; }
- | FORMAT_LA { $$ = "format"; }
- ;
-
-utility_option_arg:
- opt_boolean_or_string { $$ = (Node *) makeString($1); }
- | NumericOnly { $$ = (Node *) $1; }
- | /* EMPTY */ { $$ = NULL; }
- ;
-
opt_analyze:
analyze_keyword { $$ = true; }
| /*EMPTY*/ { $$ = false; }
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 116ddf7b835..1ad65c237c3 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -613,6 +613,7 @@ ResetBackgroundWorkerCrashTimes(void)
* resetting.
*/
rw->rw_crashed_at = 0;
+ rw->rw_pid = 0;
/*
* If there was anyone waiting for it, they're history.
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 2809e298a44..8490148a47d 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -130,6 +130,13 @@ typedef struct
int num_requests; /* current # of requests */
int max_requests; /* allocated array size */
+
+ int head; /* Index of the first request in the ring
+ * buffer */
+ int tail; /* Index of the last request in the ring
+ * buffer */
+
+ /* The ring buffer of pending checkpointer requests */
CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
} CheckpointerShmemStruct;
@@ -138,6 +145,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
/* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
#define WRITES_PER_ABSORB 1000
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
/*
* GUC parameters
*/
@@ -973,7 +986,8 @@ CheckpointerShmemInit(void)
*/
MemSet(CheckpointerShmem, 0, size);
SpinLockInit(&CheckpointerShmem->ckpt_lck);
- CheckpointerShmem->max_requests = NBuffers;
+ CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+ CheckpointerShmem->head = CheckpointerShmem->tail = 0;
ConditionVariableInit(&CheckpointerShmem->start_cv);
ConditionVariableInit(&CheckpointerShmem->done_cv);
}
@@ -1201,6 +1215,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
{
CheckpointerRequest *request;
bool too_full;
+ int insert_pos;
if (!IsUnderPostmaster)
return false; /* probably shouldn't even get here */
@@ -1224,10 +1239,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
}
/* OK, insert request */
- request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+ insert_pos = CheckpointerShmem->tail;
+ request = &CheckpointerShmem->requests[insert_pos];
request->ftag = *ftag;
request->type = type;
+ CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+ CheckpointerShmem->num_requests++;
+
/* If queue is more than half full, nudge the checkpointer to empty it */
too_full = (CheckpointerShmem->num_requests >=
CheckpointerShmem->max_requests / 2);
@@ -1269,12 +1288,16 @@ CompactCheckpointerRequestQueue(void)
struct CheckpointerSlotMapping
{
CheckpointerRequest request;
- int slot;
+ int ring_idx;
};
- int n,
- preserve_count;
+ int n;
int num_skipped = 0;
+ int head;
+ int max_requests;
+ int num_requests;
+ int read_idx,
+ write_idx;
HASHCTL ctl;
HTAB *htab;
bool *skip_slot;
@@ -1286,8 +1309,13 @@ CompactCheckpointerRequestQueue(void)
if (CritSectionCount > 0)
return false;
+ max_requests = CheckpointerShmem->max_requests;
+ num_requests = CheckpointerShmem->num_requests;
+
/* Initialize skip_slot array */
- skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+ skip_slot = palloc0(sizeof(bool) * max_requests);
+
+ head = CheckpointerShmem->head;
/* Initialize temporary hash table */
ctl.keysize = sizeof(CheckpointerRequest);
@@ -1311,7 +1339,8 @@ CompactCheckpointerRequestQueue(void)
* away preceding entries that would end up being canceled anyhow), but
* it's not clear that the extra complexity would buy us anything.
*/
- for (n = 0; n < CheckpointerShmem->num_requests; n++)
+ read_idx = head;
+ for (n = 0; n < num_requests; n++)
{
CheckpointerRequest *request;
struct CheckpointerSlotMapping *slotmap;
@@ -1324,16 +1353,19 @@ CompactCheckpointerRequestQueue(void)
* CheckpointerShmemInit. Note also that RelFileLocator had better
* contain no pad bytes.
*/
- request = &CheckpointerShmem->requests[n];
+ request = &CheckpointerShmem->requests[read_idx];
slotmap = hash_search(htab, request, HASH_ENTER, &found);
if (found)
{
/* Duplicate, so mark the previous occurrence as skippable */
- skip_slot[slotmap->slot] = true;
+ skip_slot[slotmap->ring_idx] = true;
num_skipped++;
}
/* Remember slot containing latest occurrence of this request value */
- slotmap->slot = n;
+ slotmap->ring_idx = read_idx;
+
+ /* Move to the next request in the ring buffer */
+ read_idx = (read_idx + 1) % max_requests;
}
/* Done with the hash table. */
@@ -1347,17 +1379,34 @@ CompactCheckpointerRequestQueue(void)
}
/* We found some duplicates; remove them. */
- preserve_count = 0;
- for (n = 0; n < CheckpointerShmem->num_requests; n++)
+ read_idx = write_idx = head;
+ for (n = 0; n < num_requests; n++)
{
- if (skip_slot[n])
- continue;
- CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+ /* If this slot is NOT skipped, keep it */
+ if (!skip_slot[read_idx])
+ {
+ /* If the read and write positions are different, copy the request */
+ if (write_idx != read_idx)
+ CheckpointerShmem->requests[write_idx] =
+ CheckpointerShmem->requests[read_idx];
+
+ /* Advance the write position */
+ write_idx = (write_idx + 1) % max_requests;
+ }
+
+ read_idx = (read_idx + 1) % max_requests;
}
+
+ /*
+ * Update ring buffer state: head remains the same, tail moves, count
+ * decreases
+ */
+ CheckpointerShmem->tail = write_idx;
+ CheckpointerShmem->num_requests -= num_skipped;
+
ereport(DEBUG1,
(errmsg_internal("compacted fsync request queue from %d entries to %d entries",
- CheckpointerShmem->num_requests, preserve_count)));
- CheckpointerShmem->num_requests = preserve_count;
+ num_requests, CheckpointerShmem->num_requests)));
/* Cleanup. */
pfree(skip_slot);
@@ -1378,40 +1427,64 @@ AbsorbSyncRequests(void)
{
CheckpointerRequest *requests = NULL;
CheckpointerRequest *request;
- int n;
+ int n,
+ i;
+ bool loop;
if (!AmCheckpointerProcess())
return;
- LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
- /*
- * We try to avoid holding the lock for a long time by copying the request
- * array, and processing the requests after releasing the lock.
- *
- * Once we have cleared the requests from shared memory, we have to PANIC
- * if we then fail to absorb them (eg, because our hashtable runs out of
- * memory). This is because the system cannot run safely if we are unable
- * to fsync what we have been told to fsync. Fortunately, the hashtable
- * is so small that the problem is quite unlikely to arise in practice.
- */
- n = CheckpointerShmem->num_requests;
- if (n > 0)
+ do
{
- requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
- memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
- }
+ LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+
+ /*---
+ * We try to avoid holding the lock for a long time by:
+ * 1. Copying the request array and processing the requests after
+ * releasing the lock;
+ * 2. Processing not the whole queue, but only batches of
+ * CKPT_REQ_BATCH_SIZE at once.
+ *
+ * Once we have cleared the requests from shared memory, we must
+ * PANIC if we then fail to absorb them (e.g., because our hashtable
+ * runs out of memory). This is because the system cannot run safely
+ * if we are unable to fsync what we have been told to fsync.
+ * Fortunately, the hashtable is so small that the problem is quite
+ * unlikely to arise in practice.
+ *
+ * Note: The maximum possible size of a ring buffer is
+ * MAX_CHECKPOINT_REQUESTS entries, which fit into a maximum palloc
+ * allocation size of 1Gb. Our maximum batch size,
+ * CKPT_REQ_BATCH_SIZE, is even smaller.
+ */
+ n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+ if (n > 0)
+ {
+ if (!requests)
+ requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
- START_CRIT_SECTION();
+ for (i = 0; i < n; i++)
+ {
+ requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+ CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+ }
- CheckpointerShmem->num_requests = 0;
+ CheckpointerShmem->num_requests -= n;
- LWLockRelease(CheckpointerCommLock);
+ }
+
+ START_CRIT_SECTION();
+
+ /* Are there any requests in the queue? If so, keep going. */
+ loop = CheckpointerShmem->num_requests != 0;
+
+ LWLockRelease(CheckpointerCommLock);
- for (request = requests; n > 0; request++, n--)
- RememberSyncRequest(&request->ftag, request->type);
+ for (request = requests; n > 0; request++, n--)
+ RememberSyncRequest(&request->ftag, request->type);
- END_CRIT_SECTION();
+ END_CRIT_SECTION();
+ } while (loop);
if (requests)
pfree(requests);
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index cca9b946e53..e01d9f0cfe8 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2630,6 +2630,13 @@ CleanupBackend(PMChild *bp,
}
bp = NULL;
+ /*
+ * In a crash case, exit immediately without resetting background worker
+ * state. However, if restart_after_crash is enabled, the background
+ * worker state (e.g., rw_pid) still needs be reset so the worker can
+ * restart after crash recovery. This reset is handled in
+ * ResetBackgroundWorkerCrashTimes(), not here.
+ */
if (crashed)
{
HandleChildCrash(bp_pid, exitstatus, procname);
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 886d99951dd..239641bfbb6 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -421,31 +421,22 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
"IDENTIFY_SYSTEM",
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
- }
/*
* IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
* 9.4 and onwards.
*/
if (PQnfields(res) < 3 || PQntuples(res) != 1)
- {
- int ntuples = PQntuples(res);
- int nfields = PQnfields(res);
-
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
- ntuples, nfields, 1, 3)));
- }
+ PQntuples(res), PQnfields(res), 1, 3)));
primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
PQclear(res);
@@ -607,13 +598,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
return false;
}
else if (PQresultStatus(res) != PGRES_COPY_BOTH)
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
- }
PQclear(res);
return true;
}
@@ -721,26 +709,17 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
cmd,
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive timeline history file from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
- }
if (PQnfields(res) != 2 || PQntuples(res) != 1)
- {
- int ntuples = PQntuples(res);
- int nfields = PQnfields(res);
-
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
- ntuples, nfields)));
- }
+ PQntuples(res), PQnfields(res))));
*filename = pstrdup(PQgetvalue(res, 0, 0));
*len = PQgetlength(res, 0, 1);
@@ -844,13 +823,10 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
return -1;
}
else
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
- }
}
if (rawlen < -1)
ereport(ERROR,
@@ -974,13 +950,10 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not create replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
- }
if (lsn)
*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index c6aefd2f688..beadeb5e46a 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -187,9 +187,11 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout,
if (!(wakeEvents & WL_LATCH_SET))
latch = NULL;
ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch);
- ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos,
- (wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)),
- NULL);
+
+ if (IsUnderPostmaster)
+ ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos,
+ (wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)),
+ NULL);
if (WaitEventSetWait(LatchWaitSet,
(wakeEvents & WL_TIMEOUT) ? timeout : -1,
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 2f8c3d5f918..a297606cdd7 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -988,6 +988,7 @@ pg_plan_queries(List *querytrees, const char *query_string, int cursorOptions,
stmt->stmt_location = query->stmt_location;
stmt->stmt_len = query->stmt_len;
stmt->queryId = query->queryId;
+ stmt->cached_plan_type = PLAN_CACHE_NONE;
}
else
{
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 4c1faf5575c..babc34d0cbe 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1234,6 +1234,7 @@ ProcessUtilitySlow(ParseState *pstate,
wrapper->utilityStmt = stmt;
wrapper->stmt_location = pstmt->stmt_location;
wrapper->stmt_len = pstmt->stmt_len;
+ wrapper->cached_plan_type = PLAN_CACHE_NONE;
ProcessUtility(wrapper,
queryString,
@@ -1964,6 +1965,7 @@ ProcessUtilityForAlterTable(Node *stmt, AlterTableUtilityContext *context)
wrapper->utilityStmt = stmt;
wrapper->stmt_location = context->pstmt->stmt_location;
wrapper->stmt_len = context->pstmt->stmt_len;
+ wrapper->cached_plan_type = PLAN_CACHE_NONE;
ProcessUtility(wrapper,
context->queryString,
diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c
index 89a1c79e984..f4d2b9458a5 100644
--- a/src/backend/utils/cache/plancache.c
+++ b/src/backend/utils/cache/plancache.c
@@ -1283,6 +1283,7 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams,
CachedPlan *plan = NULL;
List *qlist;
bool customplan;
+ ListCell *lc;
/* Assert caller is doing things in a sane order */
Assert(plansource->magic == CACHEDPLANSOURCE_MAGIC);
@@ -1385,6 +1386,13 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams,
plan->is_saved = true;
}
+ foreach(lc, plan->stmt_list)
+ {
+ PlannedStmt *pstmt = (PlannedStmt *) lfirst(lc);
+
+ pstmt->cached_plan_type = customplan ? PLAN_CACHE_CUSTOM : PLAN_CACHE_GENERIC;
+ }
+
return plan;
}
diff --git a/src/backend/utils/hash/dynahash.c b/src/backend/utils/hash/dynahash.c
index 1ad155d446e..42e9be274fc 100644
--- a/src/backend/utils/hash/dynahash.c
+++ b/src/backend/utils/hash/dynahash.c
@@ -195,6 +195,7 @@ struct HASHHDR
long ssize; /* segment size --- must be power of 2 */
int sshift; /* segment shift = log2(ssize) */
int nelem_alloc; /* number of entries to allocate at once */
+ bool isfixed; /* if true, don't enlarge */
#ifdef HASH_STATISTICS
@@ -227,7 +228,6 @@ struct HTAB
MemoryContext hcxt; /* memory context if default allocator used */
char *tabname; /* table name (for error messages) */
bool isshared; /* true if table is in shared memory */
- bool isfixed; /* if true, don't enlarge */
/* freezing a shared table isn't allowed, so we can keep state here */
bool frozen; /* true = no more inserts allowed */
@@ -618,8 +618,10 @@ hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
}
}
+ /* Set isfixed if requested, but not till after we build initial entries */
if (flags & HASH_FIXED_SIZE)
- hashp->isfixed = true;
+ hctl->isfixed = true;
+
return hashp;
}
@@ -644,6 +646,8 @@ hdefault(HTAB *hashp)
hctl->ssize = DEF_SEGSIZE;
hctl->sshift = DEF_SEGSIZE_SHIFT;
+ hctl->isfixed = false; /* can be enlarged */
+
#ifdef HASH_STATISTICS
hctl->accesses = hctl->collisions = 0;
#endif
@@ -1713,7 +1717,7 @@ element_alloc(HTAB *hashp, int nelem, int freelist_idx)
HASHELEMENT *prevElement;
int i;
- if (hashp->isfixed)
+ if (hctl->isfixed)
return false;
/* Each element has a HASHELEMENT header plus user data. */
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c
index 15fa4d0a55e..ce01dce9861 100644
--- a/src/backend/utils/mmgr/mcxt.c
+++ b/src/backend/utils/mmgr/mcxt.c
@@ -560,9 +560,7 @@ MemoryContextDeleteChildren(MemoryContext context)
* the specified context, since that means it will automatically be freed
* when no longer needed.
*
- * There is no API for deregistering a callback once registered. If you
- * want it to not do anything anymore, adjust the state pointed to by its
- * "arg" to indicate that.
+ * Note that callers can assume this cannot fail.
*/
void
MemoryContextRegisterResetCallback(MemoryContext context,
@@ -578,6 +576,41 @@ MemoryContextRegisterResetCallback(MemoryContext context,
}
/*
+ * MemoryContextUnregisterResetCallback
+ * Undo the effects of MemoryContextRegisterResetCallback.
+ *
+ * This can be used if a callback's effects are no longer required
+ * at some point before the context has been reset/deleted. It is the
+ * caller's responsibility to pfree the callback struct (if needed).
+ *
+ * An assertion failure occurs if the callback was not registered.
+ * We could alternatively define that case as a no-op, but that seems too
+ * likely to mask programming errors such as passing the wrong context.
+ */
+void
+MemoryContextUnregisterResetCallback(MemoryContext context,
+ MemoryContextCallback *cb)
+{
+ MemoryContextCallback *prev,
+ *cur;
+
+ Assert(MemoryContextIsValid(context));
+
+ for (prev = NULL, cur = context->reset_cbs; cur != NULL;
+ prev = cur, cur = cur->next)
+ {
+ if (cur != cb)
+ continue;
+ if (prev)
+ prev->next = cur->next;
+ else
+ context->reset_cbs = cur->next;
+ return;
+ }
+ Assert(false);
+}
+
+/*
* MemoryContextCallResetCallbacks
* Internal function to call all registered callbacks for context.
*/
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 025b893a41e..3986882f042 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -1250,8 +1250,17 @@ setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const c
appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
appendPQExpBufferStr(recoveryconfcontents,
"recovery_target_timeline = 'latest'\n");
+
+ /*
+ * Set recovery_target_inclusive = false to avoid reapplying the
+ * transaction committed at 'lsn' after subscription is enabled. This is
+ * because the provided 'lsn' is also used as the replication start point
+ * for the subscription. So, the server can send the transaction committed
+ * at that 'lsn' after replication is started which can lead to applying
+ * the same transaction twice if we keep recovery_target_inclusive = true.
+ */
appendPQExpBufferStr(recoveryconfcontents,
- "recovery_target_inclusive = true\n");
+ "recovery_target_inclusive = false\n");
appendPQExpBufferStr(recoveryconfcontents,
"recovery_target_action = promote\n");
appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index af13bd6bf3d..1c4a342090c 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -30,17 +30,7 @@
#ifndef LIBPQ_BE_FE_HELPERS_H
#define LIBPQ_BE_FE_HELPERS_H
-/*
- * Despite the name, BUILDING_DLL is set only when building code directly part
- * of the backend. Which also is where libpq isn't allowed to be
- * used. Obviously this doesn't protect against libpq-fe.h getting included
- * otherwise, but perhaps still protects against a few mistakes...
- */
-#ifdef BUILDING_DLL
-#error "libpq may not be used code directly built into the backend"
-#endif
-
-#include "libpq-fe.h"
+#include "libpq/libpq-be-fe.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/latch.h"
@@ -289,41 +279,30 @@ libpqsrv_exec_params(PGconn *conn,
static inline PGresult *
libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
{
- PGresult *volatile lastResult = NULL;
+ PGresult *lastResult = NULL;
- /* In what follows, do not leak any PGresults on an error. */
- PG_TRY();
+ for (;;)
{
- for (;;)
- {
- /* Wait for, and collect, the next PGresult. */
- PGresult *result;
+ /* Wait for, and collect, the next PGresult. */
+ PGresult *result;
- result = libpqsrv_get_result(conn, wait_event_info);
- if (result == NULL)
- break; /* query is complete, or failure */
+ result = libpqsrv_get_result(conn, wait_event_info);
+ if (result == NULL)
+ break; /* query is complete, or failure */
- /*
- * Emulate PQexec()'s behavior of returning the last result when
- * there are many.
- */
- PQclear(lastResult);
- lastResult = result;
-
- if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
- PQresultStatus(lastResult) == PGRES_COPY_OUT ||
- PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
- PQstatus(conn) == CONNECTION_BAD)
- break;
- }
- }
- PG_CATCH();
- {
+ /*
+ * Emulate PQexec()'s behavior of returning the last result when there
+ * are many.
+ */
PQclear(lastResult);
- PG_RE_THROW();
- }
- PG_END_TRY();
+ lastResult = result;
+ if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
+ PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+ PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+ PQstatus(conn) == CONNECTION_BAD)
+ break;
+ }
return lastResult;
}
@@ -462,13 +441,21 @@ exit: ;
* This function is intended to be set via PQsetNoticeReceiver() so that
* NOTICE, WARNING, and similar messages from the connection are reported via
* ereport(), instead of being printed to stderr.
+ *
+ * Because this will be called from libpq with a "real" (not wrapped)
+ * PGresult, we need to temporarily ignore libpq-be-fe.h's wrapper macros
+ * for PGresult and also PQresultErrorMessage, and put back the wrappers
+ * afterwards. That's not pretty, but there seems no better alternative.
*/
+#undef PGresult
+#undef PQresultErrorMessage
+
static inline void
libpqsrv_notice_receiver(void *arg, const PGresult *res)
{
- char *message;
+ const char *message;
int len;
- char *prefix = (char *) arg;
+ const char *prefix = (const char *) arg;
/*
* Trim the trailing newline from the message text returned from
@@ -484,4 +471,7 @@ libpqsrv_notice_receiver(void *arg, const PGresult *res)
errmsg_internal("%s: %.*s", prefix, len, message));
}
+#define PGresult libpqsrv_PGresult
+#define PQresultErrorMessage libpqsrv_PQresultErrorMessage
+
#endif /* LIBPQ_BE_FE_HELPERS_H */
diff --git a/src/include/libpq/libpq-be-fe.h b/src/include/libpq/libpq-be-fe.h
new file mode 100644
index 00000000000..e3f796b0230
--- /dev/null
+++ b/src/include/libpq/libpq-be-fe.h
@@ -0,0 +1,259 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq-be-fe.h
+ * Wrapper functions for using libpq in extensions
+ *
+ * Code built directly into the backend is not allowed to link to libpq
+ * directly. Extension code is allowed to use libpq however. One of the
+ * main risks in doing so is leaking the malloc-allocated structures
+ * returned by libpq, causing a process-lifespan memory leak.
+ *
+ * This file provides wrapper objects to help in building memory-safe code.
+ * A PGresult object wrapped this way acts much as if it were palloc'd:
+ * it will go away when the specified context is reset or deleted.
+ * We might later extend the concept to other objects such as PGconns.
+ *
+ * See also the libpq-be-fe-helpers.h file, which provides additional
+ * facilities built on top of this one.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/libpq-be-fe.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LIBPQ_BE_FE_H
+#define LIBPQ_BE_FE_H
+
+/*
+ * Despite the name, BUILDING_DLL is set only when building code directly part
+ * of the backend. Which also is where libpq isn't allowed to be
+ * used. Obviously this doesn't protect against libpq-fe.h getting included
+ * otherwise, but perhaps still protects against a few mistakes...
+ */
+#ifdef BUILDING_DLL
+#error "libpq may not be used in code directly built into the backend"
+#endif
+
+#include "libpq-fe.h"
+
+/*
+ * Memory-context-safe wrapper object for a PGresult.
+ */
+typedef struct libpqsrv_PGresult
+{
+ PGresult *res; /* the wrapped PGresult */
+ MemoryContext ctx; /* the MemoryContext it's attached to */
+ MemoryContextCallback cb; /* the callback that implements freeing */
+} libpqsrv_PGresult;
+
+
+/*
+ * Wrap the given PGresult in a libpqsrv_PGresult object, so that it will
+ * go away automatically if the current memory context is reset or deleted.
+ *
+ * To avoid potential memory leaks, backend code must always apply this
+ * immediately to the output of any PGresult-yielding libpq function.
+ */
+static inline libpqsrv_PGresult *
+libpqsrv_PQwrap(PGresult *res)
+{
+ libpqsrv_PGresult *bres;
+ MemoryContext ctx = CurrentMemoryContext;
+
+ /* We pass through a NULL result as-is, since there's nothing to free */
+ if (res == NULL)
+ return NULL;
+ /* Attempt to allocate the wrapper ... this had better not throw error */
+ bres = (libpqsrv_PGresult *)
+ MemoryContextAllocExtended(ctx,
+ sizeof(libpqsrv_PGresult),
+ MCXT_ALLOC_NO_OOM);
+ /* If we failed to allocate a wrapper, free the PGresult before failing */
+ if (bres == NULL)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+ /* Okay, set up the wrapper */
+ bres->res = res;
+ bres->ctx = ctx;
+ bres->cb.func = (MemoryContextCallbackFunction) PQclear;
+ bres->cb.arg = res;
+ MemoryContextRegisterResetCallback(ctx, &bres->cb);
+ return bres;
+}
+
+/*
+ * Free a wrapped PGresult, after detaching it from the memory context.
+ * Like PQclear(), allow the argument to be NULL.
+ */
+static inline void
+libpqsrv_PQclear(libpqsrv_PGresult *bres)
+{
+ if (bres)
+ {
+ MemoryContextUnregisterResetCallback(bres->ctx, &bres->cb);
+ PQclear(bres->res);
+ pfree(bres);
+ }
+}
+
+/*
+ * Move a wrapped PGresult to have a different parent context.
+ */
+static inline libpqsrv_PGresult *
+libpqsrv_PGresultSetParent(libpqsrv_PGresult *bres, MemoryContext ctx)
+{
+ libpqsrv_PGresult *newres;
+
+ /* We pass through a NULL result as-is */
+ if (bres == NULL)
+ return NULL;
+ /* Make a new wrapper in the target context, raising error on OOM */
+ newres = (libpqsrv_PGresult *)
+ MemoryContextAlloc(ctx, sizeof(libpqsrv_PGresult));
+ /* Okay, set up the new wrapper */
+ newres->res = bres->res;
+ newres->ctx = ctx;
+ newres->cb.func = (MemoryContextCallbackFunction) PQclear;
+ newres->cb.arg = bres->res;
+ MemoryContextRegisterResetCallback(ctx, &newres->cb);
+ /* Disarm and delete the old wrapper */
+ MemoryContextUnregisterResetCallback(bres->ctx, &bres->cb);
+ pfree(bres);
+ return newres;
+}
+
+/*
+ * Convenience wrapper for PQgetResult.
+ *
+ * We could supply wrappers for other PGresult-returning functions too,
+ * but at present there's no need.
+ */
+static inline libpqsrv_PGresult *
+libpqsrv_PQgetResult(PGconn *conn)
+{
+ return libpqsrv_PQwrap(PQgetResult(conn));
+}
+
+/*
+ * Accessor functions for libpqsrv_PGresult. While it's not necessary to use
+ * these, they emulate the behavior of the underlying libpq functions when
+ * passed a NULL pointer. This is particularly important for PQresultStatus,
+ * which is often the first check on a result.
+ */
+
+static inline ExecStatusType
+libpqsrv_PQresultStatus(const libpqsrv_PGresult *res)
+{
+ if (!res)
+ return PGRES_FATAL_ERROR;
+ return PQresultStatus(res->res);
+}
+
+static inline const char *
+libpqsrv_PQresultErrorMessage(const libpqsrv_PGresult *res)
+{
+ if (!res)
+ return "";
+ return PQresultErrorMessage(res->res);
+}
+
+static inline char *
+libpqsrv_PQresultErrorField(const libpqsrv_PGresult *res, int fieldcode)
+{
+ if (!res)
+ return NULL;
+ return PQresultErrorField(res->res, fieldcode);
+}
+
+static inline char *
+libpqsrv_PQcmdStatus(const libpqsrv_PGresult *res)
+{
+ if (!res)
+ return NULL;
+ return PQcmdStatus(res->res);
+}
+
+static inline int
+libpqsrv_PQntuples(const libpqsrv_PGresult *res)
+{
+ if (!res)
+ return 0;
+ return PQntuples(res->res);
+}
+
+static inline int
+libpqsrv_PQnfields(const libpqsrv_PGresult *res)
+{
+ if (!res)
+ return 0;
+ return PQnfields(res->res);
+}
+
+static inline char *
+libpqsrv_PQgetvalue(const libpqsrv_PGresult *res, int tup_num, int field_num)
+{
+ if (!res)
+ return NULL;
+ return PQgetvalue(res->res, tup_num, field_num);
+}
+
+static inline int
+libpqsrv_PQgetlength(const libpqsrv_PGresult *res, int tup_num, int field_num)
+{
+ if (!res)
+ return 0;
+ return PQgetlength(res->res, tup_num, field_num);
+}
+
+static inline int
+libpqsrv_PQgetisnull(const libpqsrv_PGresult *res, int tup_num, int field_num)
+{
+ if (!res)
+ return 1; /* pretend it is null */
+ return PQgetisnull(res->res, tup_num, field_num);
+}
+
+static inline char *
+libpqsrv_PQfname(const libpqsrv_PGresult *res, int field_num)
+{
+ if (!res)
+ return NULL;
+ return PQfname(res->res, field_num);
+}
+
+static inline const char *
+libpqsrv_PQcmdTuples(const libpqsrv_PGresult *res)
+{
+ if (!res)
+ return "";
+ return PQcmdTuples(res->res);
+}
+
+/*
+ * Redefine these libpq entry point names concerned with PGresults so that
+ * they will operate on libpqsrv_PGresults instead. This avoids needing to
+ * convert a lot of pre-existing code, and reduces the notational differences
+ * between frontend and backend libpq-using code.
+ */
+#define PGresult libpqsrv_PGresult
+#define PQclear libpqsrv_PQclear
+#define PQgetResult libpqsrv_PQgetResult
+#define PQresultStatus libpqsrv_PQresultStatus
+#define PQresultErrorMessage libpqsrv_PQresultErrorMessage
+#define PQresultErrorField libpqsrv_PQresultErrorField
+#define PQcmdStatus libpqsrv_PQcmdStatus
+#define PQntuples libpqsrv_PQntuples
+#define PQnfields libpqsrv_PQnfields
+#define PQgetvalue libpqsrv_PQgetvalue
+#define PQgetlength libpqsrv_PQgetlength
+#define PQgetisnull libpqsrv_PQgetisnull
+#define PQfname libpqsrv_PQfname
+#define PQcmdTuples libpqsrv_PQcmdTuples
+
+#endif /* LIBPQ_BE_FE_H */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 4f59e30d62d..46e2e09ea35 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -29,6 +29,20 @@
*/
/* ----------------
+ * CachedPlanType
+ *
+ * CachedPlanType identifies whether a PlannedStmt is a cached plan, and if
+ * so, whether it is generic or custom.
+ * ----------------
+ */
+typedef enum CachedPlanType
+{
+ PLAN_CACHE_NONE = 0, /* Not a cached plan */
+ PLAN_CACHE_GENERIC, /* Generic cached plan */
+ PLAN_CACHE_CUSTOM, /* Custom cached plan */
+} CachedPlanType;
+
+/* ----------------
* PlannedStmt node
*
* The output of the planner is a Plan tree headed by a PlannedStmt node.
@@ -58,6 +72,9 @@ typedef struct PlannedStmt
/* plan identifier (can be set by plugins) */
int64 planId;
+ /* type of cached plan */
+ CachedPlanType cached_plan_type;
+
/* is it insert|update|delete|merge RETURNING? */
bool hasReturning;
diff --git a/src/include/port/solaris.h b/src/include/port/solaris.h
index e63a3bd824d..8ff40007c7f 100644
--- a/src/include/port/solaris.h
+++ b/src/include/port/solaris.h
@@ -24,3 +24,12 @@
#if defined(__i386__)
#include <sys/isa_defs.h>
#endif
+
+/*
+ * On original Solaris, PAM conversation procs lack a "const" in their
+ * declaration; but recent OpenIndiana versions put it there by default.
+ * The least messy way to deal with this is to define _PAM_LEGACY_NONCONST,
+ * which causes OpenIndiana to declare pam_conv per the Solaris tradition,
+ * and also use that symbol to control omitting the "const" in our own code.
+ */
+#define _PAM_LEGACY_NONCONST 1
diff --git a/src/include/utils/palloc.h b/src/include/utils/palloc.h
index e1b42267b22..039b9cba61a 100644
--- a/src/include/utils/palloc.h
+++ b/src/include/utils/palloc.h
@@ -133,6 +133,8 @@ MemoryContextSwitchTo(MemoryContext context)
/* Registration of memory context reset/delete callbacks */
extern void MemoryContextRegisterResetCallback(MemoryContext context,
MemoryContextCallback *cb);
+extern void MemoryContextUnregisterResetCallback(MemoryContext context,
+ MemoryContextCallback *cb);
/*
* These are like standard strdup() except the copied string is
diff --git a/src/include/utils/pgstat_kind.h b/src/include/utils/pgstat_kind.h
index f44169fd5a3..eb5f0b3ae6d 100644
--- a/src/include/utils/pgstat_kind.h
+++ b/src/include/utils/pgstat_kind.h
@@ -18,7 +18,7 @@
/* Range of IDs allowed, for built-in and custom kinds */
#define PGSTAT_KIND_MIN 1 /* Minimum ID allowed */
-#define PGSTAT_KIND_MAX 256 /* Maximum ID allowed */
+#define PGSTAT_KIND_MAX 32 /* Maximum ID allowed */
/* use 0 for INVALID, to catch zero-initialized data */
#define PGSTAT_KIND_INVALID 0
@@ -46,7 +46,7 @@
/* Custom stats kinds */
/* Range of IDs allowed for custom stats kinds */
-#define PGSTAT_KIND_CUSTOM_MIN 128
+#define PGSTAT_KIND_CUSTOM_MIN 24
#define PGSTAT_KIND_CUSTOM_MAX PGSTAT_KIND_MAX
#define PGSTAT_KIND_CUSTOM_SIZE (PGSTAT_KIND_CUSTOM_MAX - PGSTAT_KIND_CUSTOM_MIN + 1)
@@ -55,7 +55,7 @@
* development and have not reserved their own unique kind ID yet. See:
* https://wiki.postgresql.org/wiki/CustomCumulativeStats
*/
-#define PGSTAT_KIND_EXPERIMENTAL 128
+#define PGSTAT_KIND_EXPERIMENTAL 24
static inline bool
pgstat_is_kind_builtin(PgStat_Kind kind)
diff --git a/src/test/modules/injection_points/injection_stats.c b/src/test/modules/injection_points/injection_stats.c
index 14903c629e0..e3947b23ba5 100644
--- a/src/test/modules/injection_points/injection_stats.c
+++ b/src/test/modules/injection_points/injection_stats.c
@@ -59,7 +59,7 @@ static const PgStat_KindInfo injection_stats = {
/*
* Kind ID reserved for statistics of injection points.
*/
-#define PGSTAT_KIND_INJECTION 129
+#define PGSTAT_KIND_INJECTION 25
/* Track if stats are loaded */
static bool inj_stats_loaded = false;
diff --git a/src/test/modules/injection_points/injection_stats_fixed.c b/src/test/modules/injection_points/injection_stats_fixed.c
index 3d0c01bdd05..bc54c79d190 100644
--- a/src/test/modules/injection_points/injection_stats_fixed.c
+++ b/src/test/modules/injection_points/injection_stats_fixed.c
@@ -64,7 +64,7 @@ static const PgStat_KindInfo injection_stats_fixed = {
/*
* Kind ID reserved for statistics of injection points.
*/
-#define PGSTAT_KIND_INJECTION_FIXED 130
+#define PGSTAT_KIND_INJECTION_FIXED 26
/* Track if fixed-numbered stats are loaded */
static bool inj_fixed_loaded = false;
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 7458d7fba7e..976d53a870e 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -228,6 +228,11 @@ ok( $stderr =~
# Disable the subscription
$node_A->psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;");
+# Wait for the apply worker to stop
+$node_A->poll_query_until('postgres',
+ "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
+);
+
# Enable retain_dead_tuples for disabled subscription
($cmdret, $stdout, $stderr) = $node_A->psql('postgres',
"ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true);");
@@ -278,6 +283,11 @@ is($result, qq(1|1
# Disable the logical replication from node B to node A
$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE");
+# Wait for the apply worker to stop
+$node_A->poll_query_until('postgres',
+ "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
+);
+
$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;");
$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a8656419cb6..3daba26b237 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -391,6 +391,7 @@ CachedFunctionHashEntry
CachedFunctionHashKey
CachedPlan
CachedPlanSource
+CachedPlanType
CallContext
CallStmt
CancelRequestPacket
@@ -3756,6 +3757,7 @@ leafSegmentInfo
leaf_item
libpq_gettext_func
libpq_source
+libpqsrv_PGresult
line_t
lineno_t
list_sort_comparator