From: Maxim Orlov <[email protected]>
To: Postgres hackers <[email protected]>
Subject: POC: Parallel processing of indexes in autovacuum
Date: Wed, 16 Apr 2025 14:04:53 +0300
Message-ID: <CACG=ezZOrNsuLoETLD1gAswZMuH2nGGq7Ogcc0QOE5hhWaw=cw@mail.gmail.com>
Hi!
The VACUUM command can be executed with the PARALLEL option. As the
documentation states, it will perform the index vacuum and index cleanup
phases of VACUUM in parallel using *integer* background workers. But this
interesting feature is not used by autovacuum. After a quick look at
the source code, it became clear to me that when the PARALLEL option was
added, the corresponding option for autovacuum wasn't implemented, although
there are no clear obstacles to this.
Actually, one of our customers ran into a problem with autovacuum on a
table with many indexes and relatively long transactions. Of course, long
transactions are the ultimate evil and the problem can be worked around by
running vacuum from a cron task, but I think we can do better.
Anyhow, what about adding a parallel option for autovacuum? Here is a POC
patch for the proposed functionality. For the sake of simplicity, several
GUCs have been added. It would be good to think through the parallel
launch condition without them.
As always, any thoughts and opinions are very welcome!
--
Best regards,
Maxim Orlov.
Attachments:
[application/octet-stream] WIP-Allow-autovacuum-to-process-indexes-of-single-table.patch (58.2K, 3-WIP-Allow-autovacuum-to-process-indexes-of-single-table.patch)
From 58dd9a144f065b3619615efc4c2afc1cc6721617 Mon Sep 17 00:00:00 2001
From: Daniil Davidov <[email protected]>
Date: Tue, 1 Apr 2025 14:39:49 +0700
Subject: [PATCH] Allow autovacuum to process indexes of single table in
parallel mode
Author: Daniil Davydov <[email protected]>
Author: Maxim Orlov <[email protected]>
---
src/backend/commands/vacuum.c | 27 +
src/backend/commands/vacuumparallel.c | 290 ++++++-
src/backend/postmaster/autovacuum.c | 801 +++++++++++++++++-
src/backend/utils/misc/guc_tables.c | 30 +
src/backend/utils/misc/postgresql.conf.sample | 6 +
src/include/postmaster/autovacuum.h | 25 +
src/test/modules/autovacuum/.gitignore | 1 +
src/test/modules/autovacuum/Makefile | 14 +
.../autovacuum/t/001_autovac_parallel.pl | 137 +++
9 files changed, 1285 insertions(+), 46 deletions(-)
create mode 100644 src/test/modules/autovacuum/.gitignore
create mode 100644 src/test/modules/autovacuum/Makefile
create mode 100644 src/test/modules/autovacuum/t/001_autovac_parallel.pl
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index db5da3ce826..5f51a65967c 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -2232,6 +2232,33 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
else
toast_relid = InvalidOid;
+ /*
+ * Decide whether we need to process table with given oid in parallel mode
+ * during autovacuum.
+ */
+ if (AmAutoVacuumWorkerProcess() &&
+ params->index_cleanup != VACOPTVALUE_DISABLED)
+ {
+ PgStat_StatTabEntry *tabentry;
+
+ /* fetch the pgstat table entry */
+ tabentry = pgstat_fetch_stat_tabentry_ext(rel->rd_rel->relisshared,
+ rel->rd_id);
+ if (tabentry && tabentry->dead_tuples >= autovac_idx_parallel_min_rows)
+ {
+ List *indexes = RelationGetIndexList(rel);
+ int num_indexes = list_length(indexes);
+
+ list_free(indexes);
+
+ if (num_indexes >= autovac_idx_parallel_min_indexes &&
+ max_parallel_index_autovac_workers > 0)
+ {
+ params->nworkers = max_parallel_index_autovac_workers;
+ }
+ }
+ }
+
/*
* Switch to the table owner's userid, so that any index functions are run
* as that user. Also lock down security-restricted operations and
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 2b9d548cdeb..6094d6c649b 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -1,20 +1,23 @@
/*-------------------------------------------------------------------------
*
* vacuumparallel.c
- * Support routines for parallel vacuum execution.
+ * Support routines for parallel [auto]vacuum execution.
*
* This file contains routines that are intended to support setting up, using,
* and tearing down a ParallelVacuumState.
*
- * In a parallel vacuum, we perform both index bulk deletion and index cleanup
- * with parallel worker processes. Individual indexes are processed by one
- * vacuum process. ParallelVacuumState contains shared information as well as
- * the memory space for storing dead items allocated in the DSA area. We
+ * In a parallel [auto]vacuum, we perform both index bulk deletion and index
+ * cleanup with parallel worker processes. Individual indexes are processed by
+ * one vacuum process. ParallelVacuumState contains shared information as well
+ * as the memory space for storing dead items allocated in the DSA area. We
* launch parallel worker processes at the start of parallel index
* bulk-deletion and index cleanup and once all indexes are processed, the
* parallel worker processes exit. Each time we process indexes in parallel,
* the parallel context is re-initialized so that the same DSM can be used for
- * multiple passes of index bulk-deletion and index cleanup.
+ * multiple passes of index bulk-deletion and index cleanup. For maintenance
+ * vacuum, we launch workers manually (using dynamic bgworkers machinery), and
+ * for autovacuum we send signals to the autovacuum launcher (all logic for
+ * communication among parallel autovacuum processes is in autovacuum.c).
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -34,9 +37,11 @@
#include "executor/instrument.h"
#include "optimizer/paths.h"
#include "pgstat.h"
+#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
+#include "utils/memutils.h"
#include "utils/rel.h"
/*
@@ -157,11 +162,20 @@ typedef struct PVIndStats
} PVIndStats;
/*
- * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
+ * Struct for maintaining a parallel [auto]vacuum state. typedef appears in
+ * vacuum.h.
*/
struct ParallelVacuumState
{
- /* NULL for worker processes */
+ /* Is this structure used for maintenance vacuum or autovacuum */
+ bool is_autovacuum;
+
+ /*
+ * NULL for worker processes.
+ *
+ * NOTE: Parallel autovacuum only needs a subset of the maintenance vacuum
+ * functionality.
+ */
ParallelContext *pcxt;
/* Parent Heap Relation */
@@ -221,6 +235,10 @@ struct ParallelVacuumState
PVIndVacStatus status;
};
+static ParallelContext *CreateParallelAutoVacContext(int nworkers);
+static void InitializeParallelAutoVacDSM(ParallelContext *pcxt);
+static void DestroyParallelAutoVacContext(ParallelContext *pcxt);
+
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
@@ -280,15 +298,21 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
}
pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
+ pvs->is_autovacuum = AmAutoVacuumWorkerProcess();
pvs->indrels = indrels;
pvs->nindexes = nindexes;
pvs->will_parallel_vacuum = will_parallel_vacuum;
pvs->bstrategy = bstrategy;
pvs->heaprel = rel;
- EnterParallelMode();
- pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
- parallel_workers);
+ if (pvs->is_autovacuum)
+ pcxt = CreateParallelAutoVacContext(parallel_workers);
+ else
+ {
+ EnterParallelMode();
+ pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
+ parallel_workers);
+ }
Assert(pcxt->nworkers > 0);
pvs->pcxt = pcxt;
@@ -327,7 +351,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
else
querylen = 0; /* keep compiler quiet */
- InitializeParallelDSM(pcxt);
+ if (pvs->is_autovacuum)
+ InitializeParallelAutoVacDSM(pvs->pcxt);
+ else
+ InitializeParallelDSM(pcxt);
/* Prepare index vacuum stats */
indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
@@ -371,11 +398,18 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
shared->relid = RelationGetRelid(rel);
shared->elevel = elevel;
shared->queryid = pgstat_get_my_query_id();
- shared->maintenance_work_mem_worker =
- (nindexes_mwm > 0) ?
- maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
- maintenance_work_mem;
- shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
+
+ if (pvs->is_autovacuum)
+ shared->maintenance_work_mem_worker =
+ (nindexes_mwm > 0) ?
+ autovacuum_work_mem / Min(parallel_workers, nindexes_mwm) :
+ autovacuum_work_mem;
+ else
+ shared->maintenance_work_mem_worker =
+ (nindexes_mwm > 0) ?
+ maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
+ maintenance_work_mem;
+ shared->dead_items_info.max_bytes = vac_work_mem * 1024L;
/* Prepare DSA space for dead items */
dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
@@ -453,8 +487,13 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
TidStoreDestroy(pvs->dead_items);
- DestroyParallelContext(pvs->pcxt);
- ExitParallelMode();
+ if (pvs->is_autovacuum)
+ DestroyParallelAutoVacContext(pvs->pcxt);
+ else
+ {
+ DestroyParallelContext((ParallelContext *) pvs->pcxt);
+ ExitParallelMode();
+ }
pfree(pvs->will_parallel_vacuum);
pfree(pvs);
@@ -532,6 +571,144 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup
parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
}
+/*
+ * Short version of CreateParallelContext (parallel.c). Here we init only those
+ * fields that are needed for parallel index processing during autovacuum.
+ */
+static ParallelContext *
+CreateParallelAutoVacContext(int nworkers)
+{
+ ParallelContext *pcxt;
+ MemoryContext oldcontext;
+
+ Assert(AmAutoVacuumWorkerProcess());
+
+ /* Number of workers should be non-negative. */
+ Assert(nworkers >= 0);
+
+ /* We might be running in a short-lived memory context. */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* Initialize a new ParallelContext. */
+ pcxt = palloc0(sizeof(ParallelContext));
+ pcxt->nworkers = nworkers;
+ pcxt->nworkers_to_launch = nworkers;
+ shm_toc_initialize_estimator(&pcxt->estimator);
+
+ /* Restore previous memory context. */
+ MemoryContextSwitchTo(oldcontext);
+
+ return pcxt;
+}
+
+/*
+ * Short version of InitializeParallelDSM (parallel.c). Here we put into dsm
+ * only those data that are needed for parallel index processing during
+ * autovacuum.
+ */
+static void
+InitializeParallelAutoVacDSM(ParallelContext *pcxt)
+{
+ MemoryContext oldcontext;
+ Size tsnaplen = 0;
+ Size asnaplen = 0;
+ Size segsize = 0;
+ char *tsnapspace;
+ char *asnapspace;
+ Snapshot transaction_snapshot = GetTransactionSnapshot();
+ Snapshot active_snapshot = GetActiveSnapshot();
+
+ Assert(pcxt->nworkers >= 1);
+
+ /* We might be running in a very short-lived memory context. */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ if (IsolationUsesXactSnapshot())
+ {
+ tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+ asnaplen = EstimateSnapshotSpace(active_snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+
+ /* Create DSM and initialize with new table of contents. */
+ segsize = shm_toc_estimate(&pcxt->estimator);
+ pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
+
+ if (pcxt->seg == NULL)
+ {
+ pcxt->nworkers = 0;
+ pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
+ }
+
+ pcxt->toc = shm_toc_create(AV_PARALLEL_MAGIC,
+ pcxt->seg == NULL ? pcxt->private_memory :
+ dsm_segment_address(pcxt->seg),
+ segsize);
+
+ /* We can skip the rest of this if we're not budgeting for any workers. */
+ if (pcxt->nworkers > 0)
+ {
+ /*
+ * Serialize the transaction snapshot if the transaction isolation
+ * level uses a transaction snapshot.
+ */
+ if (IsolationUsesXactSnapshot())
+ {
+ tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
+ SerializeSnapshot(transaction_snapshot, tsnapspace);
+ shm_toc_insert(pcxt->toc, AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT,
+ tsnapspace);
+ }
+
+ /* Serialize the active snapshot. */
+ asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
+ SerializeSnapshot(active_snapshot, asnapspace);
+ shm_toc_insert(pcxt->toc, AV_PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
+ }
+
+ /* Update nworkers_to_launch, in case we changed nworkers above. */
+ pcxt->nworkers_to_launch = pcxt->nworkers;
+
+ /* Restore previous memory context. */
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Short version of DestroyParallelContext (parallel.c). Here we clean up only
+ * those data that were used during parallel index processing during autovacuum.
+ */
+static void
+DestroyParallelAutoVacContext(ParallelContext *pcxt)
+{
+ /*
+ * If we have allocated a shared memory segment, detach it. This will
+ * implicitly detach the error queues, and any other shared memory queues,
+ * stored there.
+ */
+ if (pcxt->seg != NULL)
+ {
+ dsm_detach(pcxt->seg);
+ pcxt->seg = NULL;
+ }
+
+ /*
+ * If this parallel context is actually in backend-private memory rather
+ * than shared memory, free that memory instead.
+ */
+ if (pcxt->private_memory != NULL)
+ {
+ pfree(pcxt->private_memory);
+ pcxt->private_memory = NULL;
+ }
+
+ AutoVacuumReleaseParallelWork(false);
+ pfree(pcxt);
+}
+
/*
* Compute the number of parallel worker processes to request. Both index
* vacuum and index cleanup can be executed with parallel workers.
@@ -558,7 +735,9 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
* We don't allow performing parallel operation in standalone backend or
* when parallelism is disabled.
*/
- if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
+ if (!IsUnderPostmaster ||
+ (max_parallel_maintenance_workers == 0 && !AmAutoVacuumWorkerProcess()) ||
+ (max_parallel_index_autovac_workers == 0 && AmAutoVacuumWorkerProcess()))
return 0;
/*
@@ -597,15 +776,17 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
parallel_workers = (nrequested > 0) ?
Min(nrequested, nindexes_parallel) : nindexes_parallel;
- /* Cap by max_parallel_maintenance_workers */
- parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+ /* Cap by GUC variable */
+ parallel_workers = AmAutoVacuumWorkerProcess() ?
+ Min(parallel_workers, max_parallel_index_autovac_workers) :
+ Min(parallel_workers, max_parallel_maintenance_workers);
return parallel_workers;
}
/*
* Perform index vacuum or index cleanup with parallel workers. This function
- * must be used by the parallel vacuum leader process.
+ * must be used by the parallel [auto]vacuum leader process.
*/
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
@@ -670,7 +851,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
if (nworkers > 0)
{
/* Reinitialize parallel context to relaunch parallel workers */
- if (num_index_scans > 0)
+ if (num_index_scans > 0 && !pvs->is_autovacuum)
ReinitializeParallelDSM(pvs->pcxt);
/*
@@ -686,9 +867,23 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
* The number of workers can vary between bulkdelete and cleanup
* phase.
*/
- ReinitializeParallelWorkers(pvs->pcxt, nworkers);
-
- LaunchParallelWorkers(pvs->pcxt);
+ if (pvs->is_autovacuum)
+ {
+ pvs->pcxt->nworkers_to_launch = Min(pvs->pcxt->nworkers, nworkers);
+ if (pvs->pcxt->nworkers > 0 && pvs->pcxt->nworkers_to_launch > 0)
+ {
+ pvs->pcxt->nworkers_launched =
+ LaunchParallelAutovacuumWorkers(pvs->heaprel->rd_id,
+ pvs->pcxt->nworkers_to_launch,
+ dsm_segment_handle(pvs->pcxt->seg),
+ MyProc, MyProcPid);
+ }
+ }
+ else
+ {
+ ReinitializeParallelWorkers(pvs->pcxt, nworkers);
+ LaunchParallelWorkers(pvs->pcxt);
+ }
if (pvs->pcxt->nworkers_launched > 0)
{
@@ -733,8 +928,14 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
*/
if (nworkers > 0)
{
- /* Wait for all vacuum workers to finish */
- WaitForParallelWorkersToFinish(pvs->pcxt);
+ /*
+ * Wait for all [auto]vacuum workers (involved in parallel index
+ * processing) to finish.
+ */
+ if (pvs->is_autovacuum)
+ ParallelAutovacuumEndSyncPoint(false);
+ else
+ WaitForParallelWorkersToFinish(pvs->pcxt);
for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
@@ -982,8 +1183,8 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
/*
* Perform work within a launched parallel process.
*
- * Since parallel vacuum workers perform only index vacuum or index cleanup,
- * we don't need to report progress information.
+ * Since parallel [auto]vacuum workers perform only index vacuum or index
+ * cleanup, we don't need to report progress information.
*/
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
@@ -997,23 +1198,22 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
BufferUsage *buffer_usage;
WalUsage *wal_usage;
int nindexes;
+ int worker_number;
char *sharedquery;
ErrorContextCallback errcallback;
- /*
- * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
- * don't support parallel vacuum for autovacuum as of now.
- */
- Assert(MyProc->statusFlags == PROC_IN_VACUUM);
-
- elog(DEBUG1, "starting parallel vacuum worker");
+ Assert(MyProc->statusFlags == PROC_IN_VACUUM || AmAutoVacuumWorkerProcess());
+ elog(DEBUG1, "starting parallel [auto]vacuum worker");
shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
/* Set debug_query_string for individual workers */
- sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
- debug_query_string = sharedquery;
- pgstat_report_activity(STATE_RUNNING, debug_query_string);
+ if (!AmAutoVacuumWorkerProcess())
+ {
+ sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
+ debug_query_string = sharedquery;
+ pgstat_report_activity(STATE_RUNNING, debug_query_string);
+ }
/* Track query ID */
pgstat_report_query_id(shared->queryid, false);
@@ -1091,8 +1291,12 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
/* Report buffer/WAL usage during parallel execution */
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
- InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
- &wal_usage[ParallelWorkerNumber]);
+
+ worker_number = AmAutoVacuumWorkerProcess() ?
+ GetAutoVacuumParallelWorkerNumber() : ParallelWorkerNumber;
+
+ InstrEndParallelQuery(&buffer_usage[worker_number],
+ &wal_usage[worker_number]);
/* Report any remaining cost-based vacuum delay time */
if (track_cost_delay_timing)
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 16756152b71..60192ecb8f5 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -90,6 +90,7 @@
#include "postmaster/postmaster.h"
#include "storage/aio_subsys.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
@@ -102,6 +103,7 @@
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/injection_point.h"
+#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
@@ -129,6 +131,9 @@ int autovacuum_anl_thresh;
double autovacuum_anl_scale;
int autovacuum_freeze_max_age;
int autovacuum_multixact_freeze_max_age;
+int max_parallel_index_autovac_workers;
+int autovac_idx_parallel_min_rows;
+int autovac_idx_parallel_min_indexes;
double autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
@@ -164,6 +169,14 @@ static int default_freeze_table_age;
static int default_multixact_freeze_min_age;
static int default_multixact_freeze_table_age;
+/*
+ * Number of additional workers that were requested for parallel index processing
+ * during autovacuum.
+ */
+static int nworkers_for_idx_autovac = 0;
+
+static int nworkers_launched = 0;
+
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
@@ -222,6 +235,8 @@ typedef struct autovac_table
* wi_proc pointer to PGPROC of the running worker, NULL if not started
* wi_launchtime Time at which this worker was launched
* wi_dobalance Whether this worker should be included in balance calculations
+ * wi_pcleanup if (> 0) => this worker must participate in parallel index
+ * vacuuming as a supportive worker. Must be (== 0) for the leader worker.
*
* All fields are protected by AutovacuumLock, except for wi_tableoid and
* wi_sharedrel which are protected by AutovacuumScheduleLock (note these
@@ -237,10 +252,17 @@ typedef struct WorkerInfoData
TimestampTz wi_launchtime;
pg_atomic_flag wi_dobalance;
bool wi_sharedrel;
+ int wi_pcleanup;
} WorkerInfoData;
typedef struct WorkerInfoData *WorkerInfo;
+#define AmParallelIdxAutoVacSupportive() \
+ (MyWorkerInfo != NULL && MyWorkerInfo->wi_pcleanup > 0)
+
+#define AmParallelIdxAutoVacLeader() \
+ (MyWorkerInfo != NULL && MyWorkerInfo->wi_pcleanup == 0)
+
/*
* Possible signals received by the launcher from remote processes. These are
* stored atomically in shared memory so that other processes can set them
@@ -250,9 +272,11 @@ typedef enum
{
AutoVacForkFailed, /* failed trying to start a worker */
AutoVacRebalance, /* rebalance the cost limits */
+ AutoVacParallelReq, /* request for parallel index vacuum */
+ AutoVacNumSignals, /* must be last */
} AutoVacuumSignal;
-#define AutoVacNumSignals (AutoVacRebalance + 1)
+#define AutoVacNumSignals (AutoVacParallelReq + 1)
/*
* Autovacuum workitem array, stored in AutoVacuumShmem->av_workItems. This
@@ -272,6 +296,49 @@ typedef struct AutoVacuumWorkItem
#define NUM_WORKITEMS 256
+typedef enum
+{
+ LAUNCHER = 0, /* autovacuum launcher must wake everyone up */
+ LEADER, /* leader must wake everyone up */
+ LAST_WORKER, /* the last initialized supportive worker must wake everyone
+ up */
+} SyncType;
+
+typedef enum
+{
+ STARTUP = 0, /* initial value - no sync points were passed */
+ START_SYNC_POINT_PASSED, /* start_sync_point was passed */
+ END_SYNC_POINT_PASSED, /* end_sync_point was passed */
+ SHUTDOWN, /* leader wants to shut down parallel index
+ vacuum due to an error that occurred */
+} Status;
+
+/*
+ * Structure, stored in AutoVacuumShmem->pav_workItem. This is used for managing
+ * parallel index processing (within a single table).
+ */
+typedef struct ParallelAutoVacuumWorkItem
+{
+ Oid avw_database;
+ Oid avw_relation;
+ int nworkers_participating;
+ int nworkers_to_launch;
+ int nworkers_sleeping; /* leader doesn't count */
+ int nfinished; /* # of workers, that already finished parallel
+ index processing (and probably already dead) */
+
+ dsm_handle handl;
+ ProcNumber leader_proc_num;
+
+ PGPROC *leader_proc;
+ ConditionVariable cv;
+
+ bool active; /* being processed */
+ bool leader_sleeping;
+ SyncType sync_type;
+ Status status;
+} ParallelAutoVacuumWorkItem;
+
/*-------------
* The main autovacuum shmem struct. On shared memory we store this main
* struct and the array of WorkerInfo structs. This struct keeps:
@@ -283,6 +350,8 @@ typedef struct AutoVacuumWorkItem
* av_startingWorker pointer to WorkerInfo currently being started (cleared by
* the worker itself as soon as it's up and running)
* av_workItems work item array
+ * pav_workItem information needed for parallel index processing within
+ * a single table
* av_nworkersForBalance the number of autovacuum workers to use when
* calculating the per worker cost limit
*
@@ -298,6 +367,7 @@ typedef struct
dlist_head av_runningWorkers;
WorkerInfo av_startingWorker;
AutoVacuumWorkItem av_workItems[NUM_WORKITEMS];
+ ParallelAutoVacuumWorkItem pav_workItem;
pg_atomic_uint32 av_nworkersForBalance;
} AutoVacuumShmemStruct;
@@ -322,11 +392,17 @@ pg_noreturn static void AutoVacLauncherShutdown(void);
static void launcher_determine_sleep(bool canlaunch, bool recursing,
struct timeval *nap);
static void launch_worker(TimestampTz now);
+static void launch_worker_for_pcleanup(TimestampTz now);
+static void eliminate_lock_conflicts(ParallelAutoVacuumWorkItem *item,
+ bool all_launched);
static List *get_database_list(void);
static void rebuild_database_list(Oid newdb);
static int db_comparator(const void *a, const void *b);
static void autovac_recalculate_workers_for_balance(void);
+static int parallel_autovacuum_start_sync_point(bool keep_lock);
+static void handle_parallel_idx_autovac_errors(void);
+
static void do_autovacuum(void);
static void FreeWorkerInfo(int code, Datum arg);
@@ -583,7 +659,14 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
* wakening conditions.
*/
- launcher_determine_sleep(av_worker_available(), false, &nap);
+ if (nworkers_launched < nworkers_for_idx_autovac)
+ {
+ /* Take the smallest possible sleep interval. */
+ nap.tv_sec = 0;
+ nap.tv_usec = MIN_AUTOVAC_SLEEPTIME * 1000;
+ }
+ else
+ launcher_determine_sleep(av_worker_available(), false, &nap);
/*
* Wait until naptime expires or we get some type of signal (all the
@@ -614,6 +697,19 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
LWLockRelease(AutovacuumLock);
}
+ if (AutoVacuumShmem->av_signal[AutoVacParallelReq])
+ {
+ ParallelAutoVacuumWorkItem *item;
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ AutoVacuumShmem->av_signal[AutoVacParallelReq] = false;
+
+ item = &AutoVacuumShmem->pav_workItem;
+ nworkers_for_idx_autovac = item->nworkers_to_launch;
+ nworkers_launched = 0;
+ LWLockRelease(AutovacuumLock);
+ }
+
if (AutoVacuumShmem->av_signal[AutoVacForkFailed])
{
/*
@@ -686,6 +782,7 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
worker->wi_sharedrel = false;
worker->wi_proc = NULL;
worker->wi_launchtime = 0;
+ worker->wi_pcleanup = -1;
dclist_push_head(&AutoVacuumShmem->av_freeWorkers,
&worker->wi_links);
AutoVacuumShmem->av_startingWorker = NULL;
@@ -698,9 +795,29 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
}
LWLockRelease(AutovacuumLock); /* either shared or exclusive */
- /* if we can't do anything, just go back to sleep */
if (!can_launch)
+ {
+ /*
+ * If the launcher cannot launch all workers requested for parallel
+ * index vacuum, it must handle all possible lock conflicts and tell
+ * everyone that there will be no new supportive workers.
+ */
+ if (nworkers_launched < nworkers_for_idx_autovac)
+ {
+ ParallelAutoVacuumWorkItem *item;
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ item = &AutoVacuumShmem->pav_workItem;
+ Assert(item->active);
+
+ eliminate_lock_conflicts(item, false);
+ nworkers_launched = nworkers_for_idx_autovac = 0;
+ LWLockRelease(AutovacuumLock);
+ }
+
+ /* if we can't do anything else, just go back to sleep */
continue;
+ }
/* We're OK to start a new worker */
@@ -716,6 +833,15 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
*/
launch_worker(current_time);
}
+ else if (nworkers_launched < nworkers_for_idx_autovac)
+ {
+ /*
+ * One of the active autovacuum workers sent us a request to launch
+ * participants for parallel index vacuum. We check this case first
+ * because we need to start participants as soon as possible.
+ */
+ launch_worker_for_pcleanup(current_time);
+ }
else
{
/*
@@ -1267,6 +1393,7 @@ do_start_worker(void)
worker->wi_dboid = avdb->adw_datid;
worker->wi_proc = NULL;
worker->wi_launchtime = GetCurrentTimestamp();
+ worker->wi_pcleanup = -1;
AutoVacuumShmem->av_startingWorker = worker;
@@ -1349,6 +1476,134 @@ launch_worker(TimestampTz now)
}
}
+/*
+ * launch_worker_for_pcleanup
+ *
+ * Wrapper for starting a worker (requested by leader of parallel index
+ * vacuuming) from the launcher.
+ */
+static void
+launch_worker_for_pcleanup(TimestampTz now)
+{
+ ParallelAutoVacuumWorkItem *item;
+ WorkerInfo worker;
+ dlist_node *wptr;
+
+ Assert(nworkers_launched < nworkers_for_idx_autovac);
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ /*
+ * Get a worker entry from the freelist. We checked above, so there
+ * really should be a free slot.
+ */
+ wptr = dclist_pop_head_node(&AutoVacuumShmem->av_freeWorkers);
+
+ worker = dlist_container(WorkerInfoData, wi_links, wptr);
+ worker->wi_dboid = InvalidOid;
+ worker->wi_proc = NULL;
+ worker->wi_launchtime = GetCurrentTimestamp();
+
+ /*
+ * Set an indicator that this worker must join parallel index vacuum.
+ * This variable also serves as a unique id among parallel index
+ * vacuum workers. The first id is '1', because '0' is reserved for the leader.
+ */
+ worker->wi_pcleanup = (nworkers_launched + 1);
+
+ AutoVacuumShmem->av_startingWorker = worker;
+
+ SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+
+ item = &AutoVacuumShmem->pav_workItem;
+ Assert(item->active);
+
+ nworkers_launched += 1;
+
+ if (nworkers_launched < nworkers_for_idx_autovac)
+ {
+ LWLockRelease(AutovacuumLock);
+ return;
+ }
+
+ Assert(item->sync_type == LAUNCHER &&
+ nworkers_launched == nworkers_for_idx_autovac);
+
+ /*
+ * If the launcher managed to launch all workers requested for parallel
+ * index vacuum, it must handle all possible lock conflicts.
+ */
+ eliminate_lock_conflicts(item, true);
+ LWLockRelease(AutovacuumLock);
+}
+
+/*
+ * Must be called from autovacuum launcher when it launched all requested
+ * workers for parallel index vacuum, or when it realized, that no more
+ * processes can be launched.
+ *
+ * In this function launcher will assign roles in such a way as to avoid lock
+ * conflicts between leader and supportive workers.
+ *
+ * AutovacuumLock must be held in exclusive mode before calling this function!
+ */
+static void
+eliminate_lock_conflicts(ParallelAutoVacuumWorkItem *item, bool all_launched)
+{
+ Assert(AmAutoVacuumLauncherProcess());
+ Assert(LWLockHeldByMe(AutovacuumLock));
+
+ /* So, let's start... */
+
+ if (item->leader_sleeping && item->nworkers_sleeping == nworkers_launched)
+ {
+ /*
+ * If both leader and all launched supportive workers are sleeping, then
+ * only we can wake everyone up.
+ */
+ LWLockRelease(AutovacuumLock);
+ ConditionVariableBroadcast(&item->cv);
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ }
+ else if (item->leader_sleeping &&
+ item->nworkers_sleeping < nworkers_launched)
+ {
+ /*
+ * If the leader is already sleeping, but several supportive workers are
+ * still initializing, we shift the responsibility for waking everyone
+ * up to the worker that completes initialization last.
+ */
+ item->sync_type = LAST_WORKER;
+ }
+ else if (!item->leader_sleeping &&
+ item->nworkers_sleeping == nworkers_launched)
+ {
+ /*
+ * If only leader is not sleeping - it must wake up all workers when it
+ * finishes all preparations.
+ */
+ item->sync_type = LEADER;
+ }
+ else
+ {
+ /*
+ * If nobody is sleeping, we assume that the leader is more likely to
+ * fall asleep first, so set sync type to LAST_WORKER. If the last worker
+ * sees that the leader is still not sleeping, it will change sync type
+ * to LEADER and go to sleep.
+ */
+ item->sync_type = LAST_WORKER;
+ }
+
+ /*
+ * If we cannot launch all requested workers, refresh
+ * nworkers_to_launch value, so that the last worker can find out
+ * that he is really the last.
+ */
+ if (!all_launched && item->sync_type == LAST_WORKER)
+ item->nworkers_to_launch = nworkers_launched;
+}
+
/*
* Called from postmaster to signal a failure to fork a process to become
* worker. The postmaster should kill(SIGUSR2) the launcher shortly
@@ -1360,6 +1615,38 @@ AutoVacWorkerFailed(void)
AutoVacuumShmem->av_signal[AutoVacForkFailed] = true;
}
+/*
+ * Called from an autovacuum worker to signal that it needs participants in
+ * parallel index vacuum. Function sends SIGUSR2 to the launcher and returns
+ * 'true' iff the signal was sent successfully.
+ */
+bool
+AutoVacParallelWorkRequest(void)
+{
+ if (AutoVacuumShmem->av_launcherpid == 0)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("autovacuum launcher is dead")));
+
+ return false;
+ }
+
+ AutoVacuumShmem->av_signal[AutoVacParallelReq] = true;
+
+ if (kill(AutoVacuumShmem->av_launcherpid, SIGUSR2) < 0)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_SYSTEM_ERROR),
+ errmsg("failed to send signal to autovac launcher (pid %d): %m",
+ AutoVacuumShmem->av_launcherpid)));
+
+ return false;
+ }
+
+ return true;
+}
+
/* SIGUSR2: a worker is up and running, or just finished, or failed to fork */
static void
avl_sigusr2_handler(SIGNAL_ARGS)
@@ -1559,6 +1846,8 @@ AutoVacWorkerMain(const void *startup_data, size_t startup_data_len)
{
char dbname[NAMEDATALEN];
+ Assert(MyWorkerInfo->wi_pcleanup < 0);
+
/*
* Report autovac startup to the cumulative stats system. We
* deliberately do this before InitPostgres, so that the
@@ -1593,12 +1882,112 @@ AutoVacWorkerMain(const void *startup_data, size_t startup_data_len)
recentMulti = ReadNextMultiXactId();
do_autovacuum();
}
+ else if (AmParallelIdxAutoVacSupportive())
+ {
+ ParallelAutoVacuumWorkItem *item;
+ dsm_handle handle;
+ PGPROC *leader_proc;
+ ProcNumber leader_proc_number;
+ dsm_segment *seg;
+ shm_toc *toc;
+ char *asnapspace;
+ char *tsnapspace;
+ char dbname[NAMEDATALEN];
+ Snapshot tsnapshot;
+ Snapshot asnapshot;
+
+ /*
+ * We will abort parallel index vacuuming within the current process if
+ * something errors out
+ */
+ PG_TRY();
+ {
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ item = &AutoVacuumShmem->pav_workItem;
+ dbid = item->avw_database;
+ handle = item->handl;
+ leader_proc = item->leader_proc;
+ leader_proc_number = item->leader_proc_num;
+ LWLockRelease(AutovacuumLock);
+
+ InitPostgres(NULL, dbid, NULL, InvalidOid,
+ INIT_PG_OVERRIDE_ALLOW_CONNS,
+ dbname);
+
+ set_ps_display(dbname);
+ if (PostAuthDelay)
+ pg_usleep(PostAuthDelay * 1000000L);
+
+ /* And do an appropriate amount of work */
+ recentXid = ReadNextTransactionId();
+ recentMulti = ReadNextMultiXactId();
+
+ if (parallel_autovacuum_start_sync_point(false) == -1)
+ {
+ /* We are not participating anymore */
+ MyWorkerInfo->wi_pcleanup = -1;
+ goto exit;
+ }
+
+ seg = dsm_attach(handle);
+ if (seg == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not map dynamic shared memory segment")));
+
+ toc = shm_toc_attach(AV_PARALLEL_MAGIC, dsm_segment_address(seg));
+ if (toc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("invalid magic number in dynamic shared memory segment")));
+
+ if (!BecomeLockGroupMember(leader_proc, leader_proc_number))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not become lock group member")));
+ }
+
+ StartTransactionCommand();
+
+ asnapspace =
+ shm_toc_lookup(toc, AV_PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
+ tsnapspace =
+ shm_toc_lookup(toc, AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
+ asnapshot = RestoreSnapshot(asnapspace);
+ tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
+ RestoreTransactionSnapshot(tsnapshot, leader_proc);
+ PushActiveSnapshot(asnapshot);
+
+ /*
+ * We've changed which tuples we can see, and must therefore
+ * invalidate system caches.
+ */
+ InvalidateSystemCaches();
+
+ parallel_vacuum_main(seg, toc);
+
+ /* Must pop active snapshot so snapmgr.c doesn't complain. */
+ PopActiveSnapshot();
+
+ dsm_detach(seg);
+ CommitTransactionCommand();
+ ParallelAutovacuumEndSyncPoint(false);
+ }
+ PG_CATCH();
+ {
+ EmitErrorReport();
+ handle_parallel_idx_autovac_errors();
+ }
+ PG_END_TRY();
+ }
/*
* The launcher will be notified of my death in ProcKill, *if* we managed
* to get a worker slot at all
*/
+exit:
/* All done, go away */
proc_exit(0);
}
@@ -2461,6 +2850,10 @@ do_autovacuum(void)
tab->at_datname, tab->at_nspname, tab->at_relname);
EmitErrorReport();
+ /* if we are parallel index vacuuming leader, we must shut it down */
+ if (AmParallelIdxAutoVacLeader())
+ handle_parallel_idx_autovac_errors();
+
/* this resets ProcGlobal->statusFlags[i] too */
AbortOutOfAnyTransaction();
FlushErrorState();
@@ -3296,6 +3689,405 @@ AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId,
return result;
}
+/*
+ * Release work item, used for managing parallel index vacuum. Must be called
+ * once and only from leader worker.
+ *
+ * If 'keep_lock' is true, then AutovacuumLock will not be released in the end
+ * of function execution.
+ */
+void
+AutoVacuumReleaseParallelWork(bool keep_lock)
+{
+ ParallelAutoVacuumWorkItem *workitem;
+
+ if (!LWLockHeldByMe(AutovacuumLock))
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ workitem = &AutoVacuumShmem->pav_workItem;
+
+ Assert(workitem->leader_proc_num == MyProcPid);
+
+ workitem->active = false;
+
+ if (!keep_lock)
+ LWLockRelease(AutovacuumLock);
+}
+
+/*
+ * Waiting on a condition variable is a frequent operation, so it has been
+ * factored out into a separate function. Caller must hold AutovacuumLock
+ * before calling it.
+ */
+static inline void
+CVSleep(ConditionVariable *cv)
+{
+ ConditionVariablePrepareToSleep(cv);
+
+ LWLockRelease(AutovacuumLock);
+ ConditionVariableSleep(cv, PG_WAIT_IPC);
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ ConditionVariableCancelSleep();
+}
+
+/*
+ * This function is used to synchronize the leader with supportive workers
+ * during parallel index vacuuming. Each process will exit iff:
+ * Leader worker is ready to perform parallel vacuum &&
+ * All launched supportive workers are ready to perform parallel vacuum &&
+ * (Autovacuum launcher already launched all requested workers ||
+ * Autovacuum launcher cannot launch more workers)
+ *
+ * If 'keep_lock' is true, then AutovacuumLock will not be released in the end
+ * of function execution.
+ *
+ * NOTE: Some workers may call this function when leader worker decided to shut
+ * down parallel vacuuming. In this case '-1' value will be returned.
+ */
+static int
+parallel_autovacuum_start_sync_point(bool keep_lock)
+{
+ ParallelAutoVacuumWorkItem *workitem;
+ SyncType sync_type;
+ int num_participants;
+
+ if (!LWLockHeldByMe(AutovacuumLock))
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ workitem = &AutoVacuumShmem->pav_workItem;
+ Assert(workitem->active);
+ sync_type = workitem->sync_type;
+
+ if (AmParallelIdxAutoVacLeader())
+ {
+ Assert(workitem->leader_proc_num == MyProcPid);
+
+ /* Wake up all sleeping supportive workers, if required ... */
+ if (sync_type == LEADER)
+ {
+ ConditionVariableBroadcast(&workitem->cv);
+
+ /*
+ * Advance status, because we are guaranteed to pass this
+ * sync point.
+ */
+ workitem->status = START_SYNC_POINT_PASSED;
+ }
+ /* ... otherwise, wait for somebody to wake us up */
+ else
+ {
+ workitem->leader_sleeping = true;
+ CVSleep(&workitem->cv);
+ workitem->leader_sleeping = false;
+
+ /*
+ * A priori, we believe that in the end everyone should be awakened
+ * by the leader.
+ */
+ workitem->sync_type = LEADER;
+ }
+ }
+ else
+ {
+ workitem->nworkers_participating += 1;
+
+ /*
+ * If we know that the launcher will no longer attempt to launch more
+ * supportive workers for this item, we are LAST_WORKER for sure.
+ *
+ * Note that the launcher sets the LAST_WORKER sync type without knowing
+ * the current status of the leader. So we also check that the leader is
+ * sleeping before waking everyone up. Otherwise, we must wait for the
+ * leader (and ask it to wake everyone up).
+ */
+ if (workitem->nworkers_participating == workitem->nworkers_to_launch &&
+ sync_type == LAST_WORKER && workitem->leader_sleeping)
+ {
+ ConditionVariableBroadcast(&workitem->cv);
+
+ /*
+ * We must not advance status if leader wants to shut down parallel
+ * execution (see checks below).
+ */
+ if (workitem->status != SHUTDOWN)
+ workitem->status = START_SYNC_POINT_PASSED;
+ }
+ else
+ {
+ if (workitem->nworkers_participating == workitem->nworkers_to_launch &&
+ sync_type == LAST_WORKER)
+ {
+ workitem->sync_type = LEADER;
+ }
+
+ workitem->nworkers_sleeping += 1;
+ CVSleep(&workitem->cv);
+ workitem->nworkers_sleeping -= 1;
+ }
+ }
+
+ /* Tell caller whether parallel vacuuming was shut down */
+ if (workitem->status == SHUTDOWN)
+ num_participants = -1;
+ else
+ num_participants = workitem->nworkers_participating;
+
+ if (!keep_lock)
+ LWLockRelease(AutovacuumLock);
+
+ return num_participants;
+}
+
+/*
+ * Like function above, but must be called by leader and supportive workers
+ * when they finished parallel index vacuum.
+ *
+ * If 'keep_lock' is true, then AutovacuumLock will not be released in the end
+ * of function execution.
+ */
+void
+ParallelAutovacuumEndSyncPoint(bool keep_lock)
+{
+ ParallelAutoVacuumWorkItem *workitem;
+
+ if (!LWLockHeldByMe(AutovacuumLock))
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ workitem = &AutoVacuumShmem->pav_workItem;
+ Assert(workitem->active);
+
+ /* Nothing to do if no supportive workers were launched */
+ if (workitem->nworkers_participating == 0)
+ {
+ Assert(AmParallelIdxAutoVacLeader());
+ workitem->status = END_SYNC_POINT_PASSED;
+
+ if (!keep_lock)
+ LWLockRelease(AutovacuumLock);
+
+ return;
+ }
+
+ if (AmParallelIdxAutoVacLeader())
+ {
+ Assert(workitem->sync_type == LEADER);
+
+ /* Wait for all workers to finish (only last worker will wake us up) */
+ if (workitem->nfinished != workitem->nworkers_participating)
+ {
+ workitem->sync_type = LAST_WORKER;
+ workitem->leader_sleeping = true;
+ CVSleep(&workitem->cv);
+ workitem->leader_sleeping = false;
+
+ Assert(workitem->nfinished == workitem->nworkers_participating);
+
+ /*
+ * Advance status, because we are guaranteed to pass this
+ * sync point.
+ */
+ workitem->status = END_SYNC_POINT_PASSED;
+ }
+ }
+ else
+ {
+ workitem->nfinished += 1;
+
+ /* If we are last finished worker - wake up the leader.
+ *
+ * If not - just leave, because supportive worker already finished all
+ * work and must die.
+ */
+ if (workitem->sync_type == LAST_WORKER &&
+ workitem->nfinished == workitem->nworkers_participating)
+ {
+ ConditionVariableBroadcast(&workitem->cv);
+
+ /*
+ * Don't need to check SHUTDOWN status here - all supportive workers
+ * are about to finish anyway.
+ */
+ workitem->status = END_SYNC_POINT_PASSED;
+ }
+
+ /* We are not participating anymore */
+ MyWorkerInfo->wi_pcleanup = -1;
+ }
+
+ if (!keep_lock)
+ LWLockRelease(AutovacuumLock);
+
+ return;
+}
+
+/*
+ * Get id of parallel index vacuum worker (counting from 0).
+ */
+int
+GetAutoVacuumParallelWorkerNumber(void)
+{
+ Assert(AmAutoVacuumWorkerProcess() && MyWorkerInfo->wi_pcleanup > 0);
+ return (MyWorkerInfo->wi_pcleanup - 1);
+}
+
+/*
+ * The leader autovacuum process can decide that it needs several helper
+ * workers to process a table in parallel mode. It must set up a parallel
+ * context and call LaunchParallelAutovacuumWorkers.
+ *
+ * In this function we do the following:
+ * 1) Send a signal to the autovacuum launcher, which creates 'supportive
+ * workers' during the launcher's standard work loop.
+ * 2) Wait for supportive workers to start.
+ *
+ * The function returns the number of workers that the launcher was able to
+ * launch (may be less than 'nworkers_to_launch').
+ */
+int
+LaunchParallelAutovacuumWorkers(Oid rel_id, int nworkers_to_launch,
+ dsm_handle handle, PGPROC *leader_proc,
+ int leader_proc_pid)
+{
+ int nworkers_launched = 0;
+ ParallelAutoVacuumWorkItem *workitem;
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ workitem = &AutoVacuumShmem->pav_workItem;
+
+ /*
+ * For now, there can be only one leader across the whole cluster.
+ * TODO: fix it in next versions
+ */
+ if (workitem->active && workitem->leader_proc_num != MyProcPid)
+ {
+ LWLockRelease(AutovacuumLock);
+ return 0;
+ }
+
+ /* OK, we can use this workitem entry. Init it. */
+ workitem->avw_database = MyDatabaseId;
+ workitem->avw_relation = rel_id;
+ workitem->handl = handle;
+ workitem->leader_proc = leader_proc;
+ workitem->leader_proc_num = leader_proc_pid;
+ workitem->nworkers_participating = 0;
+ workitem->nfinished = 0;
+ workitem->nworkers_to_launch = nworkers_to_launch;
+ workitem->active = true;
+ workitem->leader_sleeping = false;
+ workitem->nworkers_sleeping = 0;
+ workitem->nfinished = 0;
+ workitem->sync_type = LAUNCHER;
+ workitem->status = STARTUP;
+
+ LWLockRelease(AutovacuumLock);
+
+ /* Notify autovacuum launcher that we need supportive workers */
+ if (AutoVacParallelWorkRequest())
+ {
+ /* Become the leader */
+ MyWorkerInfo->wi_pcleanup = 0;
+
+ /* All created workers must get same locks as leader process */
+ BecomeLockGroupLeader();
+
+ /*
+ * Wait until all supportive workers are launched. Also retrieve the
+ * actual number of participants.
+ */
+
+ nworkers_launched = parallel_autovacuum_start_sync_point(false);
+ }
+ else
+ {
+ /*
+ * If we (for any reason) cannot send signal to the launcher, don't try
+ * to do index vacuuming in parallel
+ */
+ return 0;
+ }
+
+ return nworkers_launched;
+}
+
+/*
+ * During parallel index vacuuming any worker (both supportives and leader) can
+ * catch an error.
+ * In order to handle it in the right way we must call this function.
+ */
+static void
+handle_parallel_idx_autovac_errors(void)
+{
+ ParallelAutoVacuumWorkItem *item;
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ item = &AutoVacuumShmem->pav_workItem;
+
+ if (AmParallelIdxAutoVacLeader())
+ {
+ if (item->status == START_SYNC_POINT_PASSED)
+ {
+ /*
+ * If start sync point already passed - just wait for all supportive
+ * workers to finish and exit.
+ */
+ ParallelAutovacuumEndSyncPoint(true);
+ }
+ else if (item->status == STARTUP)
+ {
+ /*
+ * If no sync point has been passed, we can prevent supportive
+ * workers from performing their work: set SHUTDOWN status and wait
+ * until all workers see it.
+ */
+ item->status = SHUTDOWN;
+ parallel_autovacuum_start_sync_point(true);
+ }
+
+ AutoVacuumReleaseParallelWork(true);
+ }
+ else
+ {
+ Assert(AmParallelIdxAutoVacSupportive());
+
+ if (item->status == STARTUP)
+ {
+ /*
+ * If no sync point has been passed, just exclude ourselves from the
+ * participants. Further parallel index vacuuming will take place
+ * as usual.
+ */
+ item->nworkers_to_launch -= 1;
+
+ if (item->nworkers_participating == item->nworkers_to_launch &&
+ item->sync_type == LAST_WORKER && item->leader_sleeping)
+ {
+ ConditionVariableBroadcast(&item->cv);
+ item->status = START_SYNC_POINT_PASSED;
+ }
+ }
+ else if (item->status == START_SYNC_POINT_PASSED)
+ {
+ /*
+ * If start sync point already passed we will simulate the usual
+ * end of work (see ParallelAutovacuumEndSyncPoint).
+ */
+ item->nfinished += 1;
+
+ if (item->sync_type == LAST_WORKER &&
+ item->nfinished == item->nworkers_participating)
+ {
+ ConditionVariableBroadcast(&item->cv);
+ item->status = END_SYNC_POINT_PASSED;
+ }
+ }
+ }
+
+ LWLockRelease(AutovacuumLock);
+}
+
/*
* autovac_init
* This is called at postmaster initialization.
@@ -3361,6 +4153,9 @@ AutoVacuumShmemInit(void)
AutoVacuumShmem->av_startingWorker = NULL;
memset(AutoVacuumShmem->av_workItems, 0,
sizeof(AutoVacuumWorkItem) * NUM_WORKITEMS);
+ memset(&AutoVacuumShmem->pav_workItem, 0,
+ sizeof(ParallelAutoVacuumWorkItem));
+ ConditionVariableInit(&AutoVacuumShmem->pav_workItem.cv);
worker = (WorkerInfo) ((char *) AutoVacuumShmem +
MAXALIGN(sizeof(AutoVacuumShmemStruct)));
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 60b12446a1c..c045a8d6eda 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3647,6 +3647,36 @@ struct config_int ConfigureNamesInt[] =
check_autovacuum_work_mem, NULL, NULL
},
+ {
+ {"max_parallel_index_autovac_workers", PGC_POSTMASTER, VACUUM_AUTOVACUUM,
+ gettext_noop("Sets the maximum number of parallel autovacuum worker processes during parallel index vacuuming of a single table."),
+ NULL
+ },
+ &max_parallel_index_autovac_workers,
+ 0, 0, MAX_PARALLEL_WORKER_LIMIT,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"autovac_idx_parallel_min_rows", PGC_POSTMASTER, VACUUM_AUTOVACUUM,
+ gettext_noop("Sets the minimum number of dead tuples in a single table that requires parallel index processing during autovacuum."),
+ NULL
+ },
+ &autovac_idx_parallel_min_rows,
+ 0, 0, INT32_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"autovac_idx_parallel_min_indexes", PGC_POSTMASTER, VACUUM_AUTOVACUUM,
+ gettext_noop("Sets the minimum number of indexes created on a single table that requires parallel index processing during autovacuum."),
+ NULL
+ },
+ &autovac_idx_parallel_min_indexes,
+ 2, 2, INT32_MAX,
+ NULL, NULL, NULL
+ },
+
{
{"tcp_keepalives_idle", PGC_USERSET, CONN_AUTH_TCP,
gettext_noop("Time between issuing TCP keepalives."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 34826d01380..08869398039 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -146,6 +146,12 @@
#hash_mem_multiplier = 2.0 # 1-1000.0 multiplier on hash table work_mem
#maintenance_work_mem = 64MB # min 64kB
#autovacuum_work_mem = -1 # min 64kB, or -1 to use maintenance_work_mem
+#max_parallel_index_autovac_workers = 0 # this feature is disabled by default
+ # (change requires restart)
+#autovac_idx_parallel_min_rows = 0
+ # (change requires restart)
+#autovac_idx_parallel_min_indexes = 2
+ # (change requires restart)
#logical_decoding_work_mem = 64MB # min 64kB
#max_stack_depth = 2MB # min 100kB
#shared_memory_type = mmap # the default is the first option
diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h
index e8135f41a1c..81e6267cd7b 100644
--- a/src/include/postmaster/autovacuum.h
+++ b/src/include/postmaster/autovacuum.h
@@ -15,6 +15,8 @@
#define AUTOVACUUM_H
#include "storage/block.h"
+#include "storage/dsm_impl.h"
+#include "storage/lock.h"
/*
* Other processes can request specific work from autovacuum, identified by
@@ -25,12 +27,25 @@ typedef enum
AVW_BRINSummarizeRange,
} AutoVacuumWorkItemType;
+/*
+ * Magic number for parallel context TOC. Used for parallel index processing
+ * during autovacuum.
+ */
+#define AV_PARALLEL_MAGIC 0xaaaaaaaa
+
+/* Magic numbers for per-context parallel index processing state sharing. */
+#define AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFF0000000000001)
+#define AV_PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFF0000000000002)
+
/* GUC variables */
extern PGDLLIMPORT bool autovacuum_start_daemon;
extern PGDLLIMPORT int autovacuum_worker_slots;
extern PGDLLIMPORT int autovacuum_max_workers;
extern PGDLLIMPORT int autovacuum_work_mem;
+extern PGDLLIMPORT int max_parallel_index_autovac_workers;
+extern PGDLLIMPORT int autovac_idx_parallel_min_rows;
+extern PGDLLIMPORT int autovac_idx_parallel_min_indexes;
extern PGDLLIMPORT int autovacuum_naptime;
extern PGDLLIMPORT int autovacuum_vac_thresh;
extern PGDLLIMPORT int autovacuum_vac_max_thresh;
@@ -60,10 +75,20 @@ extern void AutoVacWorkerFailed(void);
pg_noreturn extern void AutoVacLauncherMain(const void *startup_data, size_t startup_data_len);
pg_noreturn extern void AutoVacWorkerMain(const void *startup_data, size_t startup_data_len);
+/* called from autovac worker when it needs participants in parallel index cleanup */
+extern bool AutoVacParallelWorkRequest(void);
extern bool AutoVacuumRequestWork(AutoVacuumWorkItemType type,
Oid relationId, BlockNumber blkno);
+extern void AutoVacuumReleaseParallelWork(bool keep_lock);
+extern int AutoVacuumParallelWorkWaitForStart(void);
+extern void ParallelAutovacuumEndSyncPoint(bool keep_lock);
+extern int GetAutoVacuumParallelWorkerNumber(void);
+extern int LaunchParallelAutovacuumWorkers(Oid rel_id, int nworkers_to_launch,
+ dsm_handle handle,
+ PGPROC *leader_proc,
+ int leader_proc_pid);
/* shared memory stuff */
extern Size AutoVacuumShmemSize(void);
extern void AutoVacuumShmemInit(void);
diff --git a/src/test/modules/autovacuum/.gitignore b/src/test/modules/autovacuum/.gitignore
new file mode 100644
index 00000000000..0b54641bceb
--- /dev/null
+++ b/src/test/modules/autovacuum/.gitignore
@@ -0,0 +1 @@
+/tmp_check/
\ No newline at end of file
diff --git a/src/test/modules/autovacuum/Makefile b/src/test/modules/autovacuum/Makefile
new file mode 100644
index 00000000000..90c00ff350b
--- /dev/null
+++ b/src/test/modules/autovacuum/Makefile
@@ -0,0 +1,14 @@
+# src/test/modules/autovacuum/Makefile
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/autovacuum
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
\ No newline at end of file
diff --git a/src/test/modules/autovacuum/t/001_autovac_parallel.pl b/src/test/modules/autovacuum/t/001_autovac_parallel.pl
new file mode 100644
index 00000000000..d8e22a06bac
--- /dev/null
+++ b/src/test/modules/autovacuum/t/001_autovac_parallel.pl
@@ -0,0 +1,137 @@
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $psql_out;
+
+my $node = PostgreSQL::Test::Cluster->new('node1');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+ autovacuum = off
+ max_wal_size = 4096
+});
+$node->start;
+
+my $indexes_num = 80;
+my $initial_rows_num = 1_000_000;
+
+# Create big table and create specified number of b-tree indexes on it
+$node->safe_psql('postgres', qq{
+ CREATE TABLE test_autovac (
+ id SERIAL PRIMARY KEY,
+ col_1 INTEGER, col_2 INTEGER, col_3 INTEGER, col_4 INTEGER, col_5 INTEGER,
+ col_6 INTEGER, col_7 INTEGER, col_8 INTEGER, col_9 INTEGER, col_10 INTEGER,
+ col_11 INTEGER, col_12 INTEGER, col_13 INTEGER, col_14 INTEGER, col_15 INTEGER,
+ col_16 INTEGER, col_17 INTEGER, col_18 INTEGER, col_19 INTEGER, col_20 INTEGER,
+ col_21 INTEGER, col_22 INTEGER, col_23 INTEGER, col_24 INTEGER, col_25 INTEGER,
+ col_26 INTEGER, col_27 INTEGER, col_28 INTEGER, col_29 INTEGER, col_30 INTEGER,
+ col_31 INTEGER, col_32 INTEGER, col_33 INTEGER, col_34 INTEGER, col_35 INTEGER,
+ col_36 INTEGER, col_37 INTEGER, col_38 INTEGER, col_39 INTEGER, col_40 INTEGER,
+ col_41 INTEGER, col_42 INTEGER, col_43 INTEGER, col_44 INTEGER, col_45 INTEGER,
+ col_46 INTEGER, col_47 INTEGER, col_48 INTEGER, col_49 INTEGER, col_50 INTEGER,
+ col_51 INTEGER, col_52 INTEGER, col_53 INTEGER, col_54 INTEGER, col_55 INTEGER,
+ col_56 INTEGER, col_57 INTEGER, col_58 INTEGER, col_59 INTEGER, col_60 INTEGER,
+ col_61 INTEGER, col_62 INTEGER, col_63 INTEGER, col_64 INTEGER, col_65 INTEGER,
+ col_66 INTEGER, col_67 INTEGER, col_68 INTEGER, col_69 INTEGER, col_70 INTEGER,
+ col_71 INTEGER, col_72 INTEGER, col_73 INTEGER, col_74 INTEGER, col_75 INTEGER,
+ col_76 INTEGER, col_77 INTEGER, col_78 INTEGER, col_79 INTEGER, col_80 INTEGER,
+ col_81 INTEGER, col_82 INTEGER, col_83 INTEGER, col_84 INTEGER, col_85 INTEGER,
+ col_86 INTEGER, col_87 INTEGER, col_88 INTEGER, col_89 INTEGER, col_90 INTEGER,
+ col_91 INTEGER, col_92 INTEGER, col_93 INTEGER, col_94 INTEGER, col_95 INTEGER,
+ col_96 INTEGER, col_97 INTEGER, col_98 INTEGER, col_99 INTEGER, col_100 INTEGER
+ );
+
+ DO \$\$
+ DECLARE
+ i INTEGER;
+ BEGIN
+ FOR i IN 1..$indexes_num LOOP
+ EXECUTE format('CREATE INDEX idx_col_\%s ON test_autovac (col_\%s);', i, i);
+ END LOOP;
+ END \$\$;
+});
+
+$node->psql('postgres',
+ "SELECT COUNT(*) FROM pg_index i
+ JOIN pg_class c ON c.oid = i.indrelid
+ WHERE c.relname = 'test_autovac';",
+ stdout => \$psql_out
+);
+is($psql_out, $indexes_num + 1, "All indexes created successfully");
+
+$node->safe_psql('postgres', qq{
+ DO \$\$
+ DECLARE
+ i INTEGER;
+ BEGIN
+ FOR i IN 1..$initial_rows_num LOOP
+ INSERT INTO test_autovac (
+ col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10,
+ col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20,
+ col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30,
+ col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40,
+ col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49, col_50,
+ col_51, col_52, col_53, col_54, col_55, col_56, col_57, col_58, col_59, col_60,
+ col_61, col_62, col_63, col_64, col_65, col_66, col_67, col_68, col_69, col_70,
+ col_71, col_72, col_73, col_74, col_75, col_76, col_77, col_78, col_79, col_80,
+ col_81, col_82, col_83, col_84, col_85, col_86, col_87, col_88, col_89, col_90,
+ col_91, col_92, col_93, col_94, col_95, col_96, col_97, col_98, col_99, col_100
+ ) VALUES (
+ i, i + 1, i + 2, i + 3, i + 4, i + 5, i + 6, i + 7, i + 8, i + 9,
+ i + 10, i + 11, i + 12, i + 13, i + 14, i + 15, i + 16, i + 17, i + 18, i + 19,
+ i + 20, i + 21, i + 22, i + 23, i + 24, i + 25, i + 26, i + 27, i + 28, i + 29,
+ i + 30, i + 31, i + 32, i + 33, i + 34, i + 35, i + 36, i + 37, i + 38, i + 39,
+ i + 40, i + 41, i + 42, i + 43, i + 44, i + 45, i + 46, i + 47, i + 48, i + 49,
+ i + 50, i + 51, i + 52, i + 53, i + 54, i + 55, i + 56, i + 57, i + 58, i + 59,
+ i + 60, i + 61, i + 62, i + 63, i + 64, i + 65, i + 66, i + 67, i + 68, i + 69,
+ i + 70, i + 71, i + 72, i + 73, i + 74, i + 75, i + 76, i + 77, i + 78, i + 79,
+ i + 80, i + 81, i + 82, i + 83, i + 84, i + 85, i + 86, i + 87, i + 88, i + 89,
+ i + 90, i + 91, i + 92, i + 93, i + 94, i + 95, i + 96, i + 97, i + 98, i + 99
+ );
+ END LOOP;
+ END \$\$;
+});
+
+$node->psql('postgres',
+ "SELECT COUNT(*) FROM test_autovac;",
+ stdout => \$psql_out
+);
+is($psql_out, $initial_rows_num, "All data inserted into table successfully");
+
+$node->safe_psql('postgres', qq{
+ UPDATE test_autovac SET col_1 = 0 WHERE (col_1 % 3) = 0;
+ ANALYZE test_autovac;
+});
+
+my $dead_tuples_thresh = $initial_rows_num / 4;
+my $indexes_num_thresh = $indexes_num / 2;
+my $num_workers = 1;
+
+# Reduce autovacuum_work_mem, so the leader process will perform the parallel
+# index vacuum phase several times
+$node->append_conf('postgresql.conf', qq{
+ autovacuum_naptime = '1s'
+ autovacuum_work_mem = 2048
+ autovacuum_vacuum_threshold = 1
+ autovacuum_analyze_threshold = 1
+ autovacuum_vacuum_scale_factor = 0.1
+ autovacuum_analyze_scale_factor = 0.1
+ autovacuum_max_workers = 10
+ autovacuum = on
+ autovac_idx_parallel_min_rows = $dead_tuples_thresh
+ autovac_idx_parallel_min_indexes = $indexes_num_thresh
+ max_parallel_index_autovac_workers = $num_workers
+});
+
+$node->restart;
+
+# wait for autovacuum to reset datfrozenxid age to 0
+$node->poll_query_until('postgres', q{
+ SELECT count(*) = 0 FROM pg_database WHERE mxid_age(datfrozenxid) > 0
+}) or die "Timed out while waiting for autovacuum";
+
+ok(1, "There are no segfaults");
+
+$node->stop;
+done_testing();
--
2.43.0
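
For readers following the synchronization logic, the launcher-side decision made in eliminate_lock_conflicts() can be modeled as a small pure function. This is only an illustrative sketch, not code from the patch: the WakeOwner enum, choose_wake_owner() and wake_owner_self_check() are hypothetical names, and the real function additionally handles AutovacuumLock, the condition variable broadcast and the nworkers_to_launch adjustment.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the patch's sync types: who wakes everyone up. */
typedef enum
{
	WAKE_BY_LAUNCHER,		/* launcher broadcasts immediately */
	WAKE_BY_LAST_WORKER,	/* last worker to finish initialization broadcasts */
	WAKE_BY_LEADER			/* leader broadcasts once it is ready */
} WakeOwner;

/*
 * Mirror the four branches of the launcher's decision, using only the three
 * counters the patch consults (assumed to be read under the lock).
 */
static WakeOwner
choose_wake_owner(bool leader_sleeping, int nworkers_sleeping,
				  int nworkers_launched)
{
	bool		all_workers_sleeping = (nworkers_sleeping == nworkers_launched);

	if (leader_sleeping && all_workers_sleeping)
		return WAKE_BY_LAUNCHER;	/* nobody else is awake to do it */
	if (!leader_sleeping && all_workers_sleeping)
		return WAKE_BY_LEADER;		/* only the leader is still running */

	/* Some worker is still initializing, whether or not the leader sleeps. */
	return WAKE_BY_LAST_WORKER;
}

/* Exercise each branch once. */
void
wake_owner_self_check(void)
{
	assert(choose_wake_owner(true, 2, 2) == WAKE_BY_LAUNCHER);
	assert(choose_wake_owner(true, 1, 2) == WAKE_BY_LAST_WORKER);
	assert(choose_wake_owner(false, 2, 2) == WAKE_BY_LEADER);
	assert(choose_wake_owner(false, 0, 2) == WAKE_BY_LAST_WORKER);
}
```

Keeping the decision separate from the locking makes the four cases easy to check in isolation. In the patch itself, the "nobody is sleeping" case may later be flipped to LEADER by the last worker if it finds the leader still awake.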