diff options
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r-- | contrib/dblink/dblink.c | 431 |
1 files changed, 235 insertions, 196 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 9c8e308358..a6a3c09ff8 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -9,7 +9,7 @@ * Shridhar Daithankar <shridhar_daithankar@persistent.co.in> * * contrib/dblink/dblink.c - * Copyright (c) 2001-2016, PostgreSQL Global Development Group + * Copyright (c) 2001-2017, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -40,6 +40,7 @@ #include "access/reloptions.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/pg_foreign_data_wrapper.h" #include "catalog/pg_foreign_server.h" #include "catalog/pg_type.h" #include "catalog/pg_user_mapping.h" @@ -58,8 +59,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/tqual.h" - -#include "dblink.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -112,7 +112,8 @@ static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclM static char *generate_relation_name(Relation rel); static void dblink_connstr_check(const char *connstr); static void dblink_security_check(PGconn *conn, remoteConn *rconn); -static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail); +static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res, + const char *dblink_context_msg, bool fail); static char *get_connect_string(const char *servername); static char *escape_param_str(const char *from); static void validate_pkattnums(Relation rel, @@ -143,98 +144,108 @@ typedef struct remoteConnHashEnt /* initial number of connection hashes */ #define NUMCONN 16 -/* general utility */ -#define xpfree(var_) \ - do { \ - if (var_ != NULL) \ - { \ - pfree(var_); \ - var_ = NULL; \ - } \ - } while (0) - -#define xpstrdup(var_c, var_) \ - do { \ - if (var_ != NULL) \ - var_c = pstrdup(var_); \ - else \ - var_c = NULL; \ - } while (0) - -#define DBLINK_RES_INTERNALERROR(p2) \ - do { \ - msg = pstrdup(PQerrorMessage(conn)); \ - if (res) \ - PQclear(res); \ - elog(ERROR, "%s: %s", p2, msg); \ - } while (0) - -#define DBLINK_CONN_NOT_AVAIL \ - do { \ - if(conname) \ - ereport(ERROR, \ - (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \ - errmsg("connection \"%s\" not available", conname))); \ - else \ - ereport(ERROR, \ - (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \ - errmsg("connection not available"))); \ - } while (0) - -#define DBLINK_GET_CONN \ - do { \ - char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \ - rconn = getConnectionByName(conname_or_str); \ - if (rconn) \ - { \ - conn = rconn->conn; \ - conname = conname_or_str; \ - } \ - else \ - { \ - connstr = get_connect_string(conname_or_str); \ - if (connstr == NULL) \ - { \ - connstr = conname_or_str; \ - } \ - dblink_connstr_check(connstr); \ - conn = PQconnectdb(connstr); \ - if (PQstatus(conn) == CONNECTION_BAD) \ - { \ - msg = pstrdup(PQerrorMessage(conn)); \ - PQfinish(conn); \ - ereport(ERROR, \ - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \ - errmsg("could not establish connection"), \ - errdetail_internal("%s", msg))); \ - } \ - dblink_security_check(conn, rconn); \ - if (PQclientEncoding(conn) != GetDatabaseEncoding()) \ - PQsetClientEncoding(conn, GetDatabaseEncodingName()); \ - freeconn = true; \ - } \ - } while (0) - -#define DBLINK_GET_NAMED_CONN \ - do { \ - conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \ - rconn = getConnectionByName(conname); \ - if (rconn) \ - conn = rconn->conn; \ - else \ - DBLINK_CONN_NOT_AVAIL; \ - } while (0) - -#define DBLINK_INIT \ - do { \ - if (!pconn) \ - { \ - pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \ - pconn->conn = NULL; \ - pconn->openCursorCount = 0; \ - pconn->newXactForCursor = FALSE; \ - } \ - } while (0) +static char * +xpstrdup(const char *in) +{ + if (in == NULL) + return NULL; + return pstrdup(in); +} + +static void +pg_attribute_noreturn() +dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2) +{ + char *msg = pchomp(PQerrorMessage(conn)); + + if (res) + PQclear(res); + elog(ERROR, "%s: %s", p2, msg); +} + +static void +pg_attribute_noreturn() +dblink_conn_not_avail(const char *conname) +{ + if (conname) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("connection \"%s\" not available", conname))); + else + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("connection not available"))); +} + +static void +dblink_get_conn(char *conname_or_str, + PGconn *volatile * conn_p, char **conname_p, volatile bool *freeconn_p) +{ + remoteConn *rconn = getConnectionByName(conname_or_str); + PGconn *conn; + char *conname; + bool freeconn; + + if (rconn) + { + conn = rconn->conn; + conname = conname_or_str; + freeconn = false; + } + else + { + const char *connstr; + + connstr = get_connect_string(conname_or_str); + if (connstr == NULL) + connstr = conname_or_str; + dblink_connstr_check(connstr); + conn = PQconnectdb(connstr); + if (PQstatus(conn) == CONNECTION_BAD) + { + char *msg = pchomp(PQerrorMessage(conn)); + + PQfinish(conn); + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail_internal("%s", msg))); + } + dblink_security_check(conn, rconn); + if (PQclientEncoding(conn) != GetDatabaseEncoding()) + PQsetClientEncoding(conn, GetDatabaseEncodingName()); + freeconn = true; + conname = NULL; + } + + *conn_p = conn; + *conname_p = conname; + *freeconn_p = freeconn; +} + +static PGconn * +dblink_get_named_conn(const char *conname) +{ + remoteConn *rconn = getConnectionByName(conname); + + if (rconn) + return rconn->conn; + + dblink_conn_not_avail(conname); + return NULL; /* keep compiler quiet */ +} + +static void +dblink_init(void) +{ + if (!pconn) + { + pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); + pconn->conn = NULL; + pconn->openCursorCount = 0; + pconn->newXactForCursor = FALSE; + } +} /* * Create a persistent connection to another database @@ -250,7 +261,7 @@ dblink_connect(PG_FUNCTION_ARGS) PGconn *conn = NULL; remoteConn *rconn = NULL; - DBLINK_INIT; + dblink_init(); if (PG_NARGS() == 2) { @@ -275,7 +286,7 @@ dblink_connect(PG_FUNCTION_ARGS) if (PQstatus(conn) == CONNECTION_BAD) { - msg = pstrdup(PQerrorMessage(conn)); + msg = pchomp(PQerrorMessage(conn)); PQfinish(conn); if (rconn) pfree(rconn); @@ -299,7 +310,11 @@ dblink_connect(PG_FUNCTION_ARGS) createNewConnection(connname, rconn); } else + { + if (pconn->conn) + PQfinish(pconn->conn); pconn->conn = conn; + } PG_RETURN_TEXT_P(cstring_to_text("OK")); } @@ -315,7 +330,7 @@ dblink_disconnect(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; PGconn *conn = NULL; - DBLINK_INIT; + dblink_init(); if (PG_NARGS() == 1) { @@ -328,7 +343,7 @@ dblink_disconnect(PG_FUNCTION_ARGS) conn = pconn->conn; if (!conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); PQfinish(conn); if (rconn) @@ -349,9 +364,8 @@ PG_FUNCTION_INFO_V1(dblink_open); Datum dblink_open(PG_FUNCTION_ARGS) { - char *msg; PGresult *res = NULL; - PGconn *conn = NULL; + PGconn *conn; char *curname = NULL; char *sql = NULL; char *conname = NULL; @@ -359,7 +373,7 @@ dblink_open(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ - DBLINK_INIT; + dblink_init(); initStringInfo(&buf); if (PG_NARGS() == 2) @@ -398,16 +412,16 @@ dblink_open(PG_FUNCTION_ARGS) } if (!rconn || !rconn->conn) - DBLINK_CONN_NOT_AVAIL; - else - conn = rconn->conn; + dblink_conn_not_avail(conname); + + conn = rconn->conn; /* If we are not in a transaction, start one */ if (PQtransactionStatus(conn) == PQTRANS_IDLE) { res = PQexec(conn, "BEGIN"); if (PQresultStatus(res) != PGRES_COMMAND_OK) - DBLINK_RES_INTERNALERROR("begin error"); + dblink_res_internalerror(conn, res, "begin error"); PQclear(res); rconn->newXactForCursor = TRUE; @@ -427,7 +441,7 @@ dblink_open(PG_FUNCTION_ARGS) res = PQexec(conn, buf.data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - dblink_res_error(conname, res, "could not open cursor", fail); + dblink_res_error(conn, conname, res, "could not open cursor", fail); PG_RETURN_TEXT_P(cstring_to_text("ERROR")); } @@ -442,16 +456,15 @@ PG_FUNCTION_INFO_V1(dblink_close); Datum dblink_close(PG_FUNCTION_ARGS) { - PGconn *conn = NULL; + PGconn *conn; PGresult *res = NULL; char *curname = NULL; char *conname = NULL; StringInfoData buf; - char *msg; remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ - DBLINK_INIT; + dblink_init(); initStringInfo(&buf); if (PG_NARGS() == 1) @@ -486,9 +499,9 @@ dblink_close(PG_FUNCTION_ARGS) } if (!rconn || !rconn->conn) - DBLINK_CONN_NOT_AVAIL; - else - conn = rconn->conn; + dblink_conn_not_avail(conname); + + conn = rconn->conn; appendStringInfo(&buf, "CLOSE %s", curname); @@ -496,7 +509,7 @@ dblink_close(PG_FUNCTION_ARGS) res = PQexec(conn, buf.data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - dblink_res_error(conname, res, "could not close cursor", fail); + dblink_res_error(conn, conname, res, "could not close cursor", fail); PG_RETURN_TEXT_P(cstring_to_text("ERROR")); } @@ -514,7 +527,7 @@ dblink_close(PG_FUNCTION_ARGS) res = PQexec(conn, "COMMIT"); if (PQresultStatus(res) != PGRES_COMMAND_OK) - DBLINK_RES_INTERNALERROR("commit error"); + dblink_res_internalerror(conn, res, "commit error"); PQclear(res); } } @@ -540,7 +553,7 @@ dblink_fetch(PG_FUNCTION_ARGS) prepTuplestoreResult(fcinfo); - DBLINK_INIT; + dblink_init(); if (PG_NARGS() == 4) { @@ -584,7 +597,7 @@ dblink_fetch(PG_FUNCTION_ARGS) } if (!conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); initStringInfo(&buf); appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); @@ -599,7 +612,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { - dblink_res_error(conname, res, "could not fetch from cursor", fail); + dblink_res_error(conn, conname, res, + "could not fetch from cursor", fail); return (Datum) 0; } else if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -629,15 +643,13 @@ PG_FUNCTION_INFO_V1(dblink_send_query); Datum dblink_send_query(PG_FUNCTION_ARGS) { - char *conname = NULL; - PGconn *conn = NULL; - char *sql = NULL; - remoteConn *rconn = NULL; + PGconn *conn; + char *sql; int retval; if (PG_NARGS() == 2) { - DBLINK_GET_NAMED_CONN; + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } else @@ -647,7 +659,7 @@ dblink_send_query(PG_FUNCTION_ARGS) /* async query send */ retval = PQsendQuery(conn, sql); if (retval != 1) - elog(NOTICE, "could not send query: %s", PQerrorMessage(conn)); + elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn))); PG_RETURN_INT32(retval); } @@ -667,15 +679,12 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) prepTuplestoreResult(fcinfo); - DBLINK_INIT; + dblink_init(); PG_TRY(); { - char *msg; - char *connstr = NULL; char *sql = NULL; char *conname = NULL; - remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ if (!is_async) @@ -683,23 +692,25 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) if (PG_NARGS() == 3) { /* text,text,bool */ - DBLINK_GET_CONN; + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); + dblink_get_conn(conname, &conn, &conname, &freeconn); } else if (PG_NARGS() == 2) { /* text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = pconn->conn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); + conn = pconn->conn; } else { - DBLINK_GET_CONN; + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + dblink_get_conn(conname, &conn, &conname, &freeconn); } } else if (PG_NARGS() == 1) @@ -715,16 +726,18 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) else /* is_async */ { /* get async result */ + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + if (PG_NARGS() == 2) { /* text,bool */ - DBLINK_GET_NAMED_CONN; fail = PG_GETARG_BOOL(1); + conn = dblink_get_named_conn(conname); } else if (PG_NARGS() == 1) { /* text */ - DBLINK_GET_NAMED_CONN; + conn = dblink_get_named_conn(conname); } else /* shouldn't happen */ @@ -732,7 +745,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) } if (!conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); if (!is_async) { @@ -750,8 +763,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK) { - dblink_res_error(conname, res, "could not execute query", - fail); + dblink_res_error(conn, conname, res, + "could not execute query", fail); /* if fail isn't set, we'll return an empty query result */ } else @@ -980,9 +993,7 @@ materializeQueryResult(FunctionCallInfo fcinfo, /* Create short-lived memory context for data conversions */ sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext, "dblink temporary context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + ALLOCSET_DEFAULT_SIZES); /* execute query, collecting any tuples into the tuplestore */ res = storeQueryResult(&sinfo, conn, sql); @@ -998,7 +1009,8 @@ materializeQueryResult(FunctionCallInfo fcinfo, PGresult *res1 = res; res = NULL; - dblink_res_error(conname, res1, "could not execute query", fail); + dblink_res_error(conn, conname, res1, + "could not execute query", fail); /* if fail isn't set, we'll return an empty query result */ } else if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -1084,7 +1096,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql) PGresult *res; if (!PQsendQuery(conn, sql)) - elog(ERROR, "could not send query: %s", PQerrorMessage(conn)); + elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn))); if (!PQsetSingleRowMode(conn)) /* shouldn't fail */ elog(ERROR, "failed to set single-row mode for dblink query"); @@ -1294,12 +1306,10 @@ PG_FUNCTION_INFO_V1(dblink_is_busy); Datum dblink_is_busy(PG_FUNCTION_ARGS) { - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; + PGconn *conn; - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; + dblink_init(); + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); PQconsumeInput(conn); PG_RETURN_INT32(PQisBusy(conn)); @@ -1320,15 +1330,13 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query); Datum dblink_cancel_query(PG_FUNCTION_ARGS) { - int res = 0; - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; + int res; + PGconn *conn; PGcancel *cancel; char errbuf[256]; - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; + dblink_init(); + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); cancel = PQgetCancel(conn); res = PQcancel(cancel, errbuf, 256); @@ -1356,18 +1364,16 @@ Datum dblink_error_message(PG_FUNCTION_ARGS) { char *msg; - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; + PGconn *conn; - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; + dblink_init(); + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); msg = PQerrorMessage(conn); if (msg == NULL || msg[0] == '\0') PG_RETURN_TEXT_P(cstring_to_text("OK")); else - PG_RETURN_TEXT_P(cstring_to_text(msg)); + PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg))); } /* @@ -1381,38 +1387,37 @@ dblink_exec(PG_FUNCTION_ARGS) PGconn *volatile conn = NULL; volatile bool freeconn = false; - DBLINK_INIT; + dblink_init(); PG_TRY(); { - char *msg; PGresult *res = NULL; - char *connstr = NULL; char *sql = NULL; char *conname = NULL; - remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ if (PG_NARGS() == 3) { /* must be text,text,bool */ - DBLINK_GET_CONN; + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); + dblink_get_conn(conname, &conn, &conname, &freeconn); } else if (PG_NARGS() == 2) { /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = pconn->conn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); + conn = pconn->conn; } else { - DBLINK_GET_CONN; + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + dblink_get_conn(conname, &conn, &conname, &freeconn); } } else if (PG_NARGS() == 1) @@ -1426,14 +1431,15 @@ dblink_exec(PG_FUNCTION_ARGS) elog(ERROR, "wrong number of arguments"); if (!conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); res = PQexec(conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { - dblink_res_error(conname, res, "could not execute command", fail); + dblink_res_error(conn, conname, res, + "could not execute command", fail); /* * and save a copy of the command status string to return as our @@ -1508,7 +1514,7 @@ dblink_get_pkey(PG_FUNCTION_ARGS) oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); /* open target relation */ - rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT); + rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT); /* get the array of attnums */ results = get_pkey_attnames(rel, &numatts); @@ -1609,7 +1615,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_insert); Datum dblink_build_sql_insert(PG_FUNCTION_ARGS) { - text *relname_text = PG_GETARG_TEXT_P(0); + text *relname_text = PG_GETARG_TEXT_PP(0); int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1); int32 pknumatts_arg = PG_GETARG_INT32(2); ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); @@ -1700,7 +1706,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_delete); Datum dblink_build_sql_delete(PG_FUNCTION_ARGS) { - text *relname_text = PG_GETARG_TEXT_P(0); + text *relname_text = PG_GETARG_TEXT_PP(0); int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1); int32 pknumatts_arg = PG_GETARG_INT32(2); ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); @@ -1777,7 +1783,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_update); Datum dblink_build_sql_update(PG_FUNCTION_ARGS) { - text *relname_text = PG_GETARG_TEXT_P(0); + text *relname_text = PG_GETARG_TEXT_PP(0); int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1); int32 pknumatts_arg = PG_GETARG_INT32(2); ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); @@ -1876,9 +1882,7 @@ PG_FUNCTION_INFO_V1(dblink_get_notify); Datum dblink_get_notify(PG_FUNCTION_ARGS) { - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; + PGconn *conn; PGnotify *notify; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; @@ -1888,9 +1892,9 @@ dblink_get_notify(PG_FUNCTION_ARGS) prepTuplestoreResult(fcinfo); - DBLINK_INIT; + dblink_init(); if (PG_NARGS() == 1) - DBLINK_GET_NAMED_CONN; + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); else conn = pconn->conn; @@ -2346,7 +2350,7 @@ quote_ident_cstr(char *rawstr) char *result; rawstr_text = cstring_to_text(rawstr); - result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, + result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text))); result = text_to_cstring(result_text); @@ -2664,7 +2668,8 @@ dblink_connstr_check(const char *connstr) } static void -dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail) +dblink_res_error(PGconn *conn, const char *conname, PGresult *res, + const char *dblink_context_msg, bool fail) { int level; char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); @@ -2693,10 +2698,18 @@ dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_ else sqlstate = ERRCODE_CONNECTION_FAILURE; - xpstrdup(message_primary, pg_diag_message_primary); - xpstrdup(message_detail, pg_diag_message_detail); - xpstrdup(message_hint, pg_diag_message_hint); - xpstrdup(message_context, pg_diag_context); + message_primary = xpstrdup(pg_diag_message_primary); + message_detail = xpstrdup(pg_diag_message_detail); + message_hint = xpstrdup(pg_diag_message_hint); + message_context = xpstrdup(pg_diag_context); + + /* + * If we don't get a message from the PGresult, try the PGconn. This is + * needed because for connection-level failures, PQexec may just return + * NULL, not a PGresult at all. + */ + if (message_primary == NULL) + message_primary = pchomp(PQerrorMessage(conn)); if (res) PQclear(res); @@ -2707,7 +2720,7 @@ dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_ ereport(level, (errcode(sqlstate), message_primary ? errmsg_internal("%s", message_primary) : - errmsg("unknown error"), + errmsg("could not obtain message string for remote error"), message_detail ? errdetail_internal("%s", message_detail) : 0, message_hint ? errhint("%s", message_hint) : 0, message_context ? errcontext("%s", message_context) : 0, @@ -2724,11 +2737,32 @@ get_connect_string(const char *servername) ForeignServer *foreign_server = NULL; UserMapping *user_mapping; ListCell *cell; - StringInfo buf = makeStringInfo(); + StringInfoData buf; ForeignDataWrapper *fdw; AclResult aclresult; char *srvname; + static const PQconninfoOption *options = NULL; + + initStringInfo(&buf); + + /* + * Get list of valid libpq options. + * + * To avoid unnecessary work, we get the list once and use it throughout + * the lifetime of this backend process. We don't need to care about + * memory context issues, because PQconndefaults allocates with malloc. + */ + if (!options) + { + options = PQconndefaults(); + if (!options) /* assume reason for failure is OOM */ + ereport(ERROR, + (errcode(ERRCODE_FDW_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("could not get libpq's default connection options"))); + } + /* first gather the server connstr options */ srvname = pstrdup(servername); truncate_identifier(srvname, strlen(srvname), false); @@ -2752,16 +2786,18 @@ get_connect_string(const char *servername) { DefElem *def = lfirst(cell); - appendStringInfo(buf, "%s='%s' ", def->defname, - escape_param_str(strVal(def->arg))); + if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId)) + appendStringInfo(&buf, "%s='%s' ", def->defname, + escape_param_str(strVal(def->arg))); } foreach(cell, foreign_server->options) { DefElem *def = lfirst(cell); - appendStringInfo(buf, "%s='%s' ", def->defname, - escape_param_str(strVal(def->arg))); + if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId)) + appendStringInfo(&buf, "%s='%s' ", def->defname, + escape_param_str(strVal(def->arg))); } foreach(cell, user_mapping->options) @@ -2769,11 +2805,12 @@ get_connect_string(const char *servername) DefElem *def = lfirst(cell); - appendStringInfo(buf, "%s='%s' ", def->defname, - escape_param_str(strVal(def->arg))); + if (is_valid_dblink_option(options, def->defname, UserMappingRelationId)) + appendStringInfo(&buf, "%s='%s' ", def->defname, + escape_param_str(strVal(def->arg))); } - return buf->data; + return buf.data; } else return NULL; @@ -2788,16 +2825,18 @@ static char * escape_param_str(const char *str) { const char *cp; - StringInfo buf = makeStringInfo(); + StringInfoData buf; + + initStringInfo(&buf); for (cp = str; *cp; cp++) { if (*cp == '\\' || *cp == '\'') - appendStringInfoChar(buf, '\\'); - appendStringInfoChar(buf, *cp); + appendStringInfoChar(&buf, '\\'); + appendStringInfoChar(&buf, *cp); } - return buf->data; + return buf.data; } /* |