summaryrefslogtreecommitdiff
path: root/contrib/dblink/dblink.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r--contrib/dblink/dblink.c276
1 files changed, 276 insertions, 0 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
new file mode 100644
index 00000000000..b3c9c6cbbf2
--- /dev/null
+++ b/contrib/dblink/dblink.c
@@ -0,0 +1,276 @@
+/*
+ * dblink.c
+ *
+ * Functions returning results from a remote database
+ *
+ * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
+ *
+ * Permission to use, copy, modify, and distribute this software and its
+ * documentation for any purpose, without fee, and without a written agreement
+ * is hereby granted, provided that the above copyright notice and this
+ * paragraph and the following two paragraphs appear in all copies.
+ *
+ * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
+ * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
+ * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
+ * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+ * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
+ * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
+ * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+ *
+ */
+
+#include "dblink.h"
+
+PG_FUNCTION_INFO_V1(dblink);
+Datum
+dblink(PG_FUNCTION_ARGS)
+{
+ PGconn *conn = NULL;
+ PGresult *res = NULL;
+ dblink_results *results;
+ char *optstr;
+ char *sqlstatement;
+ char *curstr = "DECLARE mycursor CURSOR FOR ";
+ char *execstatement;
+ char *msg;
+ int ntuples = 0;
+ ReturnSetInfo *rsi;
+
+ if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
+ elog(ERROR, "dblink: NULL arguments are not permitted");
+ }
+
+ if (fcinfo->resultinfo == NULL || ! IsA(fcinfo->resultinfo, ReturnSetInfo)) {
+ elog(ERROR, "dblink: function called in context that does not accept a set result");
+ }
+
+ optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
+ sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
+
+ if (fcinfo->flinfo->fn_extra == NULL) {
+
+ conn = PQconnectdb(optstr);
+ if (PQstatus(conn) == CONNECTION_BAD)
+ {
+ msg = PQerrorMessage(conn);
+ PQfinish(conn);
+ elog(ERROR, "dblink: connection error: %s", msg);
+ }
+
+ res = PQexec(conn, "BEGIN");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ msg = PQerrorMessage(conn);
+ PQclear(res);
+ PQfinish(conn);
+ elog(ERROR, "dblink: begin error: %s", msg);
+ }
+ PQclear(res);
+
+ execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
+ if (execstatement != NULL) {
+ strcpy(execstatement, curstr);
+ strcat(execstatement, sqlstatement);
+ strcat(execstatement, "\0");
+ } else {
+ elog(ERROR, "dblink: insufficient memory" );
+ }
+
+ res = PQexec(conn, execstatement);
+ if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ msg = PQerrorMessage(conn);
+ PQclear(res);
+ PQfinish(conn);
+ elog(ERROR, "dblink: sql error: %s", msg);
+ } else {
+ /*
+ * got results, start fetching them
+ */
+ PQclear(res);
+
+ res = PQexec(conn, "FETCH ALL in mycursor");
+ if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) {
+ msg = PQerrorMessage(conn);
+ PQclear(res);
+ PQfinish(conn);
+ elog(ERROR, "dblink: sql error: %s", msg);
+ }
+
+ ntuples = PQntuples(res);
+
+ if (ntuples > 0) {
+
+ results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
+ results->tup_num = 0;
+ results->res = res;
+ res = NULL;
+
+ (dblink_results *) fcinfo->flinfo->fn_extra = results;
+
+ results = NULL;
+ results = fcinfo->flinfo->fn_extra;
+
+ /* close the cursor */
+ res = PQexec(conn, "CLOSE mycursor");
+ PQclear(res);
+
+ /* commit the transaction */
+ res = PQexec(conn, "COMMIT");
+ PQclear(res);
+
+ /* close the connection to the database and cleanup */
+ PQfinish(conn);
+
+ rsi = (ReturnSetInfo *)fcinfo->resultinfo;
+ rsi->isDone = ExprMultipleResult;
+
+ PG_RETURN_POINTER(results);
+
+ } else {
+
+ PQclear(res);
+
+ /* close the cursor */
+ res = PQexec(conn, "CLOSE mycursor");
+ PQclear(res);
+
+ /* commit the transaction */
+ res = PQexec(conn, "COMMIT");
+ PQclear(res);
+
+ /* close the connection to the database and cleanup */
+ PQfinish(conn);
+
+ rsi = (ReturnSetInfo *)fcinfo->resultinfo;
+ rsi->isDone = ExprEndResult ;
+
+ PG_RETURN_NULL();
+ }
+ }
+ } else {
+ /*
+ * check for more results
+ */
+
+ results = fcinfo->flinfo->fn_extra;
+ results->tup_num++;
+ ntuples = PQntuples(results->res);
+
+ if (results->tup_num < ntuples) {
+
+ /*
+ * fetch them if available
+ */
+
+ rsi = (ReturnSetInfo *)fcinfo->resultinfo;
+ rsi->isDone = ExprMultipleResult;
+
+ PG_RETURN_POINTER(results);
+
+ } else {
+
+ /*
+ * or if no more, clean things up
+ */
+
+ results = fcinfo->flinfo->fn_extra;
+
+ PQclear(results->res);
+
+ rsi = (ReturnSetInfo *)fcinfo->resultinfo;
+ rsi->isDone = ExprEndResult ;
+
+ PG_RETURN_NULL();
+ }
+ }
+ PG_RETURN_NULL();
+}
+
+
+/*
+ * dblink_tok
+ * parse dblink output string
+ * return fldnum item (0 based)
+ * based on provided field separator
+ */
+
+PG_FUNCTION_INFO_V1(dblink_tok);
+Datum
+dblink_tok(PG_FUNCTION_ARGS)
+{
+ dblink_results *results;
+ int fldnum;
+ text *result_text;
+ char *result;
+ int nfields = 0;
+ int text_len = 0;
+
+ if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
+ elog(ERROR, "dblink: NULL arguments are not permitted");
+ }
+
+ results = (dblink_results *) PG_GETARG_POINTER(0);
+ if (results == NULL) {
+ elog(ERROR, "dblink: function called with invalid result pointer");
+ }
+
+ fldnum = PG_GETARG_INT32(1);
+ if (fldnum < 0) {
+ elog(ERROR, "dblink: field number < 0 not permitted");
+ }
+
+ nfields = PQnfields(results->res);
+ if (fldnum > (nfields - 1)) {
+ elog(ERROR, "dblink: field number %d does not exist", fldnum);
+ }
+
+ text_len = PQgetlength(results->res, results->tup_num, fldnum);
+
+ result = (char *) palloc(text_len + 1);
+
+ if (result != NULL) {
+ strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum));
+ strcat(result, "\0");
+ } else {
+ elog(ERROR, "dblink: insufficient memory" );
+ }
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
+
+ PG_RETURN_TEXT_P(result_text);
+}
+
+
+/*
+ * internal functions
+ */
+
+
+/*
+ * init_dblink_results
+ * - create an empty dblink_results data structure
+ */
+dblink_results *
+init_dblink_results(MemoryContext fn_mcxt)
+{
+ MemoryContext oldcontext;
+ dblink_results *retval;
+
+ oldcontext = MemoryContextSwitchTo(fn_mcxt);
+
+ retval = (dblink_results *) palloc(sizeof(dblink_results));
+ MemSet(retval, 0, sizeof(dblink_results));
+
+ retval->tup_num = -1;
+ retval->res = NULL;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return retval;
+}