Improve connection-failure error handling in contrib/postgres_fdw.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 4 Feb 2014 02:30:02 +0000 (21:30 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 4 Feb 2014 02:30:20 +0000 (21:30 -0500)
postgres_fdw tended to say "unknown error" if it tried to execute a command
on an already-dead connection, because some paths in libpq just return a
null PGresult for such cases.  Out-of-memory might result in that, too.
To fix, pass the PGconn to pgfdw_report_error, and look at its
PQerrorMessage() string if we can't get anything out of the PGresult.

Also, fix the transaction-exit logic to reliably drop a dead connection.
It was attempting to do that already, but it assumed that only connection
cache entries with xact_depth > 0 needed to be examined.  The folly in that
is that if we fail while issuing START TRANSACTION, we'll not have bumped
xact_depth.  (At least for the case I was testing, this fix masks the
other problem; but it still seems like a good idea to have the PGconn
fallback logic.)

Per investigation of bug #9087 from Craig Lucas.  Backpatch to 9.3 where
this code was introduced.

contrib/postgres_fdw/connection.c
contrib/postgres_fdw/postgres_fdw.c
contrib/postgres_fdw/postgres_fdw.h

index 93ea498012609c1fdba918f3a29d964b14e0baf5..b688241283805d7d0c57375c90ccd610967def5e 100644 (file)
@@ -355,7 +355,7 @@ do_sql_command(PGconn *conn, const char *sql)
 
        res = PQexec(conn, sql);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               pgfdw_report_error(ERROR, res, true, sql);
+               pgfdw_report_error(ERROR, res, conn, true, sql);
        PQclear(res);
 }
 
@@ -454,6 +454,7 @@ GetPrepStmtNumber(PGconn *conn)
  *
  * elevel: error level to use (typically ERROR, but might be less)
  * res: PGresult containing the error
+ * conn: connection we did the query on
  * clear: if true, PQclear the result (otherwise caller will handle it)
  * sql: NULL, or text of remote command we tried to execute
  *
@@ -462,7 +463,8 @@ GetPrepStmtNumber(PGconn *conn)
  * marked with have_error = true.
  */
 void
-pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
+pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+                                  bool clear, const char *sql)
 {
        /* If requested, PGresult must be released before leaving this function. */
        PG_TRY();
@@ -483,6 +485,14 @@ pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
                else
                        sqlstate = ERRCODE_CONNECTION_FAILURE;
 
+               /*
+                * If we don't get a message from the PGresult, try the PGconn.  This
+                * is needed because for connection-level failures, PQexec may just
+                * return NULL, not a PGresult at all.
+                */
+               if (message_primary == NULL)
+                       message_primary = PQerrorMessage(conn);
+
                ereport(elevel,
                                (errcode(sqlstate),
                                 message_primary ? errmsg_internal("%s", message_primary) :
@@ -525,74 +535,37 @@ pgfdw_xact_callback(XactEvent event, void *arg)
        {
                PGresult   *res;
 
-               /* We only care about connections with open remote transactions */
-               if (entry->conn == NULL || entry->xact_depth == 0)
+               /* Ignore cache entry if no open connection right now */
+               if (entry->conn == NULL)
                        continue;
 
-               elog(DEBUG3, "closing remote transaction on connection %p",
-                        entry->conn);
-
-               switch (event)
+               /* If it has an open remote transaction, try to close it */
+               if (entry->xact_depth > 0)
                {
-                       case XACT_EVENT_PRE_COMMIT:
-                               /* Commit all remote transactions during pre-commit */
-                               do_sql_command(entry->conn, "COMMIT TRANSACTION");
-
-                               /*
-                                * If there were any errors in subtransactions, and we made
-                                * prepared statements, do a DEALLOCATE ALL to make sure we
-                                * get rid of all prepared statements.  This is annoying and
-                                * not terribly bulletproof, but it's probably not worth
-                                * trying harder.
-                                *
-                                * DEALLOCATE ALL only exists in 8.3 and later, so this
-                                * constrains how old a server postgres_fdw can communicate
-                                * with.  We intentionally ignore errors in the DEALLOCATE, so
-                                * that we can hobble along to some extent with older servers
-                                * (leaking prepared statements as we go; but we don't really
-                                * support update operations pre-8.3 anyway).
-                                */
-                               if (entry->have_prep_stmt && entry->have_error)
-                               {
-                                       res = PQexec(entry->conn, "DEALLOCATE ALL");
-                                       PQclear(res);
-                               }
-                               entry->have_prep_stmt = false;
-                               entry->have_error = false;
-                               break;
-                       case XACT_EVENT_PRE_PREPARE:
-
-                               /*
-                                * We disallow remote transactions that modified anything,
-                                * since it's not really reasonable to hold them open until
-                                * the prepared transaction is committed.  For the moment,
-                                * throw error unconditionally; later we might allow read-only
-                                * cases.  Note that the error will cause us to come right
-                                * back here with event == XACT_EVENT_ABORT, so we'll clean up
-                                * the connection state at that point.
-                                */
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                                errmsg("cannot prepare a transaction that modified remote tables")));
-                               break;
-                       case XACT_EVENT_COMMIT:
-                       case XACT_EVENT_PREPARE:
-                               /* Should not get here -- pre-commit should have handled it */
-                               elog(ERROR, "missed cleaning up connection during pre-commit");
-                               break;
-                       case XACT_EVENT_ABORT:
-                               /* Assume we might have lost track of prepared statements */
-                               entry->have_error = true;
-                               /* If we're aborting, abort all remote transactions too */
-                               res = PQexec(entry->conn, "ABORT TRANSACTION");
-                               /* Note: can't throw ERROR, it would be infinite loop */
-                               if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                                       pgfdw_report_error(WARNING, res, true,
-                                                                          "ABORT TRANSACTION");
-                               else
-                               {
-                                       PQclear(res);
-                                       /* As above, make sure we've cleared any prepared stmts */
+                       elog(DEBUG3, "closing remote transaction on connection %p",
+                                entry->conn);
+
+                       switch (event)
+                       {
+                               case XACT_EVENT_PRE_COMMIT:
+                                       /* Commit all remote transactions during pre-commit */
+                                       do_sql_command(entry->conn, "COMMIT TRANSACTION");
+
+                                       /*
+                                        * If there were any errors in subtransactions, and we
+                                        * made prepared statements, do a DEALLOCATE ALL to make
+                                        * sure we get rid of all prepared statements. This is
+                                        * annoying and not terribly bulletproof, but it's
+                                        * probably not worth trying harder.
+                                        *
+                                        * DEALLOCATE ALL only exists in 8.3 and later, so this
+                                        * constrains how old a server postgres_fdw can
+                                        * communicate with.  We intentionally ignore errors in
+                                        * the DEALLOCATE, so that we can hobble along to some
+                                        * extent with older servers (leaking prepared statements
+                                        * as we go; but we don't really support update operations
+                                        * pre-8.3 anyway).
+                                        */
                                        if (entry->have_prep_stmt && entry->have_error)
                                        {
                                                res = PQexec(entry->conn, "DEALLOCATE ALL");
@@ -600,8 +573,50 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                                        }
                                        entry->have_prep_stmt = false;
                                        entry->have_error = false;
-                               }
-                               break;
+                                       break;
+                               case XACT_EVENT_PRE_PREPARE:
+
+                                       /*
+                                        * We disallow remote transactions that modified anything,
+                                        * since it's not very reasonable to hold them open until
+                                        * the prepared transaction is committed.  For the moment,
+                                        * throw error unconditionally; later we might allow
+                                        * read-only cases.  Note that the error will cause us to
+                                        * come right back here with event == XACT_EVENT_ABORT, so
+                                        * we'll clean up the connection state at that point.
+                                        */
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                        errmsg("cannot prepare a transaction that modified remote tables")));
+                                       break;
+                               case XACT_EVENT_COMMIT:
+                               case XACT_EVENT_PREPARE:
+                                       /* Pre-commit should have closed the open transaction */
+                                       elog(ERROR, "missed cleaning up connection during pre-commit");
+                                       break;
+                               case XACT_EVENT_ABORT:
+                                       /* Assume we might have lost track of prepared statements */
+                                       entry->have_error = true;
+                                       /* If we're aborting, abort all remote transactions too */
+                                       res = PQexec(entry->conn, "ABORT TRANSACTION");
+                                       /* Note: can't throw ERROR, it would be infinite loop */
+                                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                                               pgfdw_report_error(WARNING, res, entry->conn, true,
+                                                                                  "ABORT TRANSACTION");
+                                       else
+                                       {
+                                               PQclear(res);
+                                               /* As above, make sure to clear any prepared stmts */
+                                               if (entry->have_prep_stmt && entry->have_error)
+                                               {
+                                                       res = PQexec(entry->conn, "DEALLOCATE ALL");
+                                                       PQclear(res);
+                                               }
+                                               entry->have_prep_stmt = false;
+                                               entry->have_error = false;
+                                       }
+                                       break;
+                       }
                }
 
                /* Reset state to show we're out of a transaction */
@@ -689,7 +704,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
                                         curlevel, curlevel);
                        res = PQexec(entry->conn, sql);
                        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                               pgfdw_report_error(WARNING, res, true, sql);
+                               pgfdw_report_error(WARNING, res, entry->conn, true, sql);
                        else
                                PQclear(res);
                }
index ae3ab000e1f34a4a665676d869713b7f396d5dcf..fde1ec13617d80ead676860a57a07b7c648575a4 100644 (file)
@@ -1040,7 +1040,7 @@ postgresReScanForeignScan(ForeignScanState *node)
         */
        res = PQexec(fsstate->conn, sql);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               pgfdw_report_error(ERROR, res, true, sql);
+               pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
        PQclear(res);
 
        /* Now force a fresh FETCH. */
@@ -1374,7 +1374,7 @@ postgresExecForeignInsert(EState *estate,
                                                 0);
        if (PQresultStatus(res) !=
                (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-               pgfdw_report_error(ERROR, res, true, fmstate->query);
+               pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
        /* Check number of rows affected, and fetch RETURNING tuple if any */
        if (fmstate->has_returning)
@@ -1444,7 +1444,7 @@ postgresExecForeignUpdate(EState *estate,
                                                 0);
        if (PQresultStatus(res) !=
                (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-               pgfdw_report_error(ERROR, res, true, fmstate->query);
+               pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
        /* Check number of rows affected, and fetch RETURNING tuple if any */
        if (fmstate->has_returning)
@@ -1514,7 +1514,7 @@ postgresExecForeignDelete(EState *estate,
                                                 0);
        if (PQresultStatus(res) !=
                (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-               pgfdw_report_error(ERROR, res, true, fmstate->query);
+               pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
 
        /* Check number of rows affected, and fetch RETURNING tuple if any */
        if (fmstate->has_returning)
@@ -1563,7 +1563,7 @@ postgresEndForeignModify(EState *estate,
                 */
                res = PQexec(fmstate->conn, sql);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                       pgfdw_report_error(ERROR, res, true, sql);
+                       pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
                PQclear(res);
                fmstate->p_name = NULL;
        }
@@ -1800,7 +1800,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
                 */
                res = PQexec(conn, sql);
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
-                       pgfdw_report_error(ERROR, res, false, sql);
+                       pgfdw_report_error(ERROR, res, conn, false, sql);
 
                /*
                 * Extract cost numbers for topmost plan node.  Note we search for a
@@ -1934,7 +1934,7 @@ create_cursor(ForeignScanState *node)
        res = PQexecParams(conn, buf.data, numParams, NULL, values,
                                           NULL, NULL, 0);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               pgfdw_report_error(ERROR, res, true, fsstate->query);
+               pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
        PQclear(res);
 
        /* Mark the cursor as created, and show no tuples have been retrieved */
@@ -1985,7 +1985,7 @@ fetch_more_data(ForeignScanState *node)
                res = PQexec(conn, sql);
                /* On error, report the original query, not the FETCH. */
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
-                       pgfdw_report_error(ERROR, res, false, fsstate->query);
+                       pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
                /* Convert the data into HeapTuples */
                numrows = PQntuples(res);
@@ -2091,7 +2091,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
         */
        res = PQexec(conn, sql);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               pgfdw_report_error(ERROR, res, true, sql);
+               pgfdw_report_error(ERROR, res, conn, true, sql);
        PQclear(res);
 }
 
@@ -2128,7 +2128,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
                                        NULL);
 
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
-               pgfdw_report_error(ERROR, res, true, fmstate->query);
+               pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
        PQclear(res);
 
        /* This action shows that the prepare has been done. */
@@ -2278,7 +2278,7 @@ postgresAnalyzeForeignTable(Relation relation,
        {
                res = PQexec(conn, sql.data);
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
-                       pgfdw_report_error(ERROR, res, false, sql.data);
+                       pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
                if (PQntuples(res) != 1 || PQnfields(res) != 1)
                        elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
@@ -2372,7 +2372,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
        {
                res = PQexec(conn, sql.data);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                       pgfdw_report_error(ERROR, res, false, sql.data);
+                       pgfdw_report_error(ERROR, res, conn, false, sql.data);
                PQclear(res);
                res = NULL;
 
@@ -2403,7 +2403,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
                        res = PQexec(conn, fetch_sql);
                        /* On error, report the original query, not the FETCH. */
                        if (PQresultStatus(res) != PGRES_TUPLES_OK)
-                               pgfdw_report_error(ERROR, res, false, sql.data);
+                               pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
                        /* Process whatever we got. */
                        numrows = PQntuples(res);
index 85fc25aaef4a1bcd0050bd1e679efb1abc2d46ee..228345d78644df7972d95e5d4129e4920d6e6874 100644 (file)
@@ -30,8 +30,8 @@ extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
-extern void pgfdw_report_error(int elevel, PGresult *res, bool clear,
-                                  const char *sql);
+extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
+                                  bool clear, const char *sql);
 
 /* in option.c */
 extern int ExtractConnectionOptions(List *defelems,