/* Parallel vacuuming stuff */
typedef struct ParallelSlot
{
- PGconn *connection;
- pgsocket sock;
- bool isFree;
+ PGconn *connection; /* One connection */
+ bool isFree; /* Is it known to be idle? */
} ParallelSlot;
/* vacuum options controlled by user flags */
static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
const char *progname);
+static bool ProcessQueryResult(PGconn *conn, PGresult *result,
+ const char *progname);
+
static bool GetQueryResult(PGconn *conn, const char *progname);
static void DisconnectDatabase(ParallelSlot *slot);
static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
-static void init_slot(ParallelSlot *slot, PGconn *conn, const char *progname);
+static void init_slot(ParallelSlot *slot, PGconn *conn);
static void help(const char *progname);
PQExpBufferData sql;
PGconn *conn;
SimpleStringListCell *cell;
- ParallelSlot *slots = NULL;
+ ParallelSlot *slots;
SimpleStringList dbtables = {NULL, NULL};
int i;
bool failed = false;
PQExpBufferData buf;
PGresult *res;
int ntups;
- int i;
initPQExpBuffer(&buf);
* for the first slot. If not in parallel mode, the first slot in the
* array contains the connection.
*/
+ if (concurrentCons <= 0)
+ concurrentCons = 1;
slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
- init_slot(slots, conn, progname);
+ init_slot(slots, conn);
if (parallel)
{
for (i = 1; i < concurrentCons; i++)
{
conn = connectDatabase(dbname, host, port, username, prompt_password,
progname, echo, false, true);
- init_slot(slots + i, conn, progname);
+ init_slot(slots + i, conn);
}
}
cell = tables ? tables->head : NULL;
do
{
- ParallelSlot *free_slot;
const char *tabname = cell ? cell->val : NULL;
-
- prepare_vacuum_command(&sql, conn, vacopts, tabname,
- tables == &dbtables, progname, echo);
+ ParallelSlot *free_slot;
if (CancelRequested)
{
else
free_slot = slots;
+ /*
+ * Prepare the vacuum command. Note that in some cases this requires
+ * query execution, so be sure to use the free connection.
+ */
+ prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname,
+ tables == &dbtables, progname, echo);
+
/*
* Execute the vacuum. If not in parallel mode, this terminates the
* program in case of an error. (The parallel case handles query
- * errors in GetQueryResult through GetIdleSlot.)
+ * errors in ProcessQueryResult through GetIdleSlot.)
*/
run_vacuum_command(free_slot->connection, sql.data,
echo, tabname, progname, parallel);
{
int j;
+ /* wait for all connections to finish */
for (j = 0; j < concurrentCons; j++)
{
- /* wait for all connection to return the results */
if (!GetQueryResult((slots + j)->connection, progname))
goto finish;
-
- (slots + j)->isFree = true;
}
}
}
/*
- * Execute a vacuum/analyze command to the server.
+ * Send a vacuum/analyze command to the server. In async mode, return after
+ * sending the command; else, wait for it to finish.
*
* Any errors during command execution are reported to stderr. If async is
* false, this function exits the program after reporting the error.
* this happens, we read the whole set and mark as free all sockets that become
* available.
*
- * Process the slot list, if any free slot is available then return the slotid
- * else perform the select on all the socket's and wait until at least one slot
- * becomes available.
- *
* If an error occurs, NULL is returned.
*/
static ParallelSlot *
{
int i;
int firstFree = -1;
- fd_set slotset;
- pgsocket maxFd;
-
- for (i = 0; i < numslots; i++)
- if ((slots + i)->isFree)
- return slots + i;
-
- FD_ZERO(&slotset);
- maxFd = slots->sock;
+ /* Any connection already known free? */
for (i = 0; i < numslots; i++)
{
- FD_SET((slots + i)->sock, &slotset);
- if ((slots + i)->sock > maxFd)
- maxFd = (slots + i)->sock;
+ if (slots[i].isFree)
+ return slots + i;
}
/*
* No free slot found, so wait until one of the connections has finished
* its task and return the available slot.
*/
- for (firstFree = -1; firstFree < 0;)
+ while (firstFree < 0)
{
+ fd_set slotset;
+ int maxFd = 0;
bool aborting;
+ /* We must reconstruct the fd_set for each call to select_loop */
+ FD_ZERO(&slotset);
+
+ for (i = 0; i < numslots; i++)
+ {
+ int sock = PQsocket(slots[i].connection);
+
+ /*
+ * We don't really expect any connections to lose their sockets
+ * after startup, but just in case, cope by ignoring them.
+ */
+ if (sock < 0)
+ continue;
+
+ FD_SET(sock, &slotset);
+ if (sock > maxFd)
+ maxFd = sock;
+ }
+
SetCancelConn(slots->connection);
i = select_loop(maxFd, &slotset, &aborting);
ResetCancelConn();
for (i = 0; i < numslots; i++)
{
- if (!FD_ISSET((slots + i)->sock, &slotset))
- continue;
-
- PQconsumeInput((slots + i)->connection);
- if (PQisBusy((slots + i)->connection))
- continue;
+ int sock = PQsocket(slots[i].connection);
- (slots + i)->isFree = true;
+ if (sock >= 0 && FD_ISSET(sock, &slotset))
+ {
+ /* select() says input is available, so consume it */
+ PQconsumeInput(slots[i].connection);
+ }
- if (!GetQueryResult((slots + i)->connection, progname))
- return NULL;
+ /* Collect result(s) as long as any are available */
+ while (!PQisBusy(slots[i].connection))
+ {
+ PGresult *result = PQgetResult(slots[i].connection);
- if (firstFree < 0)
- firstFree = i;
+ if (result != NULL)
+ {
+ /* Check and discard the command result */
+ if (!ProcessQueryResult(slots[i].connection, result,
+ progname))
+ return NULL;
+ }
+ else
+ {
+ /* This connection has become idle */
+ slots[i].isFree = true;
+ if (firstFree < 0)
+ firstFree = i;
+ break;
+ }
+ }
}
}
return slots + firstFree;
}
+/*
+ * ProcessQueryResult
+ *
+ * Process (and delete) a query result. Returns true if there's no error,
+ * false otherwise -- but errors about trying to vacuum a missing relation
+ * are reported and subsequently ignored.
+ */
+static bool
+ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
+{
+ /*
+ * If it's an error, report it. Errors about a missing table are harmless
+ * so we continue processing; but die for other errors.
+ */
+ if (PQresultStatus(result) != PGRES_COMMAND_OK)
+ {
+ char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+
+ fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+ progname, PQdb(conn), PQerrorMessage(conn));
+
+ if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
+ {
+ PQclear(result);
+ return false;
+ }
+ }
+
+ PQclear(result);
+ return true;
+}
+
/*
* GetQueryResult
*
- * Process the query result. Returns true if there's no error, false
- * otherwise -- but errors about trying to vacuum a missing relation are
- * reported and subsequently ignored.
+ * Pump the conn till it's dry of results; return false if any are errors.
+ * Note that this will block if the conn is busy.
*/
static bool
GetQueryResult(PGconn *conn, const char *progname)
{
+ bool ok = true;
PGresult *result;
SetCancelConn(conn);
while ((result = PQgetResult(conn)) != NULL)
{
- /*
- * If errors are found, report them. Errors about a missing table are
- * harmless so we continue processing; but die for other errors.
- */
- if (PQresultStatus(result) != PGRES_COMMAND_OK)
- {
- char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
-
- fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
- progname, PQdb(conn), PQerrorMessage(conn));
-
- if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
- {
- PQclear(result);
- return false;
- }
- }
-
- PQclear(result);
+ if (!ProcessQueryResult(conn, result, progname))
+ ok = false;
}
ResetCancelConn();
-
- return true;
+ return ok;
}
/*
}
static void
-init_slot(ParallelSlot *slot, PGconn *conn, const char *progname)
+init_slot(ParallelSlot *slot, PGconn *conn)
{
slot->connection = conn;
+ /* Initially assume connection is idle */
slot->isFree = true;
- slot->sock = PQsocket(conn);
-
- if (slot->sock < 0)
- {
- fprintf(stderr, _("%s: invalid socket: %s"), progname,
- PQerrorMessage(conn));
- exit(1);
- }
}
static void