summaryrefslogtreecommitdiff
path: root/contrib/dblink/dblink.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r--contrib/dblink/dblink.c431
1 files changed, 235 insertions, 196 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 9c8e308358..a6a3c09ff8 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -9,7 +9,7 @@
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
*
* contrib/dblink/dblink.c
- * Copyright (c) 2001-2016, PostgreSQL Global Development Group
+ * Copyright (c) 2001-2017, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
@@ -40,6 +40,7 @@
#include "access/reloptions.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
+#include "catalog/pg_foreign_data_wrapper.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_type.h"
#include "catalog/pg_user_mapping.h"
@@ -58,8 +59,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"
-
-#include "dblink.h"
+#include "utils/varlena.h"
PG_MODULE_MAGIC;
@@ -112,7 +112,8 @@ static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclM
static char *generate_relation_name(Relation rel);
static void dblink_connstr_check(const char *connstr);
static void dblink_security_check(PGconn *conn, remoteConn *rconn);
-static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
+static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
+ const char *dblink_context_msg, bool fail);
static char *get_connect_string(const char *servername);
static char *escape_param_str(const char *from);
static void validate_pkattnums(Relation rel,
@@ -143,98 +144,108 @@ typedef struct remoteConnHashEnt
/* initial number of connection hashes */
#define NUMCONN 16
-/* general utility */
-#define xpfree(var_) \
- do { \
- if (var_ != NULL) \
- { \
- pfree(var_); \
- var_ = NULL; \
- } \
- } while (0)
-
-#define xpstrdup(var_c, var_) \
- do { \
- if (var_ != NULL) \
- var_c = pstrdup(var_); \
- else \
- var_c = NULL; \
- } while (0)
-
-#define DBLINK_RES_INTERNALERROR(p2) \
- do { \
- msg = pstrdup(PQerrorMessage(conn)); \
- if (res) \
- PQclear(res); \
- elog(ERROR, "%s: %s", p2, msg); \
- } while (0)
-
-#define DBLINK_CONN_NOT_AVAIL \
- do { \
- if(conname) \
- ereport(ERROR, \
- (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
- errmsg("connection \"%s\" not available", conname))); \
- else \
- ereport(ERROR, \
- (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
- errmsg("connection not available"))); \
- } while (0)
-
-#define DBLINK_GET_CONN \
- do { \
- char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
- rconn = getConnectionByName(conname_or_str); \
- if (rconn) \
- { \
- conn = rconn->conn; \
- conname = conname_or_str; \
- } \
- else \
- { \
- connstr = get_connect_string(conname_or_str); \
- if (connstr == NULL) \
- { \
- connstr = conname_or_str; \
- } \
- dblink_connstr_check(connstr); \
- conn = PQconnectdb(connstr); \
- if (PQstatus(conn) == CONNECTION_BAD) \
- { \
- msg = pstrdup(PQerrorMessage(conn)); \
- PQfinish(conn); \
- ereport(ERROR, \
- (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
- errmsg("could not establish connection"), \
- errdetail_internal("%s", msg))); \
- } \
- dblink_security_check(conn, rconn); \
- if (PQclientEncoding(conn) != GetDatabaseEncoding()) \
- PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
- freeconn = true; \
- } \
- } while (0)
-
-#define DBLINK_GET_NAMED_CONN \
- do { \
- conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
- rconn = getConnectionByName(conname); \
- if (rconn) \
- conn = rconn->conn; \
- else \
- DBLINK_CONN_NOT_AVAIL; \
- } while (0)
-
-#define DBLINK_INIT \
- do { \
- if (!pconn) \
- { \
- pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
- pconn->conn = NULL; \
- pconn->openCursorCount = 0; \
- pconn->newXactForCursor = FALSE; \
- } \
- } while (0)
+static char *
+xpstrdup(const char *in)
+{
+ if (in == NULL)
+ return NULL;
+ return pstrdup(in);
+}
+
+static void
+pg_attribute_noreturn()
+dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
+{
+ char *msg = pchomp(PQerrorMessage(conn));
+
+ if (res)
+ PQclear(res);
+ elog(ERROR, "%s: %s", p2, msg);
+}
+
+static void
+pg_attribute_noreturn()
+dblink_conn_not_avail(const char *conname)
+{
+ if (conname)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+ errmsg("connection \"%s\" not available", conname)));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+ errmsg("connection not available")));
+}
+
+static void
+dblink_get_conn(char *conname_or_str,
+ PGconn *volatile * conn_p, char **conname_p, volatile bool *freeconn_p)
+{
+ remoteConn *rconn = getConnectionByName(conname_or_str);
+ PGconn *conn;
+ char *conname;
+ bool freeconn;
+
+ if (rconn)
+ {
+ conn = rconn->conn;
+ conname = conname_or_str;
+ freeconn = false;
+ }
+ else
+ {
+ const char *connstr;
+
+ connstr = get_connect_string(conname_or_str);
+ if (connstr == NULL)
+ connstr = conname_or_str;
+ dblink_connstr_check(connstr);
+ conn = PQconnectdb(connstr);
+ if (PQstatus(conn) == CONNECTION_BAD)
+ {
+ char *msg = pchomp(PQerrorMessage(conn));
+
+ PQfinish(conn);
+ ereport(ERROR,
+ (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+ errmsg("could not establish connection"),
+ errdetail_internal("%s", msg)));
+ }
+ dblink_security_check(conn, rconn);
+ if (PQclientEncoding(conn) != GetDatabaseEncoding())
+ PQsetClientEncoding(conn, GetDatabaseEncodingName());
+ freeconn = true;
+ conname = NULL;
+ }
+
+ *conn_p = conn;
+ *conname_p = conname;
+ *freeconn_p = freeconn;
+}
+
+static PGconn *
+dblink_get_named_conn(const char *conname)
+{
+ remoteConn *rconn = getConnectionByName(conname);
+
+ if (rconn)
+ return rconn->conn;
+
+ dblink_conn_not_avail(conname);
+ return NULL; /* keep compiler quiet */
+}
+
+static void
+dblink_init(void)
+{
+ if (!pconn)
+ {
+ pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
+ pconn->conn = NULL;
+ pconn->openCursorCount = 0;
+ pconn->newXactForCursor = FALSE;
+ }
+}
/*
* Create a persistent connection to another database
@@ -250,7 +261,7 @@ dblink_connect(PG_FUNCTION_ARGS)
PGconn *conn = NULL;
remoteConn *rconn = NULL;
- DBLINK_INIT;
+ dblink_init();
if (PG_NARGS() == 2)
{
@@ -275,7 +286,7 @@ dblink_connect(PG_FUNCTION_ARGS)
if (PQstatus(conn) == CONNECTION_BAD)
{
- msg = pstrdup(PQerrorMessage(conn));
+ msg = pchomp(PQerrorMessage(conn));
PQfinish(conn);
if (rconn)
pfree(rconn);
@@ -299,7 +310,11 @@ dblink_connect(PG_FUNCTION_ARGS)
createNewConnection(connname, rconn);
}
else
+ {
+ if (pconn->conn)
+ PQfinish(pconn->conn);
pconn->conn = conn;
+ }
PG_RETURN_TEXT_P(cstring_to_text("OK"));
}
@@ -315,7 +330,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
remoteConn *rconn = NULL;
PGconn *conn = NULL;
- DBLINK_INIT;
+ dblink_init();
if (PG_NARGS() == 1)
{
@@ -328,7 +343,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
conn = pconn->conn;
if (!conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
PQfinish(conn);
if (rconn)
@@ -349,9 +364,8 @@ PG_FUNCTION_INFO_V1(dblink_open);
Datum
dblink_open(PG_FUNCTION_ARGS)
{
- char *msg;
PGresult *res = NULL;
- PGconn *conn = NULL;
+ PGconn *conn;
char *curname = NULL;
char *sql = NULL;
char *conname = NULL;
@@ -359,7 +373,7 @@ dblink_open(PG_FUNCTION_ARGS)
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
- DBLINK_INIT;
+ dblink_init();
initStringInfo(&buf);
if (PG_NARGS() == 2)
@@ -398,16 +412,16 @@ dblink_open(PG_FUNCTION_ARGS)
}
if (!rconn || !rconn->conn)
- DBLINK_CONN_NOT_AVAIL;
- else
- conn = rconn->conn;
+ dblink_conn_not_avail(conname);
+
+ conn = rconn->conn;
/* If we are not in a transaction, start one */
if (PQtransactionStatus(conn) == PQTRANS_IDLE)
{
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- DBLINK_RES_INTERNALERROR("begin error");
+ dblink_res_internalerror(conn, res, "begin error");
PQclear(res);
rconn->newXactForCursor = TRUE;
@@ -427,7 +441,7 @@ dblink_open(PG_FUNCTION_ARGS)
res = PQexec(conn, buf.data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
- dblink_res_error(conname, res, "could not open cursor", fail);
+ dblink_res_error(conn, conname, res, "could not open cursor", fail);
PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
}
@@ -442,16 +456,15 @@ PG_FUNCTION_INFO_V1(dblink_close);
Datum
dblink_close(PG_FUNCTION_ARGS)
{
- PGconn *conn = NULL;
+ PGconn *conn;
PGresult *res = NULL;
char *curname = NULL;
char *conname = NULL;
StringInfoData buf;
- char *msg;
remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
- DBLINK_INIT;
+ dblink_init();
initStringInfo(&buf);
if (PG_NARGS() == 1)
@@ -486,9 +499,9 @@ dblink_close(PG_FUNCTION_ARGS)
}
if (!rconn || !rconn->conn)
- DBLINK_CONN_NOT_AVAIL;
- else
- conn = rconn->conn;
+ dblink_conn_not_avail(conname);
+
+ conn = rconn->conn;
appendStringInfo(&buf, "CLOSE %s", curname);
@@ -496,7 +509,7 @@ dblink_close(PG_FUNCTION_ARGS)
res = PQexec(conn, buf.data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
- dblink_res_error(conname, res, "could not close cursor", fail);
+ dblink_res_error(conn, conname, res, "could not close cursor", fail);
PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
}
@@ -514,7 +527,7 @@ dblink_close(PG_FUNCTION_ARGS)
res = PQexec(conn, "COMMIT");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
- DBLINK_RES_INTERNALERROR("commit error");
+ dblink_res_internalerror(conn, res, "commit error");
PQclear(res);
}
}
@@ -540,7 +553,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
prepTuplestoreResult(fcinfo);
- DBLINK_INIT;
+ dblink_init();
if (PG_NARGS() == 4)
{
@@ -584,7 +597,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
}
if (!conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
initStringInfo(&buf);
appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
@@ -599,7 +612,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
- dblink_res_error(conname, res, "could not fetch from cursor", fail);
+ dblink_res_error(conn, conname, res,
+ "could not fetch from cursor", fail);
return (Datum) 0;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -629,15 +643,13 @@ PG_FUNCTION_INFO_V1(dblink_send_query);
Datum
dblink_send_query(PG_FUNCTION_ARGS)
{
- char *conname = NULL;
- PGconn *conn = NULL;
- char *sql = NULL;
- remoteConn *rconn = NULL;
+ PGconn *conn;
+ char *sql;
int retval;
if (PG_NARGS() == 2)
{
- DBLINK_GET_NAMED_CONN;
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
}
else
@@ -647,7 +659,7 @@ dblink_send_query(PG_FUNCTION_ARGS)
/* async query send */
retval = PQsendQuery(conn, sql);
if (retval != 1)
- elog(NOTICE, "could not send query: %s", PQerrorMessage(conn));
+ elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
PG_RETURN_INT32(retval);
}
@@ -667,15 +679,12 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
prepTuplestoreResult(fcinfo);
- DBLINK_INIT;
+ dblink_init();
PG_TRY();
{
- char *msg;
- char *connstr = NULL;
char *sql = NULL;
char *conname = NULL;
- remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible */
if (!is_async)
@@ -683,23 +692,25 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
if (PG_NARGS() == 3)
{
/* text,text,bool */
- DBLINK_GET_CONN;
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
fail = PG_GETARG_BOOL(2);
+ dblink_get_conn(conname, &conn, &conname, &freeconn);
}
else if (PG_NARGS() == 2)
{
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
- conn = pconn->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
fail = PG_GETARG_BOOL(1);
+ conn = pconn->conn;
}
else
{
- DBLINK_GET_CONN;
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ dblink_get_conn(conname, &conn, &conname, &freeconn);
}
}
else if (PG_NARGS() == 1)
@@ -715,16 +726,18 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
else /* is_async */
{
/* get async result */
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
if (PG_NARGS() == 2)
{
/* text,bool */
- DBLINK_GET_NAMED_CONN;
fail = PG_GETARG_BOOL(1);
+ conn = dblink_get_named_conn(conname);
}
else if (PG_NARGS() == 1)
{
/* text */
- DBLINK_GET_NAMED_CONN;
+ conn = dblink_get_named_conn(conname);
}
else
/* shouldn't happen */
@@ -732,7 +745,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
}
if (!conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
if (!is_async)
{
@@ -750,8 +763,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
if (PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK)
{
- dblink_res_error(conname, res, "could not execute query",
- fail);
+ dblink_res_error(conn, conname, res,
+ "could not execute query", fail);
/* if fail isn't set, we'll return an empty query result */
}
else
@@ -980,9 +993,7 @@ materializeQueryResult(FunctionCallInfo fcinfo,
/* Create short-lived memory context for data conversions */
sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
"dblink temporary context",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ ALLOCSET_DEFAULT_SIZES);
/* execute query, collecting any tuples into the tuplestore */
res = storeQueryResult(&sinfo, conn, sql);
@@ -998,7 +1009,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
PGresult *res1 = res;
res = NULL;
- dblink_res_error(conname, res1, "could not execute query", fail);
+ dblink_res_error(conn, conname, res1,
+ "could not execute query", fail);
/* if fail isn't set, we'll return an empty query result */
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -1084,7 +1096,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
PGresult *res;
if (!PQsendQuery(conn, sql))
- elog(ERROR, "could not send query: %s", PQerrorMessage(conn));
+ elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
elog(ERROR, "failed to set single-row mode for dblink query");
@@ -1294,12 +1306,10 @@ PG_FUNCTION_INFO_V1(dblink_is_busy);
Datum
dblink_is_busy(PG_FUNCTION_ARGS)
{
- char *conname = NULL;
- PGconn *conn = NULL;
- remoteConn *rconn = NULL;
+ PGconn *conn;
- DBLINK_INIT;
- DBLINK_GET_NAMED_CONN;
+ dblink_init();
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
PQconsumeInput(conn);
PG_RETURN_INT32(PQisBusy(conn));
@@ -1320,15 +1330,13 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query);
Datum
dblink_cancel_query(PG_FUNCTION_ARGS)
{
- int res = 0;
- char *conname = NULL;
- PGconn *conn = NULL;
- remoteConn *rconn = NULL;
+ int res;
+ PGconn *conn;
PGcancel *cancel;
char errbuf[256];
- DBLINK_INIT;
- DBLINK_GET_NAMED_CONN;
+ dblink_init();
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
cancel = PQgetCancel(conn);
res = PQcancel(cancel, errbuf, 256);
@@ -1356,18 +1364,16 @@ Datum
dblink_error_message(PG_FUNCTION_ARGS)
{
char *msg;
- char *conname = NULL;
- PGconn *conn = NULL;
- remoteConn *rconn = NULL;
+ PGconn *conn;
- DBLINK_INIT;
- DBLINK_GET_NAMED_CONN;
+ dblink_init();
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
msg = PQerrorMessage(conn);
if (msg == NULL || msg[0] == '\0')
PG_RETURN_TEXT_P(cstring_to_text("OK"));
else
- PG_RETURN_TEXT_P(cstring_to_text(msg));
+ PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
}
/*
@@ -1381,38 +1387,37 @@ dblink_exec(PG_FUNCTION_ARGS)
PGconn *volatile conn = NULL;
volatile bool freeconn = false;
- DBLINK_INIT;
+ dblink_init();
PG_TRY();
{
- char *msg;
PGresult *res = NULL;
- char *connstr = NULL;
char *sql = NULL;
char *conname = NULL;
- remoteConn *rconn = NULL;
bool fail = true; /* default to backward compatible behavior */
if (PG_NARGS() == 3)
{
/* must be text,text,bool */
- DBLINK_GET_CONN;
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
fail = PG_GETARG_BOOL(2);
+ dblink_get_conn(conname, &conn, &conname, &freeconn);
}
else if (PG_NARGS() == 2)
{
/* might be text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
- conn = pconn->conn;
sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
fail = PG_GETARG_BOOL(1);
+ conn = pconn->conn;
}
else
{
- DBLINK_GET_CONN;
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ dblink_get_conn(conname, &conn, &conname, &freeconn);
}
}
else if (PG_NARGS() == 1)
@@ -1426,14 +1431,15 @@ dblink_exec(PG_FUNCTION_ARGS)
elog(ERROR, "wrong number of arguments");
if (!conn)
- DBLINK_CONN_NOT_AVAIL;
+ dblink_conn_not_avail(conname);
res = PQexec(conn, sql);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
- dblink_res_error(conname, res, "could not execute command", fail);
+ dblink_res_error(conn, conname, res,
+ "could not execute command", fail);
/*
* and save a copy of the command status string to return as our
@@ -1508,7 +1514,7 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* open target relation */
- rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
+ rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
/* get the array of attnums */
results = get_pkey_attnames(rel, &numatts);
@@ -1609,7 +1615,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
Datum
dblink_build_sql_insert(PG_FUNCTION_ARGS)
{
- text *relname_text = PG_GETARG_TEXT_P(0);
+ text *relname_text = PG_GETARG_TEXT_PP(0);
int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
int32 pknumatts_arg = PG_GETARG_INT32(2);
ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
@@ -1700,7 +1706,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
Datum
dblink_build_sql_delete(PG_FUNCTION_ARGS)
{
- text *relname_text = PG_GETARG_TEXT_P(0);
+ text *relname_text = PG_GETARG_TEXT_PP(0);
int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
int32 pknumatts_arg = PG_GETARG_INT32(2);
ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
@@ -1777,7 +1783,7 @@ PG_FUNCTION_INFO_V1(dblink_build_sql_update);
Datum
dblink_build_sql_update(PG_FUNCTION_ARGS)
{
- text *relname_text = PG_GETARG_TEXT_P(0);
+ text *relname_text = PG_GETARG_TEXT_PP(0);
int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
int32 pknumatts_arg = PG_GETARG_INT32(2);
ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
@@ -1876,9 +1882,7 @@ PG_FUNCTION_INFO_V1(dblink_get_notify);
Datum
dblink_get_notify(PG_FUNCTION_ARGS)
{
- char *conname = NULL;
- PGconn *conn = NULL;
- remoteConn *rconn = NULL;
+ PGconn *conn;
PGnotify *notify;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
@@ -1888,9 +1892,9 @@ dblink_get_notify(PG_FUNCTION_ARGS)
prepTuplestoreResult(fcinfo);
- DBLINK_INIT;
+ dblink_init();
if (PG_NARGS() == 1)
- DBLINK_GET_NAMED_CONN;
+ conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
else
conn = pconn->conn;
@@ -2346,7 +2350,7 @@ quote_ident_cstr(char *rawstr)
char *result;
rawstr_text = cstring_to_text(rawstr);
- result_text = DatumGetTextP(DirectFunctionCall1(quote_ident,
+ result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
PointerGetDatum(rawstr_text)));
result = text_to_cstring(result_text);
@@ -2664,7 +2668,8 @@ dblink_connstr_check(const char *connstr)
}
static void
-dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
+dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
+ const char *dblink_context_msg, bool fail)
{
int level;
char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
@@ -2693,10 +2698,18 @@ dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_
else
sqlstate = ERRCODE_CONNECTION_FAILURE;
- xpstrdup(message_primary, pg_diag_message_primary);
- xpstrdup(message_detail, pg_diag_message_detail);
- xpstrdup(message_hint, pg_diag_message_hint);
- xpstrdup(message_context, pg_diag_context);
+ message_primary = xpstrdup(pg_diag_message_primary);
+ message_detail = xpstrdup(pg_diag_message_detail);
+ message_hint = xpstrdup(pg_diag_message_hint);
+ message_context = xpstrdup(pg_diag_context);
+
+ /*
+ * If we don't get a message from the PGresult, try the PGconn. This is
+ * needed because for connection-level failures, PQexec may just return
+ * NULL, not a PGresult at all.
+ */
+ if (message_primary == NULL)
+ message_primary = pchomp(PQerrorMessage(conn));
if (res)
PQclear(res);
@@ -2707,7 +2720,7 @@ dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_
ereport(level,
(errcode(sqlstate),
message_primary ? errmsg_internal("%s", message_primary) :
- errmsg("unknown error"),
+ errmsg("could not obtain message string for remote error"),
message_detail ? errdetail_internal("%s", message_detail) : 0,
message_hint ? errhint("%s", message_hint) : 0,
message_context ? errcontext("%s", message_context) : 0,
@@ -2724,11 +2737,32 @@ get_connect_string(const char *servername)
ForeignServer *foreign_server = NULL;
UserMapping *user_mapping;
ListCell *cell;
- StringInfo buf = makeStringInfo();
+ StringInfoData buf;
ForeignDataWrapper *fdw;
AclResult aclresult;
char *srvname;
+ static const PQconninfoOption *options = NULL;
+
+ initStringInfo(&buf);
+
+ /*
+ * Get list of valid libpq options.
+ *
+ * To avoid unnecessary work, we get the list once and use it throughout
+ * the lifetime of this backend process. We don't need to care about
+ * memory context issues, because PQconndefaults allocates with malloc.
+ */
+ if (!options)
+ {
+ options = PQconndefaults();
+ if (!options) /* assume reason for failure is OOM */
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("could not get libpq's default connection options")));
+ }
+
/* first gather the server connstr options */
srvname = pstrdup(servername);
truncate_identifier(srvname, strlen(srvname), false);
@@ -2752,16 +2786,18 @@ get_connect_string(const char *servername)
{
DefElem *def = lfirst(cell);
- appendStringInfo(buf, "%s='%s' ", def->defname,
- escape_param_str(strVal(def->arg)));
+ if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
+ appendStringInfo(&buf, "%s='%s' ", def->defname,
+ escape_param_str(strVal(def->arg)));
}
foreach(cell, foreign_server->options)
{
DefElem *def = lfirst(cell);
- appendStringInfo(buf, "%s='%s' ", def->defname,
- escape_param_str(strVal(def->arg)));
+ if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
+ appendStringInfo(&buf, "%s='%s' ", def->defname,
+ escape_param_str(strVal(def->arg)));
}
foreach(cell, user_mapping->options)
@@ -2769,11 +2805,12 @@ get_connect_string(const char *servername)
DefElem *def = lfirst(cell);
- appendStringInfo(buf, "%s='%s' ", def->defname,
- escape_param_str(strVal(def->arg)));
+ if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
+ appendStringInfo(&buf, "%s='%s' ", def->defname,
+ escape_param_str(strVal(def->arg)));
}
- return buf->data;
+ return buf.data;
}
else
return NULL;
@@ -2788,16 +2825,18 @@ static char *
escape_param_str(const char *str)
{
const char *cp;
- StringInfo buf = makeStringInfo();
+ StringInfoData buf;
+
+ initStringInfo(&buf);
for (cp = str; *cp; cp++)
{
if (*cp == '\\' || *cp == '\'')
- appendStringInfoChar(buf, '\\');
- appendStringInfoChar(buf, *cp);
+ appendStringInfoChar(&buf, '\\');
+ appendStringInfoChar(&buf, *cp);
}
- return buf->data;
+ return buf.data;
}
/*