#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
#define PARALLEL_VACUUM_KEY_WAL_USAGE 5
+#define PARALLEL_VACUUM_KEY_INDEX_STATS 6
/*
* Macro to check if we are in a parallel vacuum. If true, we are in the
Oid relid;
int elevel;
- /*
- * An indication for vacuum workers to perform either index vacuum or
- * index cleanup. first_time is true only if for_cleanup is true and
- * bulk-deletion is not performed yet.
- */
- bool for_cleanup;
- bool first_time;
-
/*
* Fields for both index vacuum and cleanup.
*
*/
pg_atomic_uint32 active_nworkers;
- /*
- * Variables to control parallel vacuum. We have a bitmap to indicate
- * which index has stats in shared memory. The set bit in the map
- * indicates that the particular index supports a parallel vacuum.
- */
- pg_atomic_uint32 idx; /* counter for vacuuming and clean up */
- uint32 offset; /* sizeof header incl. bitmap */
- bits8 bitmap[FLEXIBLE_ARRAY_MEMBER]; /* bit map of NULLs */
-
- /* Shared index statistics data follows at end of struct */
+ /* Counter for vacuuming and cleanup */
+ pg_atomic_uint32 idx;
} LVShared;
-#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
-#define GetSharedIndStats(s) \
- ((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
-#define IndStatsIsNull(s, i) \
- (!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
+/* Status used during parallel index vacuum or cleanup */
+typedef enum LVParallelIndVacStatus
+{
+ PARALLEL_INDVAC_STATUS_INITIAL = 0,
+ PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
+ PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
+ PARALLEL_INDVAC_STATUS_COMPLETED
+} LVParallelIndVacStatus;
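+
+/*
+ * Expected transitions for each index: the leader moves INITIAL to
+ * NEED_BULKDELETE or NEED_CLEANUP before launching workers, whichever
+ * process ends up handling the index sets COMPLETED, and the leader
+ * resets the status to INITIAL after verifying that all indexes completed.
+ */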
/*
- * Struct for an index bulk-deletion statistic used for parallel vacuum. This
- * is allocated in the DSM segment.
+ * Struct for index vacuum statistics of an index that is used for parallel
+ * vacuum. This includes the status of parallel index vacuum as well as
+ * index statistics.
*/
-typedef struct LVSharedIndStats
+typedef struct LVParallelIndStats
{
- bool updated; /* are the stats updated? */
+	/*
+	 * The following two fields are set by the leader process before
+	 * executing parallel index vacuum or parallel index cleanup. They are
+	 * not fixed for the entire VACUUM operation; they are set afresh for
+	 * each individual parallel index vacuum or cleanup pass.
+	 *
+	 * parallel_workers_can_process is true if both the leader and workers
+	 * can process the index; otherwise only the leader can process it.
+ */
+ LVParallelIndVacStatus status;
+ bool parallel_workers_can_process;
+
+ /*
+ * Individual worker or leader stores the result of index vacuum or
+ * cleanup.
+ */
+ bool istat_updated; /* are the stats updated? */
IndexBulkDeleteResult istat;
-} LVSharedIndStats;
+} LVParallelIndStats;
/* Struct for maintaining a parallel vacuum state. */
typedef struct LVParallelState
/* Shared information among parallel vacuum workers */
LVShared *lvshared;
+	/*
+	 * Shared index statistics among parallel vacuum workers. An array
+	 * element is allocated for every index, even those where parallel index
+	 * vacuuming is unsafe or not worthwhile (i.e., will_parallel_vacuum[]
+	 * is false). During parallel vacuum, the IndexBulkDeleteResult of each
+	 * index is kept in DSM and is copied into local memory at the end of
+	 * parallel vacuum.
+	 */
+ LVParallelIndStats *lvpindstats;
+
/* Points to buffer usage area in DSM */
BufferUsage *buffer_usage;
/* Points to WAL usage area in DSM */
WalUsage *wal_usage;
+	/*
+	 * False if the index is a totally unsuitable target for all parallel
+	 * processing. For example, the index could be smaller than the
+	 * min_parallel_index_scan_size cutoff.
+	 */
+ bool *will_parallel_vacuum;
+
/*
* The number of indexes that support parallel index bulk-deletion and
* parallel index cleanup respectively.
static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
LVRelState *vacrel);
static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
-static void do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel);
-static void do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel);
-static void do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers);
-static void do_parallel_processing(LVRelState *vacrel,
- LVShared *lvshared);
-static void do_serial_processing_for_unsafe_indexes(LVRelState *vacrel,
- LVShared *lvshared);
-static IndexBulkDeleteResult *parallel_process_one_index(Relation indrel,
- IndexBulkDeleteResult *istat,
- LVShared *lvshared,
- LVSharedIndStats *shared_indstats,
- LVRelState *vacrel);
static void lazy_cleanup_all_indexes(LVRelState *vacrel);
+static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum);
+static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
+ LVParallelIndStats *pindstats);
+static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel);
+static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
+ LVShared *shared,
+ LVParallelIndStats *pindstats);
static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
IndexBulkDeleteResult *istat,
double reltuples,
static int vac_cmp_itemptr(const void *left, const void *right);
static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
TransactionId *visibility_cutoff_xid, bool *all_frozen);
-static int compute_parallel_vacuum_workers(LVRelState *vacrel,
- int nrequested,
+static int parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
bool *will_parallel_vacuum);
static void update_index_statistics(LVRelState *vacrel);
-static void begin_parallel_vacuum(LVRelState *vacrel, int nrequested);
-static void end_parallel_vacuum(LVRelState *vacrel);
-static LVSharedIndStats *parallel_stats_for_idx(LVShared *lvshared, int getidx);
-static bool parallel_processing_is_safe(Relation indrel, LVShared *lvshared);
+static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested);
+static void parallel_vacuum_end(LVRelState *vacrel);
+static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
+ bool vacuum);
static void vacuum_error_callback(void *arg);
static void update_vacuum_error_info(LVRelState *vacrel,
LVSavedErrInfo *saved_vacrel,
else
{
/* Outsource everything to parallel variant */
- do_parallel_lazy_vacuum_all_indexes(vacrel);
+ parallel_vacuum_process_all_indexes(vacrel, true);
/*
* Do a postcheck to consider applying wraparound failsafe now. Note
}
/*
- * Perform lazy_vacuum_all_indexes() steps in parallel
+ * Perform index vacuum or index cleanup with parallel workers. This function
+ * must be used by the parallel vacuum leader process.
*/
static void
-do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel)
+parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum)
{
- /* Tell parallel workers to do index vacuuming */
- vacrel->lps->lvshared->for_cleanup = false;
- vacrel->lps->lvshared->first_time = false;
-
- /*
- * We can only provide an approximate value of num_heap_tuples, at least
- * for now. Matches serial VACUUM case.
- */
- vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
- vacrel->lps->lvshared->estimated_count = true;
+ LVParallelState *lps = vacrel->lps;
+ LVParallelIndVacStatus new_status;
+ int nworkers;
- do_parallel_vacuum_or_cleanup(vacrel,
- vacrel->lps->nindexes_parallel_bulkdel);
-}
+ Assert(!IsParallelWorker());
+ Assert(ParallelVacuumIsActive(vacrel));
+ Assert(vacrel->nindexes > 0);
-/*
- * Perform lazy_cleanup_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel)
-{
- int nworkers;
+ if (vacuum)
+ {
+ /*
+ * We can only provide an approximate value of num_heap_tuples, at
+ * least for now. Matches serial VACUUM case.
+ */
+ vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
+ vacrel->lps->lvshared->estimated_count = true;
- /*
- * If parallel vacuum is active we perform index cleanup with parallel
- * workers.
- *
- * Tell parallel workers to do index cleanup.
- */
- vacrel->lps->lvshared->for_cleanup = true;
- vacrel->lps->lvshared->first_time = (vacrel->num_index_scans == 0);
+ new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
- /*
- * Now we can provide a better estimate of total number of surviving
- * tuples (we assume indexes are more interested in that than in the
- * number of nominally live tuples).
- */
- vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
- vacrel->lps->lvshared->estimated_count =
- (vacrel->tupcount_pages < vacrel->rel_pages);
-
- /* Determine the number of parallel workers to launch */
- if (vacrel->lps->lvshared->first_time)
- nworkers = vacrel->lps->nindexes_parallel_cleanup +
- vacrel->lps->nindexes_parallel_condcleanup;
+ /* Determine the number of parallel workers to launch */
+ nworkers = vacrel->lps->nindexes_parallel_bulkdel;
+ }
else
- nworkers = vacrel->lps->nindexes_parallel_cleanup;
+ {
+ /*
+		 * We can provide a better estimate of the total number of surviving
+ * tuples (we assume indexes are more interested in that than in the
+ * number of nominally live tuples).
+ */
+ vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
+ vacrel->lps->lvshared->estimated_count =
+ (vacrel->tupcount_pages < vacrel->rel_pages);
- do_parallel_vacuum_or_cleanup(vacrel, nworkers);
-}
+ new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
-/*
- * Perform index vacuum or index cleanup with parallel workers. This function
- * must be used by the parallel vacuum leader process. The caller must set
- * lps->lvshared->for_cleanup to indicate whether to perform vacuum or
- * cleanup.
- */
-static void
-do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
-{
- LVParallelState *lps = vacrel->lps;
+ /* Determine the number of parallel workers to launch */
+ nworkers = vacrel->lps->nindexes_parallel_cleanup;
- Assert(!IsParallelWorker());
- Assert(ParallelVacuumIsActive(vacrel));
- Assert(vacrel->nindexes > 0);
+		/* Add conditionally parallel-aware indexes if this is the first call */
+ if (vacrel->num_index_scans == 0)
+ nworkers += vacrel->lps->nindexes_parallel_condcleanup;
+ }
/* The leader process will participate */
nworkers--;
/*
* It is possible that parallel context is initialized with fewer workers
* than the number of indexes that need a separate worker in the current
- * phase, so we need to consider it. See compute_parallel_vacuum_workers.
+ * phase, so we need to consider it. See
+ * parallel_vacuum_compute_workers().
*/
nworkers = Min(nworkers, lps->pcxt->nworkers);
+	/*
+	 * Set the index vacuum status for each index, and mark whether a
+	 * parallel vacuum worker can process it.
+	 */
+ for (int i = 0; i < vacrel->nindexes; i++)
+ {
+ LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]);
+
+ Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
+ pindstats->status = new_status;
+ pindstats->parallel_workers_can_process =
+			(lps->will_parallel_vacuum[i] &&
+ parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i],
+ vacuum));
+ }
+
+ /* Reset the parallel index processing counter */
+ pg_atomic_write_u32(&(lps->lvshared->idx), 0);
+
/* Setup the shared cost-based vacuum delay and launch workers */
if (nworkers > 0)
{
+		/* Reinitialize the parallel context to relaunch parallel workers */
if (vacrel->num_index_scans > 0)
- {
- /* Reset the parallel index processing counter */
- pg_atomic_write_u32(&(lps->lvshared->idx), 0);
-
- /* Reinitialize the parallel context to relaunch parallel workers */
ReinitializeParallelDSM(lps->pcxt);
- }
/*
* Set up shared cost balance and the number of active workers for
VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
}
- if (lps->lvshared->for_cleanup)
+ if (vacuum)
ereport(elevel,
- (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
- "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+ (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+ "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
lps->pcxt->nworkers_launched),
lps->pcxt->nworkers_launched, nworkers)));
else
ereport(elevel,
- (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
- "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+ (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+ "launched %d parallel vacuum workers for index cleanup (planned: %d)",
lps->pcxt->nworkers_launched),
lps->pcxt->nworkers_launched, nworkers)));
}
/* Process the indexes that can be processed by only leader process */
- do_serial_processing_for_unsafe_indexes(vacrel, lps->lvshared);
+ parallel_vacuum_process_unsafe_indexes(vacrel);
/*
- * Join as a parallel worker. The leader process alone processes all the
- * indexes in the case where no workers are launched.
+ * Join as a parallel worker. The leader process alone processes all
+ * parallel-safe indexes in the case where no workers are launched.
*/
- do_parallel_processing(vacrel, lps->lvshared);
+ parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats);
/*
* Next, accumulate buffer and WAL usage. (This must wait for the workers
InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
}
+	/*
+	 * Reset the status of all indexes back to initial (while checking that
+	 * we have processed all indexes).
+	 */
+ for (int i = 0; i < vacrel->nindexes; i++)
+ {
+ LVParallelIndStats *pindstats = &(lps->lvpindstats[i]);
+
+ if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
+ elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
+ RelationGetRelationName(vacrel->indrels[i]));
+
+ pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
+ }
+
/*
* Carry the shared balance value to heap scan and disable shared costing
*/
* vacuum worker processes to process the indexes in parallel.
*/
static void
-do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
+parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
+ LVParallelIndStats *pindstats)
{
/*
* Increment the active worker count if we are able to launch any worker.
for (;;)
{
int idx;
- LVSharedIndStats *shared_istat;
- Relation indrel;
- IndexBulkDeleteResult *istat;
+ LVParallelIndStats *pis;
/* Get an index number to process */
- idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
+ idx = pg_atomic_fetch_add_u32(&(shared->idx), 1);
/* Done for all indexes? */
if (idx >= vacrel->nindexes)
break;
- /* Get the index statistics space from DSM, if any */
- shared_istat = parallel_stats_for_idx(lvshared, idx);
-
- /* Skip indexes not participating in parallelism */
- if (shared_istat == NULL)
- continue;
-
- indrel = vacrel->indrels[idx];
+ pis = &(pindstats[idx]);
/*
- * Skip processing indexes that are unsafe for workers (these are
- * processed in do_serial_processing_for_unsafe_indexes() by leader)
+		 * Skip processing indexes that are unsafe for workers or are
+		 * unsuitable targets for parallel index vacuum (these are processed
+		 * by the leader in parallel_vacuum_process_unsafe_indexes()).
*/
- if (!parallel_processing_is_safe(indrel, lvshared))
+ if (!pis->parallel_workers_can_process)
continue;
/* Do vacuum or cleanup of the index */
- istat = vacrel->indstats[idx];
- vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
- lvshared,
- shared_istat,
- vacrel);
+ parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
+ shared, pis);
}
/*
*
* Handles index vacuuming (or index cleanup) for indexes that are not
* parallel safe. It's possible that this will vary for a given index, based
- * on details like whether we're performing for_cleanup processing right now.
+ * on details like whether we're performing index cleanup right now.
*
* Also performs processing of smaller indexes that fell under the size cutoff
- * enforced by compute_parallel_vacuum_workers(). These indexes never get a
- * slot for statistics in DSM.
+ * enforced by parallel_vacuum_compute_workers().
*/
static void
-do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
+parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel)
{
+ LVParallelState *lps = vacrel->lps;
+
Assert(!IsParallelWorker());
/*
for (int idx = 0; idx < vacrel->nindexes; idx++)
{
- LVSharedIndStats *shared_istat;
- Relation indrel;
- IndexBulkDeleteResult *istat;
-
- shared_istat = parallel_stats_for_idx(lvshared, idx);
- indrel = vacrel->indrels[idx];
+ LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
- /*
- * We're only here for the indexes that parallel workers won't
- * process. Note that the shared_istat test ensures that we process
- * indexes that fell under initial size cutoff.
- */
- if (shared_istat != NULL &&
- parallel_processing_is_safe(indrel, lvshared))
+		/* Skip indexes that are safe for workers */
+ if (pindstats->parallel_workers_can_process)
continue;
/* Do vacuum or cleanup of the index */
- istat = vacrel->indstats[idx];
- vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
- lvshared,
- shared_istat,
- vacrel);
+ parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
+ lps->lvshared, pindstats);
}
/*
* statistics returned from ambulkdelete and amvacuumcleanup to the DSM
* segment.
*/
-static IndexBulkDeleteResult *
-parallel_process_one_index(Relation indrel,
- IndexBulkDeleteResult *istat,
- LVShared *lvshared,
- LVSharedIndStats *shared_istat,
- LVRelState *vacrel)
+static void
+parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
+ LVShared *shared, LVParallelIndStats *pindstats)
{
+ IndexBulkDeleteResult *istat = NULL;
IndexBulkDeleteResult *istat_res;
/*
* Update the pointer to the corresponding bulk-deletion result if someone
* has already updated it
*/
- if (shared_istat && shared_istat->updated && istat == NULL)
- istat = &shared_istat->istat;
+ if (pindstats->istat_updated)
+ istat = &(pindstats->istat);
- /* Do vacuum or cleanup of the index */
- if (lvshared->for_cleanup)
- istat_res = lazy_cleanup_one_index(indrel, istat, lvshared->reltuples,
- lvshared->estimated_count, vacrel);
- else
- istat_res = lazy_vacuum_one_index(indrel, istat, lvshared->reltuples,
- vacrel);
+ switch (pindstats->status)
+ {
+ case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+ istat_res = lazy_vacuum_one_index(indrel, istat,
+ shared->reltuples, vacrel);
+ break;
+ case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+ istat_res = lazy_cleanup_one_index(indrel, istat,
+ shared->reltuples,
+ shared->estimated_count,
+ vacrel);
+ break;
+ default:
+ elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
+ pindstats->status,
+ RelationGetRelationName(indrel));
+ }
/*
* Copy the index bulk-deletion result returned from ambulkdelete and
* Since all vacuum workers write the bulk-deletion result at different
* slots we can write them without locking.
*/
- if (shared_istat && !shared_istat->updated && istat_res != NULL)
+ if (!pindstats->istat_updated && istat_res != NULL)
{
- memcpy(&shared_istat->istat, istat_res, sizeof(IndexBulkDeleteResult));
- shared_istat->updated = true;
+ memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
+ pindstats->istat_updated = true;
/* Free the locally-allocated bulk-deletion result */
pfree(istat_res);
-
- /* return the pointer to the result from shared memory */
- return &shared_istat->istat;
}
- return istat_res;
+ /*
+ * Update the status to completed. No need to lock here since each worker
+ * touches different indexes.
+ */
+ pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
}
/*
else
{
/* Outsource everything to parallel variant */
- do_parallel_lazy_cleanup_all_indexes(vacrel);
+ parallel_vacuum_process_all_indexes(vacrel, false);
}
}
vacrel->relname)));
}
else
- begin_parallel_vacuum(vacrel, nworkers);
+ parallel_vacuum_begin(vacrel, nworkers);
/* If parallel mode started, vacrel->dead_items allocated in DSM */
if (ParallelVacuumIsActive(vacrel))
* End parallel mode before updating index statistics as we cannot write
* during parallel mode.
*/
- end_parallel_vacuum(vacrel);
+ parallel_vacuum_end(vacrel);
}
/*
* vacuum.
*/
static int
-compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
+parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
bool *will_parallel_vacuum)
{
int nindexes_parallel = 0;
Relation indrel = vacrel->indrels[idx];
uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+	/* Skip indexes that are not suitable targets for parallel index vacuum */
if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
continue;
* VACUUM is currently active.
*/
static void
-begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
+parallel_vacuum_begin(LVRelState *vacrel, int nrequested)
{
LVParallelState *lps;
Relation *indrels = vacrel->indrels;
ParallelContext *pcxt;
LVShared *shared;
LVDeadItems *dead_items;
+ LVParallelIndStats *pindstats;
BufferUsage *buffer_usage;
WalUsage *wal_usage;
bool *will_parallel_vacuum;
int max_items;
+ Size est_pindstats_len;
Size est_shared_len;
Size est_dead_items_len;
int nindexes_mwm = 0;
* Compute the number of parallel vacuum workers to launch
*/
will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
- parallel_workers = compute_parallel_vacuum_workers(vacrel,
- nrequested,
+ parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested,
will_parallel_vacuum);
if (parallel_workers <= 0)
{
parallel_workers);
Assert(pcxt->nworkers > 0);
lps->pcxt = pcxt;
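+	/* Keep will_parallel_vacuum around; it is freed in parallel_vacuum_end() */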
+ lps->will_parallel_vacuum = will_parallel_vacuum;
- /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
- est_shared_len = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
- for (int idx = 0; idx < nindexes; idx++)
- {
- Relation indrel = indrels[idx];
- uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
- /*
- * Cleanup option should be either disabled, always performing in
- * parallel or conditionally performing in parallel.
- */
- Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
- ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
- Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
-
- /* Skip indexes that don't participate in parallel vacuum */
- if (!will_parallel_vacuum[idx])
- continue;
-
- if (indrel->rd_indam->amusemaintenanceworkmem)
- nindexes_mwm++;
-
- est_shared_len = add_size(est_shared_len, sizeof(LVSharedIndStats));
+	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
+ est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes);
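+	/* One slot per index keeps lvpindstats[i] aligned with indrels[i] */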
+ shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
- /*
- * Remember the number of indexes that support parallel operation for
- * each phase.
- */
- if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
- lps->nindexes_parallel_bulkdel++;
- if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
- lps->nindexes_parallel_cleanup++;
- if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
- lps->nindexes_parallel_condcleanup++;
- }
+ /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+ est_shared_len = sizeof(LVShared);
shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
max_items = dead_items_max_items(vacrel);
- est_dead_items_len = MAXALIGN(max_items_to_alloc_size(max_items));
+ est_dead_items_len = max_items_to_alloc_size(max_items);
shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
InitializeParallelDSM(pcxt);
+ /* Prepare index vacuum stats */
+ pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len);
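+
+	/*
+	 * Count the indexes that support each parallel phase, and those whose
+	 * access method uses maintenance_work_mem (nindexes_mwm).
+	 */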
+ for (int idx = 0; idx < nindexes; idx++)
+ {
+ Relation indrel = indrels[idx];
+ uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+ /*
+ * Cleanup option should be either disabled, always performing in
+ * parallel or conditionally performing in parallel.
+ */
+ Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+ Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+ if (!will_parallel_vacuum[idx])
+ continue;
+
+ if (indrel->rd_indam->amusemaintenanceworkmem)
+ nindexes_mwm++;
+
+ /*
+ * Remember the number of indexes that support parallel operation for
+ * each phase.
+ */
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+ lps->nindexes_parallel_bulkdel++;
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+ lps->nindexes_parallel_cleanup++;
+ if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+ lps->nindexes_parallel_condcleanup++;
+ }
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats);
+ lps->lvpindstats = pindstats;
+
/* Prepare shared information */
shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
MemSet(shared, 0, est_shared_len);
pg_atomic_init_u32(&(shared->cost_balance), 0);
pg_atomic_init_u32(&(shared->active_nworkers), 0);
pg_atomic_init_u32(&(shared->idx), 0);
- shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-
- /*
- * Initialize variables for shared index statistics, set NULL bitmap and
- * the size of stats for each index.
- */
- memset(shared->bitmap, 0x00, BITMAPLEN(nindexes));
- for (int idx = 0; idx < nindexes; idx++)
- {
- if (!will_parallel_vacuum[idx])
- continue;
-
- /* Set NOT NULL as this index does support parallelism */
- shared->bitmap[idx >> 3] |= 1 << (idx & 0x07);
- }
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
lps->lvshared = shared;
PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
}
- pfree(will_parallel_vacuum);
-
/* Success -- set dead_items and lps in leader's vacrel state */
vacrel->dead_items = dead_items;
vacrel->lps = lps;
* context, but that won't be safe (see ExitParallelMode).
*/
static void
-end_parallel_vacuum(LVRelState *vacrel)
+parallel_vacuum_end(LVRelState *vacrel)
{
IndexBulkDeleteResult **indstats = vacrel->indstats;
LVParallelState *lps = vacrel->lps;
/* Copy the updated statistics */
for (int idx = 0; idx < nindexes; idx++)
{
- LVSharedIndStats *shared_istat;
-
- shared_istat = parallel_stats_for_idx(lps->lvshared, idx);
-
- /*
- * Skip index -- it must have been processed by the leader, from
- * inside do_serial_processing_for_unsafe_indexes()
- */
- if (shared_istat == NULL)
- continue;
+ LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
- if (shared_istat->updated)
+ if (pindstats->istat_updated)
{
indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
- memcpy(indstats[idx], &shared_istat->istat, sizeof(IndexBulkDeleteResult));
+ memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult));
}
else
indstats[idx] = NULL;
ExitParallelMode();
/* Deactivate parallel vacuum */
+ pfree(lps->will_parallel_vacuum);
pfree(lps);
vacrel->lps = NULL;
}
/*
- * Return shared memory statistics for index at offset 'getidx', if any
- *
- * Returning NULL indicates that compute_parallel_vacuum_workers() determined
- * that the index is a totally unsuitable target for all parallel processing
- * up front. For example, the index could be < min_parallel_index_scan_size
- * cutoff.
- */
-static LVSharedIndStats *
-parallel_stats_for_idx(LVShared *lvshared, int getidx)
-{
- char *p;
-
- if (IndStatsIsNull(lvshared, getidx))
- return NULL;
-
- p = (char *) GetSharedIndStats(lvshared);
- for (int idx = 0; idx < getidx; idx++)
- {
- if (IndStatsIsNull(lvshared, idx))
- continue;
-
- p += sizeof(LVSharedIndStats);
- }
-
- return (LVSharedIndStats *) p;
-}
-
-/*
- * Returns false, if the given index can't participate in parallel index
- * vacuum or parallel index cleanup
+ * Returns false if the given index can't participate in the next execution
+ * of parallel index vacuum or parallel index cleanup.
*/
static bool
-parallel_processing_is_safe(Relation indrel, LVShared *lvshared)
+parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
+ bool vacuum)
{
- uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+ uint8 vacoptions;
- /* first_time must be true only if for_cleanup is true */
- Assert(lvshared->for_cleanup || !lvshared->first_time);
+ vacoptions = indrel->rd_indam->amparallelvacuumoptions;
- if (lvshared->for_cleanup)
- {
- /* Skip, if the index does not support parallel cleanup */
- if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
- ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
- return false;
+	/* For index vacuum, check if the index supports parallel bulk-deletion */
+ if (vacuum)
+ return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
- /*
- * Skip, if the index supports parallel cleanup conditionally, but we
- * have already processed the index (for bulkdelete). See the
- * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
- * when indexes support parallel cleanup conditionally.
- */
- if (!lvshared->first_time &&
- ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
- return false;
- }
- else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
- {
- /* Skip if the index does not support parallel bulk deletion */
+	/* Not safe if the index does not support parallel cleanup */
+ if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+ return false;
+
+ /*
+	 * Not safe if the index supports parallel cleanup conditionally, but we
+ * have already processed the index (for bulkdelete). We do this to avoid
+ * the need to invoke workers when parallel index cleanup doesn't need to
+ * scan the index. See the comments for option
+ * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
+ * parallel cleanup conditionally.
+ */
+ if (vacrel->num_index_scans > 0 &&
+ ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
return false;
- }
return true;
}
{
Relation rel;
Relation *indrels;
+ LVParallelIndStats *lvpindstats;
LVShared *lvshared;
LVDeadItems *dead_items;
BufferUsage *buffer_usage;
false);
elevel = lvshared->elevel;
- if (lvshared->for_cleanup)
- elog(DEBUG1, "starting parallel vacuum worker for cleanup");
- else
- elog(DEBUG1, "starting parallel vacuum worker for bulk delete");
+ elog(DEBUG1, "starting parallel vacuum worker");
/* Set debug_query_string for individual workers */
sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
Assert(nindexes > 0);
+	/* Look up the shared index statistics in DSM */
+ lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc,
+ PARALLEL_VACUUM_KEY_INDEX_STATS,
+ false);
+
/* Set dead_items space (set as worker's vacrel dead_items below) */
dead_items = (LVDeadItems *) shm_toc_lookup(toc,
PARALLEL_VACUUM_KEY_DEAD_ITEMS,
InstrStartParallelQuery();
/* Process indexes to perform vacuum/cleanup */
- do_parallel_processing(&vacrel, lvshared);
+ parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats);
/* Report buffer/WAL usage during parallel execution */
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);