Generalize parallel slot result handling.
authorRobert Haas <rhaas@postgresql.org>
Fri, 5 Feb 2021 21:08:45 +0000 (16:08 -0500)
committerRobert Haas <rhaas@postgresql.org>
Fri, 5 Feb 2021 21:08:45 +0000 (16:08 -0500)
Instead of having a hard-coded behavior that we ignore missing
tables and report all other errors, let the caller decide what
to do by setting a callback.

Mark Dilger, reviewed and somewhat revised by me. The larger patch
series of which this is a part has also had review from Peter
Geoghegan, Andres Freund, Álvaro Herrera, Michael Paquier, and Amul
Sul, but I don't know whether any of them have reviewed this bit
specifically.

Discussion: http://postgr.es/m/12ED3DA8-25F0-4B68-937D-D907CFBF08E7@enterprisedb.com
Discussion: http://postgr.es/m/5F743835-3399-419C-8324-2D424237E999@enterprisedb.com
Discussion: http://postgr.es/m/70655DF3-33CE-4527-9A4D-DDEB582B6BA0@enterprisedb.com

src/bin/scripts/reindexdb.c
src/bin/scripts/vacuumdb.c
src/fe_utils/parallel_slot.c
src/include/fe_utils/parallel_slot.h

index 7781fb1151ab4942ced4f1a5b7466556a10f21f7..9f072ac49aea95f854e28510b44661a4f51c5adf 100644 (file)
@@ -466,6 +466,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type,
                        goto finish;
                }
 
+               ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
                run_reindex_command(free_slot->connection, process_type, objname,
                                                        echo, verbose, concurrently, true);
 
index ed320817bc4feac6e90cac8f2bdaff6572d91810..9dc8aca29f351fb08a2cec8dcfd8a35d8055bc0e 100644 (file)
@@ -713,6 +713,7 @@ vacuum_one_database(const ConnParams *cparams,
                 * Execute the vacuum.  All errors are handled in processQueryResult
                 * through ParallelSlotsGetIdle.
                 */
+               ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
                run_vacuum_command(free_slot->connection, sql.data,
                                                   echo, tabname);
 
index 3987a4702b5255a6258d1b4ade4a86cf810b7c23..b625deb2545a12fec6e22e2b913ebe4ce44878f8 100644 (file)
@@ -30,7 +30,7 @@
 
 static void init_slot(ParallelSlot *slot, PGconn *conn);
 static int     select_loop(int maxFd, fd_set *workerset);
-static bool processQueryResult(PGconn *conn, PGresult *result);
+static bool processQueryResult(ParallelSlot *slot, PGresult *result);
 
 static void
 init_slot(ParallelSlot *slot, PGconn *conn)
@@ -38,34 +38,24 @@ init_slot(ParallelSlot *slot, PGconn *conn)
        slot->connection = conn;
        /* Initially assume connection is idle */
        slot->isFree = true;
+       ParallelSlotClearHandler(slot);
 }
 
 /*
- * Process (and delete) a query result.  Returns true if there's no error,
- * false otherwise -- but errors about trying to work on a missing relation
- * are reported and subsequently ignored.
+ * Process (and delete) a query result.  Returns true if there's no problem,
+ * false otherwise. It's up to the handler to decide what cosntitutes a
+ * problem.
  */
 static bool
-processQueryResult(PGconn *conn, PGresult *result)
+processQueryResult(ParallelSlot *slot, PGresult *result)
 {
-       /*
-        * If it's an error, report it.  Errors about a missing table are harmless
-        * so we continue processing; but die for other errors.
-        */
-       if (PQresultStatus(result) != PGRES_COMMAND_OK)
-       {
-               char       *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+       Assert(slot->handler != NULL);
 
-               pg_log_error("processing of database \"%s\" failed: %s",
-                                        PQdb(conn), PQerrorMessage(conn));
-
-               if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
-               {
-                       PQclear(result);
-                       return false;
-               }
-       }
+       /* On failure, the handler should return NULL after freeing the result */
+       if (!slot->handler(result, slot->connection, slot->handler_context))
+               return false;
 
+       /* Ok, we have to free it ourself */
        PQclear(result);
        return true;
 }
@@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)
  * Note that this will block if the connection is busy.
  */
 static bool
-consumeQueryResult(PGconn *conn)
+consumeQueryResult(ParallelSlot *slot)
 {
        bool            ok = true;
        PGresult   *result;
 
-       SetCancelConn(conn);
-       while ((result = PQgetResult(conn)) != NULL)
+       SetCancelConn(slot->connection);
+       while ((result = PQgetResult(slot->connection)) != NULL)
        {
-               if (!processQueryResult(conn, result))
+               if (!processQueryResult(slot, result))
                        ok = false;
        }
        ResetCancelConn();
@@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
 
                                if (result != NULL)
                                {
-                                       /* Check and discard the command result */
-                                       if (!processQueryResult(slots[i].connection, result))
+                                       /* Handle and discard the command result */
+                                       if (!processQueryResult(slots + i, result))
                                                return NULL;
                                }
                                else
                                {
                                        /* This connection has become idle */
                                        slots[i].isFree = true;
+                                       ParallelSlotClearHandler(slots + i);
                                        if (firstFree < 0)
                                                firstFree = i;
                                        break;
@@ -329,8 +320,52 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
 
        for (i = 0; i < numslots; i++)
        {
-               if (!consumeQueryResult((slots + i)->connection))
+               if (!consumeQueryResult(slots + i))
+                       return false;
+       }
+
+       return true;
+}
+
+/*
+ * TableCommandResultHandler
+ *
+ * ParallelSlotResultHandler for results of commands (not queries) against
+ * tables.
+ *
+ * Requires that the result status is either PGRES_COMMAND_OK or an error about
+ * a missing table.  This is useful for utilities that compile a list of tables
+ * to process and then run commands (vacuum, reindex, or whatever) against
+ * those tables, as there is a race condition between the time the list is
+ * compiled and the time the command attempts to open the table.
+ *
+ * For missing tables, logs an error but allows processing to continue.
+ *
+ * For all other errors, logs an error and terminates further processing.
+ *
+ * res: PGresult from the query executed on the slot's connection
+ * conn: connection belonging to the slot
+ * context: unused
+ */
+bool
+TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
+{
+       /*
+        * If it's an error, report it.  Errors about a missing table are harmless
+        * so we continue processing; but die for other errors.
+        */
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+               char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
+
+               pg_log_error("processing of database \"%s\" failed: %s",
+                                        PQdb(conn), PQerrorMessage(conn));
+
+               if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+               {
+                       PQclear(res);
                        return false;
+               }
        }
 
        return true;
index 99eeb3328d60879cc9185f39de38c689e2ad9e65..8902f8d4f4816a13346a37bbd233d0d6f609e158 100644 (file)
 #include "fe_utils/connect_utils.h"
 #include "libpq-fe.h"
 
+typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn,
+                                                                                  void *context);
+
 typedef struct ParallelSlot
 {
        PGconn     *connection;         /* One connection */
        bool            isFree;                 /* Is it known to be idle? */
+
+       /*
+        * Prior to issuing a command or query on 'connection', a handler callback
+        * function may optionally be registered to be invoked to process the
+        * results, and context information may optionally be registered for use
+        * by the handler.  If unset, these fields should be NULL.
+        */
+       ParallelSlotResultHandler handler;
+       void       *handler_context;
 } ParallelSlot;
 
+static inline void
+ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler,
+                                          void *context)
+{
+       slot->handler = handler;
+       slot->handler_context = context;
+}
+
+static inline void
+ParallelSlotClearHandler(ParallelSlot *slot)
+{
+       slot->handler = NULL;
+       slot->handler_context = NULL;
+}
+
 extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots);
 
 extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams,
@@ -31,5 +58,7 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);
 
 extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots);
 
+extern bool TableCommandResultHandler(PGresult *res, PGconn *conn,
+                                                                         void *context);
 
 #endif                                                 /* PARALLEL_SLOT_H */