From: Daniil Davydov <[email protected]>
To: Sami Imseih <[email protected]>
Cc: Masahiko Sawada <[email protected]>
Cc: Maxim Orlov <[email protected]>
Cc: Postgres hackers <[email protected]>
Subject: Re: POC: Parallel processing of indexes in autovacuum
Date: Sat, 3 May 2025 01:49:58 +0700
Message-ID: <CAJDiXgigcF3CMY86oREdQvxUDaUDFihkK9f78rdEyLTLeB0hdA@mail.gmail.com>
In-Reply-To: <CAA5RZ0vN_RjrHR+HXTkfHydRDZ-yGrpapWQ3-oGj1W34AoftmQ@mail.gmail.com>
References: <CACG=ezZOrNsuLoETLD1gAswZMuH2nGGq7Ogcc0QOE5hhWaw=cw@mail.gmail.com>
<CAD21AoCdx5ZNS_cO7bYz1Zfb+Kw1kuJV2wtewrz7T1pPpjcWGw@mail.gmail.com>
<CAA5RZ0vN_RjrHR+HXTkfHydRDZ-yGrpapWQ3-oGj1W34AoftmQ@mail.gmail.com>
On Fri, May 2, 2025 at 11:58 PM Sami Imseih <[email protected]> wrote:
>
> I am generally -1 on the idea of autovacuum performing parallel
> index vacuum, because I always felt that the parallel option should
> be employed in a targeted manner for a specific table. If you have a bunch
> of large tables, some more important than others, a/v may end
> up using parallel resources on the least important tables and you
> will have to adjust a/v settings per table, etc to get the right table
> to be parallel index vacuumed by a/v.
Hm, this is a good point. Let me clarify one thing: in practice, it is
common for users to have one huge table across all their databases
(with 80+ indexes on it). But, of course, in general there may be a
few such tables.
But we can still tune the autovac_idx_parallel_min_rows parameter: if a
table has a lot of dead tuples, it is actively used, and so it is
probably important (?).
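The selection rule in the vacuum.c hunk of the patch can be read as a small predicate. The sketch below is a hypothetical standalone restatement (the function name is made up for illustration): the pgstat lookup and RelationGetIndexList() are replaced by plain integer arguments, and the patch's "no pgstat entry means stay serial" case is not modeled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical standalone restatement of the check added to vacuum_rel()
 * in the v2 patch. Returns the number of parallel workers to request for
 * index processing, or 0 to process the indexes serially.
 */
static int
parallel_idx_autovac_nworkers(bool index_cleanup_enabled,
							  int64_t dead_tuples,	/* pgstat dead_tuples */
							  int num_indexes,
							  int min_rows,		/* autovac_idx_parallel_min_rows */
							  int min_indexes,	/* autovac_idx_parallel_min_indexes */
							  int max_workers)	/* max_parallel_index_autovac_workers */
{
	assert(num_indexes >= 0);

	/* Index cleanup disabled (VACOPTVALUE_DISABLED): stay serial. */
	if (!index_cleanup_enabled)
		return 0;

	/* Too few dead tuples: the table is not considered worth it. */
	if (dead_tuples < min_rows)
		return 0;

	/* Too few indexes, or parallel index autovacuum is disabled. */
	if (num_indexes < min_indexes || max_workers <= 0)
		return 0;

	return max_workers;
}
```

The returned value is only a request: parallel_vacuum_compute_workers() in the patch still caps it by the number of indexes eligible for parallel processing and by max_parallel_index_autovac_workers.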
Also, if the user can really determine the "importance" of each table,
we can provide an appropriate table option. Tables with this option set
would be processed in parallel, in priority order. What do you think
about this idea?
>
> Also, with the TIDStore improvements for index cleanup, and the practical
> elimination of multi-pass index vacuums, I see this being even less
> convincing as something to add to a/v.
If I understand correctly, the point is that TIDStore can hold so many
tuples that a second index pass is practically never needed.
But the number of passes does not affect this optimization at all:
what matters is the large number of indexes that must be processed.
Even within a single pass we can get a 40% speedup.
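To see why the number of indexes matters even within a single pass: each parallel-safe index is processed by exactly one vacuum process, and the leader and its workers keep picking up the next unprocessed index until none remain. The hypothetical sketch below models that as greedy list scheduling with made-up per-index costs; it ignores the heap scan, I/O contention and memory limits, so it only gives an idealized bound, not a prediction of real timings.

```c
#include <assert.h>

#define MAX_PROCS 64			/* arbitrary limit for this sketch */

/*
 * Hypothetical model of one index-vacuum pass: nprocs processes (leader plus
 * workers) each take the next unprocessed index as soon as they become free.
 * index_cost[i] is an assumed cost of index i in arbitrary time units.
 * Returns the time at which the last process finishes (the makespan).
 */
static double
simulate_index_pass(const double *index_cost, int nindexes, int nprocs)
{
	double		busy_until[MAX_PROCS] = {0};
	double		makespan = 0.0;

	assert(nindexes >= 0 && nprocs >= 1 && nprocs <= MAX_PROCS);

	for (int i = 0; i < nindexes; i++)
	{
		/* The process that becomes free first claims index i. */
		int			next = 0;

		for (int p = 1; p < nprocs; p++)
			if (busy_until[p] < busy_until[next])
				next = p;

		busy_until[next] += index_cost[i];
		if (busy_until[next] > makespan)
			makespan = busy_until[next];
	}
	return makespan;
}
```

With 80 indexes of cost 1.0, simulate_index_pass(cost, 80, 1) returns 80.0 and simulate_index_pass(cost, 80, 4) returns 20.0. A single very large index still bounds the pass, since one index is never split across processes.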
>
> Now, If I am going to allocate extra workers to run vacuum in parallel, why
> not just provide more autovacuum workers instead so I can get more tables
> vacuumed within a span of time?
For now, only one process can clean up the indexes of a table, so I
don't see how increasing the number of a/v workers would help in the
situation I mentioned above.
Also, this patch does not consume additional resources during
autovacuum: the total number of a/v workers is always <=
autovacuum_max_workers.
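The resource claim can be stated as a simple budget. In the patch, supportive workers are launched from the same free worker-slot list as ordinary autovacuum workers, so the launcher can only grant as many as there are free slots, and the leader itself is already one of the running workers. The hypothetical function below models that, treating autovacuum_max_workers as the only limit on slots (a simplification).

```c
#include <assert.h>

/*
 * Hypothetical model: how many supportive workers the launcher can start for
 * one parallel index vacuum request. running_workers includes the leader;
 * requested is what is left after capping by the number of parallel-eligible
 * indexes and by max_parallel_index_autovac_workers.
 */
static int
supportive_workers_granted(int autovacuum_max_workers,
						   int running_workers,
						   int requested)
{
	int			free_slots;

	assert(running_workers >= 1 && running_workers <= autovacuum_max_workers);
	assert(requested >= 0);

	free_slots = autovacuum_max_workers - running_workers;
	return requested < free_slots ? requested : free_slots;
}
```

Because the result never exceeds free_slots, running_workers plus the granted workers stays within autovacuum_max_workers. When the grant is smaller than the request, the patch's launcher calls eliminate_lock_conflicts(item, false), which may lower nworkers_to_launch so that the last launched worker can still tell it is the last.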
BTW, see the v2 patch attached to this letter (bug fixes) :-)
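One part of the v2 patch that may be easier to review in isolation is the start-sync-point handshake. Once the launcher has started all the supportive workers it can, eliminate_lock_conflicts() decides who must wake everyone up: the launcher itself, the leader, or the last worker to finish initializing. The sketch below restates that decision as a pure function over a hypothetical reduced struct (SyncState is not in the patch; its fields mirror ParallelAutoVacuumWorkItem), with ConditionVariableBroadcast() replaced by setting a flag.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
	LAUNCHER = 0,				/* launcher wakes everyone up */
	LEADER,						/* leader wakes everyone up */
	LAST_WORKER					/* last initialized supportive worker does */
} SyncType;

/* Hypothetical reduced copy of the fields eliminate_lock_conflicts() uses. */
typedef struct
{
	bool		leader_sleeping_on_ssp;
	int			nworkers_sleeping;	/* leader not counted */
	int			nworkers_to_launch;
	bool		start_sync_point_passed;
	SyncType	sync_type;
} SyncState;

static void
decide_waker(SyncState *s, int nworkers_launched, bool all_launched)
{
	assert(nworkers_launched >= 0);

	if (s->leader_sleeping_on_ssp &&
		s->nworkers_sleeping == nworkers_launched)
	{
		/* Everyone is asleep: the launcher wakes them (a broadcast). */
		s->start_sync_point_passed = true;
	}
	else if (s->leader_sleeping_on_ssp &&
			 s->nworkers_sleeping < nworkers_launched)
		s->sync_type = LAST_WORKER; /* a worker is still initializing */
	else if (!s->leader_sleeping_on_ssp &&
			 s->nworkers_sleeping == nworkers_launched)
		s->sync_type = LEADER;	/* only the leader is still awake */
	else
		s->sync_type = LAST_WORKER; /* nobody asleep yet */

	/* Let the last launched worker recognize itself as the last one. */
	if (!all_launched && s->sync_type == LAST_WORKER)
		s->nworkers_to_launch = nworkers_launched;
}
```

As in the patch, the first branch leaves the sync type unchanged, because the launcher has already performed the wake-up itself.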
--
Best regards,
Daniil Davydov
Attachments:
[text/x-patch] v2-0001-WIP-Allow-autovacuum-to-process-indexes-of-single.patch (61.8K, 2-v2-0001-WIP-Allow-autovacuum-to-process-indexes-of-single.patch)
From 1c93a729b844a1dfe109e8d9e54d5cc0a941d061 Mon Sep 17 00:00:00 2001
From: Daniil Davidov <[email protected]>
Date: Sat, 3 May 2025 00:27:45 +0700
Subject: [PATCH v2] WIP Allow autovacuum to process indexes of single table in
parallel
---
src/backend/commands/vacuum.c | 27 +
src/backend/commands/vacuumparallel.c | 289 +++++-
src/backend/postmaster/autovacuum.c | 906 +++++++++++++++++-
src/backend/utils/misc/guc_tables.c | 30 +
src/backend/utils/misc/postgresql.conf.sample | 6 +
src/include/postmaster/autovacuum.h | 23 +
src/test/modules/autovacuum/.gitignore | 1 +
src/test/modules/autovacuum/Makefile | 14 +
.../autovacuum/t/001_autovac_parallel.pl | 137 +++
9 files changed, 1387 insertions(+), 46 deletions(-)
create mode 100644 src/test/modules/autovacuum/.gitignore
create mode 100644 src/test/modules/autovacuum/Makefile
create mode 100644 src/test/modules/autovacuum/t/001_autovac_parallel.pl
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 33a33bf6b1c..a5ef5319ccc 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -2234,6 +2234,33 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
else
toast_relid = InvalidOid;
+ /*
+ * Decide whether the indexes of the table with the given OID should be
+ * processed in parallel during autovacuum.
+ */
+ if (AmAutoVacuumWorkerProcess() &&
+ params->index_cleanup != VACOPTVALUE_DISABLED)
+ {
+ PgStat_StatTabEntry *tabentry;
+
+ /* fetch the pgstat table entry */
+ tabentry = pgstat_fetch_stat_tabentry_ext(rel->rd_rel->relisshared,
+ rel->rd_id);
+ if (tabentry && tabentry->dead_tuples >= autovac_idx_parallel_min_rows)
+ {
+ List *indexes = RelationGetIndexList(rel);
+ int num_indexes = list_length(indexes);
+
+ list_free(indexes);
+
+ if (num_indexes >= autovac_idx_parallel_min_indexes &&
+ max_parallel_index_autovac_workers > 0)
+ {
+ params->nworkers = max_parallel_index_autovac_workers;
+ }
+ }
+ }
+
/*
* Switch to the table owner's userid, so that any index functions are run
* as that user. Also lock down security-restricted operations and
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 2b9d548cdeb..cb4b7c23010 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -1,20 +1,23 @@
/*-------------------------------------------------------------------------
*
* vacuumparallel.c
- * Support routines for parallel vacuum execution.
+ * Support routines for parallel [auto]vacuum execution.
*
* This file contains routines that are intended to support setting up, using,
* and tearing down a ParallelVacuumState.
*
- * In a parallel vacuum, we perform both index bulk deletion and index cleanup
- * with parallel worker processes. Individual indexes are processed by one
- * vacuum process. ParallelVacuumState contains shared information as well as
- * the memory space for storing dead items allocated in the DSA area. We
+ * In a parallel [auto]vacuum, we perform both index bulk deletion and index
+ * cleanup with parallel worker processes. Individual indexes are processed by
+ * one vacuum process. ParallelVacuumState contains shared information as well
+ * as the memory space for storing dead items allocated in the DSA area. We
* launch parallel worker processes at the start of parallel index
* bulk-deletion and index cleanup and once all indexes are processed, the
* parallel worker processes exit. Each time we process indexes in parallel,
* the parallel context is re-initialized so that the same DSM can be used for
- * multiple passes of index bulk-deletion and index cleanup.
+ * multiple passes of index bulk-deletion and index cleanup. For maintenance
+ * vacuum, we launch workers manually (using the dynamic bgworker machinery),
+ * and for autovacuum we send signals to the autovacuum launcher (all logic
+ * for communication among parallel autovacuum processes is in autovacuum.c).
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -34,9 +37,11 @@
#include "executor/instrument.h"
#include "optimizer/paths.h"
#include "pgstat.h"
+#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
+#include "utils/memutils.h"
#include "utils/rel.h"
/*
@@ -157,11 +162,20 @@ typedef struct PVIndStats
} PVIndStats;
/*
- * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h.
+ * Struct for maintaining a parallel [auto]vacuum state. typedef appears in
+ * vacuum.h.
*/
struct ParallelVacuumState
{
- /* NULL for worker processes */
+ /* Is this structure used for maintenance vacuum or autovacuum */
+ bool is_autovacuum;
+
+ /*
+ * NULL for worker processes.
+ *
+ * NOTE: Parallel autovacuum only needs a subset of the maintenance vacuum
+ * functionality.
+ */
ParallelContext *pcxt;
/* Parent Heap Relation */
@@ -221,6 +235,10 @@ struct ParallelVacuumState
PVIndVacStatus status;
};
+static ParallelContext *CreateParallelAutoVacContext(int nworkers);
+static void InitializeParallelAutoVacDSM(ParallelContext *pcxt);
+static void DestroyParallelAutoVacContext(ParallelContext *pcxt);
+
static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
bool *will_parallel_vacuum);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
@@ -280,15 +298,21 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
}
pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
+ pvs->is_autovacuum = AmAutoVacuumWorkerProcess();
pvs->indrels = indrels;
pvs->nindexes = nindexes;
pvs->will_parallel_vacuum = will_parallel_vacuum;
pvs->bstrategy = bstrategy;
pvs->heaprel = rel;
- EnterParallelMode();
- pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
- parallel_workers);
+ if (pvs->is_autovacuum)
+ pcxt = CreateParallelAutoVacContext(parallel_workers);
+ else
+ {
+ EnterParallelMode();
+ pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
+ parallel_workers);
+ }
Assert(pcxt->nworkers > 0);
pvs->pcxt = pcxt;
@@ -327,7 +351,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
else
querylen = 0; /* keep compiler quiet */
- InitializeParallelDSM(pcxt);
+ if (pvs->is_autovacuum)
+ InitializeParallelAutoVacDSM(pvs->pcxt);
+ else
+ InitializeParallelDSM(pcxt);
/* Prepare index vacuum stats */
indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
@@ -371,11 +398,18 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
shared->relid = RelationGetRelid(rel);
shared->elevel = elevel;
shared->queryid = pgstat_get_my_query_id();
- shared->maintenance_work_mem_worker =
- (nindexes_mwm > 0) ?
- maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
- maintenance_work_mem;
- shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
+
+ if (pvs->is_autovacuum)
+ shared->maintenance_work_mem_worker =
+ (nindexes_mwm > 0) ?
+ autovacuum_work_mem / Min(parallel_workers, nindexes_mwm) :
+ autovacuum_work_mem;
+ else
+ shared->maintenance_work_mem_worker =
+ (nindexes_mwm > 0) ?
+ maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
+ maintenance_work_mem;
+ shared->dead_items_info.max_bytes = vac_work_mem * (size_t) 1024;
/* Prepare DSA space for dead items */
dead_items = TidStoreCreateShared(shared->dead_items_info.max_bytes,
@@ -453,8 +487,13 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
TidStoreDestroy(pvs->dead_items);
- DestroyParallelContext(pvs->pcxt);
- ExitParallelMode();
+ if (pvs->is_autovacuum)
+ DestroyParallelAutoVacContext(pvs->pcxt);
+ else
+ {
+ DestroyParallelContext((ParallelContext *) pvs->pcxt);
+ ExitParallelMode();
+ }
pfree(pvs->will_parallel_vacuum);
pfree(pvs);
@@ -532,6 +571,144 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup
parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
}
+/*
+ * Short version of CreateParallelContext (parallel.c). Here we init only those
+ * fields that are needed for parallel index processing during autovacuum.
+ */
+static ParallelContext *
+CreateParallelAutoVacContext(int nworkers)
+{
+ ParallelContext *pcxt;
+ MemoryContext oldcontext;
+
+ Assert(AmAutoVacuumWorkerProcess());
+
+ /* Number of workers should be non-negative. */
+ Assert(nworkers >= 0);
+
+ /* We might be running in a short-lived memory context. */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* Initialize a new ParallelContext. */
+ pcxt = palloc0(sizeof(ParallelContext));
+ pcxt->nworkers = nworkers;
+ pcxt->nworkers_to_launch = nworkers;
+ shm_toc_initialize_estimator(&pcxt->estimator);
+
+ /* Restore previous memory context. */
+ MemoryContextSwitchTo(oldcontext);
+
+ return pcxt;
+}
+
+/*
+ * Short version of InitializeParallelDSM (parallel.c). Here we put into the
+ * DSM only the data needed for parallel index processing during
+ * autovacuum.
+ */
+static void
+InitializeParallelAutoVacDSM(ParallelContext *pcxt)
+{
+ MemoryContext oldcontext;
+ Size tsnaplen = 0;
+ Size asnaplen = 0;
+ Size segsize = 0;
+ char *tsnapspace;
+ char *asnapspace;
+ Snapshot transaction_snapshot = GetTransactionSnapshot();
+ Snapshot active_snapshot = GetActiveSnapshot();
+
+ Assert(pcxt->nworkers >= 1);
+
+ /* We might be running in a very short-lived memory context. */
+ oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+ if (IsolationUsesXactSnapshot())
+ {
+ tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+ asnaplen = EstimateSnapshotSpace(active_snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+
+ /* Create DSM and initialize with new table of contents. */
+ segsize = shm_toc_estimate(&pcxt->estimator);
+ pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
+
+ if (pcxt->seg == NULL)
+ {
+ pcxt->nworkers = 0;
+ pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
+ }
+
+ pcxt->toc = shm_toc_create(AV_PARALLEL_MAGIC,
+ pcxt->seg == NULL ? pcxt->private_memory :
+ dsm_segment_address(pcxt->seg),
+ segsize);
+
+ /* We can skip the rest of this if we're not budgeting for any workers. */
+ if (pcxt->nworkers > 0)
+ {
+ /*
+ * Serialize the transaction snapshot if the transaction isolation
+ * level uses a transaction snapshot.
+ */
+ if (IsolationUsesXactSnapshot())
+ {
+ tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
+ SerializeSnapshot(transaction_snapshot, tsnapspace);
+ shm_toc_insert(pcxt->toc, AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT,
+ tsnapspace);
+ }
+
+ /* Serialize the active snapshot. */
+ asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
+ SerializeSnapshot(active_snapshot, asnapspace);
+ shm_toc_insert(pcxt->toc, AV_PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
+ }
+
+ /* Update nworkers_to_launch, in case we changed nworkers above. */
+ pcxt->nworkers_to_launch = pcxt->nworkers;
+
+ /* Restore previous memory context. */
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Short version of DestroyParallelContext (parallel.c). Here we clean up only
+ * the data that was used for parallel index processing during autovacuum.
+ */
+static void
+DestroyParallelAutoVacContext(ParallelContext *pcxt)
+{
+ /*
+ * If we have allocated a shared memory segment, detach it. This will
+ * implicitly detach the error queues, and any other shared memory queues,
+ * stored there.
+ */
+ if (pcxt->seg != NULL)
+ {
+ dsm_detach(pcxt->seg);
+ pcxt->seg = NULL;
+ }
+
+ /*
+ * If this parallel context is actually in backend-private memory rather
+ * than shared memory, free that memory instead.
+ */
+ if (pcxt->private_memory != NULL)
+ {
+ pfree(pcxt->private_memory);
+ pcxt->private_memory = NULL;
+ }
+
+ AutoVacuumReleaseParallelWork(false);
+ pfree(pcxt);
+}
+
/*
* Compute the number of parallel worker processes to request. Both index
* vacuum and index cleanup can be executed with parallel workers.
@@ -558,7 +735,9 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
* We don't allow performing parallel operation in standalone backend or
* when parallelism is disabled.
*/
- if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
+ if (!IsUnderPostmaster ||
+ (max_parallel_maintenance_workers == 0 && !AmAutoVacuumWorkerProcess()) ||
+ (max_parallel_index_autovac_workers == 0 && AmAutoVacuumWorkerProcess()))
return 0;
/*
@@ -597,15 +776,17 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
parallel_workers = (nrequested > 0) ?
Min(nrequested, nindexes_parallel) : nindexes_parallel;
- /* Cap by max_parallel_maintenance_workers */
- parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+ /* Cap by GUC variable */
+ parallel_workers = AmAutoVacuumWorkerProcess() ?
+ Min(parallel_workers, max_parallel_index_autovac_workers) :
+ Min(parallel_workers, max_parallel_maintenance_workers);
return parallel_workers;
}
/*
* Perform index vacuum or index cleanup with parallel workers. This function
- * must be used by the parallel vacuum leader process.
+ * must be used by the parallel [auto]vacuum leader process.
*/
static void
parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
@@ -670,7 +851,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
if (nworkers > 0)
{
/* Reinitialize parallel context to relaunch parallel workers */
- if (num_index_scans > 0)
+ if (num_index_scans > 0 && !pvs->is_autovacuum)
ReinitializeParallelDSM(pvs->pcxt);
/*
@@ -686,9 +867,22 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
* The number of workers can vary between bulkdelete and cleanup
* phase.
*/
- ReinitializeParallelWorkers(pvs->pcxt, nworkers);
-
- LaunchParallelWorkers(pvs->pcxt);
+ if (pvs->is_autovacuum)
+ {
+ pvs->pcxt->nworkers_to_launch = Min(pvs->pcxt->nworkers, nworkers);
+ if (pvs->pcxt->nworkers > 0 && pvs->pcxt->nworkers_to_launch > 0)
+ {
+ pvs->pcxt->nworkers_launched =
+ LaunchParallelAutovacuumWorkers(pvs->heaprel->rd_id,
+ pvs->pcxt->nworkers_to_launch,
+ dsm_segment_handle(pvs->pcxt->seg));
+ }
+ }
+ else
+ {
+ ReinitializeParallelWorkers(pvs->pcxt, nworkers);
+ LaunchParallelWorkers(pvs->pcxt);
+ }
if (pvs->pcxt->nworkers_launched > 0)
{
@@ -733,8 +927,14 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
*/
if (nworkers > 0)
{
- /* Wait for all vacuum workers to finish */
- WaitForParallelWorkersToFinish(pvs->pcxt);
+ /*
+ * Wait for all [auto]vacuum workers involved in parallel index
+ * processing (if any) to finish and advance state machine.
+ */
+ if (pvs->is_autovacuum && pvs->pcxt->nworkers_launched >= 0)
+ ParallelAutovacuumEndSyncPoint(false);
+ else if (!pvs->is_autovacuum)
+ WaitForParallelWorkersToFinish(pvs->pcxt);
for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
@@ -982,8 +1182,8 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
/*
* Perform work within a launched parallel process.
*
- * Since parallel vacuum workers perform only index vacuum or index cleanup,
- * we don't need to report progress information.
+ * Since parallel [auto]vacuum workers perform only index vacuum or index
+ * cleanup, we don't need to report progress information.
*/
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
@@ -997,23 +1197,22 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
BufferUsage *buffer_usage;
WalUsage *wal_usage;
int nindexes;
+ int worker_number;
char *sharedquery;
ErrorContextCallback errcallback;
- /*
- * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we
- * don't support parallel vacuum for autovacuum as of now.
- */
- Assert(MyProc->statusFlags == PROC_IN_VACUUM);
-
- elog(DEBUG1, "starting parallel vacuum worker");
+ Assert(MyProc->statusFlags == PROC_IN_VACUUM || AmAutoVacuumWorkerProcess());
+ elog(DEBUG1, "starting parallel [auto]vacuum worker");
shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
/* Set debug_query_string for individual workers */
- sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
- debug_query_string = sharedquery;
- pgstat_report_activity(STATE_RUNNING, debug_query_string);
+ if (!AmAutoVacuumWorkerProcess())
+ {
+ sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
+ debug_query_string = sharedquery;
+ pgstat_report_activity(STATE_RUNNING, debug_query_string);
+ }
/* Track query ID */
pgstat_report_query_id(shared->queryid, false);
@@ -1091,8 +1290,12 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
/* Report buffer/WAL usage during parallel execution */
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
- InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
- &wal_usage[ParallelWorkerNumber]);
+
+ worker_number = AmAutoVacuumWorkerProcess() ?
+ GetAutoVacuumParallelWorkerNumber() : ParallelWorkerNumber;
+
+ InstrEndParallelQuery(&buffer_usage[worker_number],
+ &wal_usage[worker_number]);
/* Report any remaining cost-based vacuum delay time */
if (track_cost_delay_timing)
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 16756152b71..cb9c9f374bb 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -90,6 +90,7 @@
#include "postmaster/postmaster.h"
#include "storage/aio_subsys.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lmgr.h"
@@ -102,6 +103,7 @@
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/injection_point.h"
+#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
@@ -129,6 +131,9 @@ int autovacuum_anl_thresh;
double autovacuum_anl_scale;
int autovacuum_freeze_max_age;
int autovacuum_multixact_freeze_max_age;
+int max_parallel_index_autovac_workers;
+int autovac_idx_parallel_min_rows;
+int autovac_idx_parallel_min_indexes;
double autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
@@ -164,6 +169,14 @@ static int default_freeze_table_age;
static int default_multixact_freeze_min_age;
static int default_multixact_freeze_table_age;
+/*
+ * Number of additional workers that were requested for parallel index
+ * processing during autovacuum.
+ */
+static int nworkers_for_idx_autovac = 0;
+
+static int nworkers_launched = 0;
+
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
@@ -222,6 +235,8 @@ typedef struct autovac_table
* wi_proc pointer to PGPROC of the running worker, NULL if not started
* wi_launchtime Time at which this worker was launched
* wi_dobalance Whether this worker should be included in balance calculations
+ * wi_pcleanup if > 0, this worker must participate in parallel index
+ * vacuuming as a supportive worker; must be 0 for the leader worker.
*
* All fields are protected by AutovacuumLock, except for wi_tableoid and
* wi_sharedrel which are protected by AutovacuumScheduleLock (note these
@@ -237,10 +252,17 @@ typedef struct WorkerInfoData
TimestampTz wi_launchtime;
pg_atomic_flag wi_dobalance;
bool wi_sharedrel;
+ int wi_pcleanup;
} WorkerInfoData;
typedef struct WorkerInfoData *WorkerInfo;
+#define AmParallelIdxAutoVacSupportive() \
+ (MyWorkerInfo != NULL && MyWorkerInfo->wi_pcleanup > 0)
+
+#define AmParallelIdxAutoVacLeader() \
+ (MyWorkerInfo != NULL && MyWorkerInfo->wi_pcleanup == 0)
+
/*
* Possible signals received by the launcher from remote processes. These are
* stored atomically in shared memory so that other processes can set them
@@ -250,9 +272,11 @@ typedef enum
{
AutoVacForkFailed, /* failed trying to start a worker */
AutoVacRebalance, /* rebalance the cost limits */
+ AutoVacParallelReq, /* request for parallel index vacuum */
+ AutoVacNumSignals, /* must be last */
} AutoVacuumSignal;
-#define AutoVacNumSignals (AutoVacRebalance + 1)
+#define AutoVacNumSignals (AutoVacParallelReq + 1)
/*
* Autovacuum workitem array, stored in AutoVacuumShmem->av_workItems. This
@@ -272,6 +296,50 @@ typedef struct AutoVacuumWorkItem
#define NUM_WORKITEMS 256
+typedef enum
+{
+ LAUNCHER = 0, /* autovacuum launcher must wake everyone up */
+ LEADER, /* leader must wake everyone up */
+ LAST_WORKER, /* the last supportive worker to finish initialization
+ must wake everyone up */
+} SyncType;
+
+typedef enum
+{
+ STARTUP = 0, /* initial value - no sync points were passed */
+ START_SYNC_POINT_PASSED, /* start_sync_point was passed */
+ END_SYNC_POINT_PASSED, /* end_sync_point was passed */
+ SHUTDOWN, /* leader wants to shut down parallel index
+ vacuum due to an error */
+} Status;
+
+/*
+ * Structure stored in AutoVacuumShmem->pav_workItem, used for managing
+ * parallel index processing (within a single table).
+ */
+typedef struct ParallelAutoVacuumWorkItem
+{
+ Oid avw_database;
+ Oid avw_relation;
+ int nworkers_participating;
+ int nworkers_to_launch;
+ int nworkers_sleeping; /* leader doesn't count */
+ int nfinished; /* # of workers that have already finished parallel
+ index processing (and may have already exited) */
+
+ dsm_handle handl;
+ int leader_proc_pid;
+
+ PGPROC *leader_proc;
+ ConditionVariable cv;
+
+ bool active; /* being processed */
+ bool leader_sleeping_on_ssp; /* sleeping on start sync point */
+ bool leader_sleeping_on_esp; /* sleeping on end sync point */
+ SyncType sync_type;
+ Status status;
+} ParallelAutoVacuumWorkItem;
+
/*-------------
* The main autovacuum shmem struct. On shared memory we store this main
* struct and the array of WorkerInfo structs. This struct keeps:
@@ -283,6 +351,8 @@ typedef struct AutoVacuumWorkItem
* av_startingWorker pointer to WorkerInfo currently being started (cleared by
* the worker itself as soon as it's up and running)
* av_workItems work item array
+ * pav_workItem information needed for parallel index processing within
+ * a single table
* av_nworkersForBalance the number of autovacuum workers to use when
* calculating the per worker cost limit
*
@@ -298,6 +368,7 @@ typedef struct
dlist_head av_runningWorkers;
WorkerInfo av_startingWorker;
AutoVacuumWorkItem av_workItems[NUM_WORKITEMS];
+ ParallelAutoVacuumWorkItem pav_workItem;
pg_atomic_uint32 av_nworkersForBalance;
} AutoVacuumShmemStruct;
@@ -322,11 +393,17 @@ pg_noreturn static void AutoVacLauncherShutdown(void);
static void launcher_determine_sleep(bool canlaunch, bool recursing,
struct timeval *nap);
static void launch_worker(TimestampTz now);
+static void launch_worker_for_pcleanup(TimestampTz now);
+static void eliminate_lock_conflicts(ParallelAutoVacuumWorkItem *item,
+ bool all_launched);
static List *get_database_list(void);
static void rebuild_database_list(Oid newdb);
static int db_comparator(const void *a, const void *b);
static void autovac_recalculate_workers_for_balance(void);
+static int parallel_autovacuum_start_sync_point(bool keep_lock);
+static void handle_parallel_idx_autovac_errors(void);
+
static void do_autovacuum(void);
static void FreeWorkerInfo(int code, Datum arg);
@@ -355,6 +432,10 @@ static void avl_sigusr2_handler(SIGNAL_ARGS);
static bool av_worker_available(void);
static void check_av_worker_gucs(void);
+typedef bool (*wakeup_condition) (ParallelAutoVacuumWorkItem *item);
+static bool start_sync_point_wakeup_cond(ParallelAutoVacuumWorkItem *item);
+static bool end_sync_point_wakeup_cond(ParallelAutoVacuumWorkItem *item);
+static void CVSleep(ParallelAutoVacuumWorkItem *item, wakeup_condition wakeup_cond);
/********************************************************************
@@ -583,7 +664,14 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
* wakening conditions.
*/
- launcher_determine_sleep(av_worker_available(), false, &nap);
+ if (nworkers_launched < nworkers_for_idx_autovac)
+ {
+ /* Take the smallest possible sleep interval. */
+ nap.tv_sec = 0;
+ nap.tv_usec = MIN_AUTOVAC_SLEEPTIME * 1000;
+ }
+ else
+ launcher_determine_sleep(av_worker_available(), false, &nap);
/*
* Wait until naptime expires or we get some type of signal (all the
@@ -614,6 +702,19 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
LWLockRelease(AutovacuumLock);
}
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ if (AutoVacuumShmem->av_signal[AutoVacParallelReq])
+ {
+ ParallelAutoVacuumWorkItem *item;
+
+ AutoVacuumShmem->av_signal[AutoVacParallelReq] = false;
+
+ item = &AutoVacuumShmem->pav_workItem;
+ nworkers_for_idx_autovac = item->nworkers_to_launch;
+ nworkers_launched = 0;
+ }
+ LWLockRelease(AutovacuumLock);
+
if (AutoVacuumShmem->av_signal[AutoVacForkFailed])
{
/*
@@ -686,6 +787,7 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
worker->wi_sharedrel = false;
worker->wi_proc = NULL;
worker->wi_launchtime = 0;
+ worker->wi_pcleanup = -1;
dclist_push_head(&AutoVacuumShmem->av_freeWorkers,
&worker->wi_links);
AutoVacuumShmem->av_startingWorker = NULL;
@@ -698,9 +800,29 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
}
LWLockRelease(AutovacuumLock); /* either shared or exclusive */
- /* if we can't do anything, just go back to sleep */
if (!can_launch)
+ {
+ /*
+ * If the launcher cannot launch all workers requested for parallel
+ * index vacuum, it must resolve all possible lock conflicts and
+ * tell everyone that there will be no new supportive workers.
+ */
+ if (nworkers_launched < nworkers_for_idx_autovac)
+ {
+ ParallelAutoVacuumWorkItem *item;
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ item = &AutoVacuumShmem->pav_workItem;
+ Assert(item->active);
+
+ eliminate_lock_conflicts(item, false);
+ nworkers_launched = nworkers_for_idx_autovac = 0;
+ LWLockRelease(AutovacuumLock);
+ }
+
+ /* if we can't do anything else, just go back to sleep */
continue;
+ }
/* We're OK to start a new worker */
@@ -716,6 +838,15 @@ AutoVacLauncherMain(const void *startup_data, size_t startup_data_len)
*/
launch_worker(current_time);
}
+ else if (nworkers_launched < nworkers_for_idx_autovac)
+ {
+ /*
+ * One of the active autovacuum workers sent us a request to launch
+ * participants for parallel index vacuum. We check this case first
+ * because we need to start participants as soon as possible.
+ */
+ launch_worker_for_pcleanup(current_time);
+ }
else
{
/*
@@ -1267,6 +1398,7 @@ do_start_worker(void)
worker->wi_dboid = avdb->adw_datid;
worker->wi_proc = NULL;
worker->wi_launchtime = GetCurrentTimestamp();
+ worker->wi_pcleanup = -1;
AutoVacuumShmem->av_startingWorker = worker;
@@ -1349,6 +1481,136 @@ launch_worker(TimestampTz now)
}
}
+/*
+ * launch_worker_for_pcleanup
+ *
+ * Wrapper for starting a worker (requested by leader of parallel index
+ * vacuuming) from the launcher.
+ */
+static void
+launch_worker_for_pcleanup(TimestampTz now)
+{
+ ParallelAutoVacuumWorkItem *item;
+ WorkerInfo worker;
+ dlist_node *wptr;
+
+ Assert(nworkers_launched < nworkers_for_idx_autovac);
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ /*
+ * Get a worker entry from the freelist. We checked above, so there
+ * really should be a free slot.
+ */
+ wptr = dclist_pop_head_node(&AutoVacuumShmem->av_freeWorkers);
+
+ worker = dlist_container(WorkerInfoData, wi_links, wptr);
+ worker->wi_dboid = InvalidOid;
+ worker->wi_proc = NULL;
+ worker->wi_launchtime = GetCurrentTimestamp();
+
+ /*
+ * Indicate that this worker must join the parallel index vacuum.
+ * This variable also serves as a unique id among parallel index
+ * vacuum workers. The first id is 1, because 0 is reserved for the leader.
+ */
+ worker->wi_pcleanup = (nworkers_launched + 1);
+
+ AutoVacuumShmem->av_startingWorker = worker;
+
+ SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
+
+ item = &AutoVacuumShmem->pav_workItem;
+ Assert(item->active);
+
+ nworkers_launched += 1;
+
+ if (nworkers_launched < nworkers_for_idx_autovac)
+ {
+ LWLockRelease(AutovacuumLock);
+ return;
+ }
+
+ Assert(item->sync_type == LAUNCHER &&
+ nworkers_launched == nworkers_for_idx_autovac);
+
+ /*
+ * If launcher managed to launch all requested for parallel index
+ * vacuum workers, it must handle all possible lock conflicts.
+ */
+ eliminate_lock_conflicts(item, true);
+ LWLockRelease(AutovacuumLock);
+}
+
+/*
+ * Must be called from the autovacuum launcher when it has launched all
+ * requested workers for parallel index vacuum, or when it realizes that no
+ * more processes can be launched.
+ *
+ * In this function the launcher assigns roles so as to avoid lock
+ * conflicts between the leader and supportive workers.
+ *
+ * AutovacuumLock must be held in exclusive mode before calling this function!
+ */
+static void
+eliminate_lock_conflicts(ParallelAutoVacuumWorkItem *item, bool all_launched)
+{
+ Assert(AmAutoVacuumLauncherProcess());
+ Assert(LWLockHeldByMe(AutovacuumLock));
+
+ /* So, let's start... */
+
+ if (item->leader_sleeping_on_ssp &&
+ item->nworkers_sleeping == nworkers_launched)
+ {
+ /*
+ * If both leader and all launched supportive workers are sleeping, then
+ * only we can wake everyone up.
+ */
+ ConditionVariableBroadcast(&item->cv);
+
+ /* Advance status. */
+ item->status = START_SYNC_POINT_PASSED;
+ }
+ else if (item->leader_sleeping_on_ssp &&
+ item->nworkers_sleeping < nworkers_launched)
+ {
+ /*
+ * If the leader is already sleeping but some supportive workers are still
+ * initializing, we shift the responsibility for waking everyone up to the
+ * worker that completes initialization last.
+ */
+ item->sync_type = LAST_WORKER;
+ }
+ else if (!item->leader_sleeping_on_ssp &&
+ item->nworkers_sleeping == nworkers_launched)
+ {
+ /*
+ * If only the leader is not sleeping, it must wake up all workers when it
+ * finishes all preparations.
+ */
+ item->sync_type = LEADER;
+ }
+ else
+ {
+ /*
+ * If nobody is sleeping, we assume that the leader is more likely to fall
+ * asleep first, so set sync type to LAST_WORKER. If the last worker sees
+ * that the leader is still not sleeping, it will change sync type to
+ * LEADER and fall asleep.
+ */
+ item->sync_type = LAST_WORKER;
+ }
+
+ /*
+ * If we cannot launch all requested workers, refresh the
+ * nworkers_to_launch value, so that the last worker can find out
+ * that it is really the last.
+ */
+ if (!all_launched && item->sync_type == LAST_WORKER)
+ item->nworkers_to_launch = nworkers_launched;
+}
+
/*
* Called from postmaster to signal a failure to fork a process to become
* worker. The postmaster should kill(SIGUSR2) the launcher shortly
@@ -1360,6 +1622,37 @@ AutoVacWorkerFailed(void)
AutoVacuumShmem->av_signal[AutoVacForkFailed] = true;
}
+/*
+ * Called from an autovacuum worker to signal that it needs participants in
+ * parallel index vacuum. The function sends SIGUSR2 to the launcher and
+ * returns 'true' iff the signal was sent successfully.
+ */
+bool
+AutoVacParallelWorkRequest(void)
+{
+ if (AutoVacuumShmem->av_launcherpid == 0)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("autovacuum launcher is dead")));
+
+ return false;
+ }
+
+ if (kill(AutoVacuumShmem->av_launcherpid, SIGUSR2) < 0)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_SYSTEM_ERROR),
+ errmsg("failed to send signal to autovac launcher (pid %d): %m",
+ AutoVacuumShmem->av_launcherpid)));
+
+ return false;
+ }
+
+ AutoVacuumShmem->av_signal[AutoVacParallelReq] = true;
+ return true;
+}
+
/* SIGUSR2: a worker is up and running, or just finished, or failed to fork */
static void
avl_sigusr2_handler(SIGNAL_ARGS)
@@ -1559,6 +1852,8 @@ AutoVacWorkerMain(const void *startup_data, size_t startup_data_len)
{
char dbname[NAMEDATALEN];
+ Assert(MyWorkerInfo->wi_pcleanup < 0);
+
/*
* Report autovac startup to the cumulative stats system. We
* deliberately do this before InitPostgres, so that the
@@ -1593,12 +1888,113 @@ AutoVacWorkerMain(const void *startup_data, size_t startup_data_len)
recentMulti = ReadNextMultiXactId();
do_autovacuum();
}
+ else if (AmParallelIdxAutoVacSupportive())
+ {
+ ParallelAutoVacuumWorkItem *item;
+ dsm_handle handle;
+ PGPROC *leader_proc;
+ int leader_proc_pid;
+ dsm_segment *seg;
+ shm_toc *toc;
+ char *asnapspace;
+ char *tsnapspace;
+ char dbname[NAMEDATALEN];
+ Snapshot tsnapshot;
+ Snapshot asnapshot;
+
+ /*
+ * We will abort parallel index vacuuming within the current process if
+ * something errors out.
+ */
+ PG_TRY();
+ {
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ item = &AutoVacuumShmem->pav_workItem;
+ dbid = item->avw_database;
+ handle = item->handl;
+ leader_proc = item->leader_proc;
+ leader_proc_pid = item->leader_proc_pid;
+ LWLockRelease(AutovacuumLock);
+
+ InitPostgres(NULL, dbid, NULL, InvalidOid,
+ INIT_PG_OVERRIDE_ALLOW_CONNS,
+ dbname);
+
+ set_ps_display(dbname);
+ if (PostAuthDelay)
+ pg_usleep(PostAuthDelay * 1000000L);
+
+ /* And do an appropriate amount of work */
+ recentXid = ReadNextTransactionId();
+ recentMulti = ReadNextMultiXactId();
+
+ if (parallel_autovacuum_start_sync_point(false) == -1)
+ {
+ /* We are not participating anymore */
+ MyWorkerInfo->wi_pcleanup = -1;
+ goto exit;
+ }
+
+ seg = dsm_attach(handle);
+ if (seg == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not map dynamic shared memory segment")));
+
+ toc = shm_toc_attach(AV_PARALLEL_MAGIC, dsm_segment_address(seg));
+ if (toc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("invalid magic number in dynamic shared memory segment")));
+
+ if (!BecomeLockGroupMember(leader_proc, leader_proc_pid))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not become lock group member")));
+ }
+
+ StartTransactionCommand();
+
+ asnapspace =
+ shm_toc_lookup(toc, AV_PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
+ tsnapspace =
+ shm_toc_lookup(toc, AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
+ asnapshot = RestoreSnapshot(asnapspace);
+ tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
+ RestoreTransactionSnapshot(tsnapshot, leader_proc);
+ PushActiveSnapshot(asnapshot);
+
+ /*
+ * We've changed which tuples we can see, and must therefore
+ * invalidate system caches.
+ */
+ InvalidateSystemCaches();
+
+ parallel_vacuum_main(seg, toc);
+
+ /* Must pop active snapshot so snapmgr.c doesn't complain. */
+ PopActiveSnapshot();
+
+ dsm_detach(seg);
+ CommitTransactionCommand();
+ ParallelAutovacuumEndSyncPoint(false);
+ }
+ PG_CATCH();
+ {
+ EmitErrorReport();
+ if (AmParallelIdxAutoVacSupportive())
+ handle_parallel_idx_autovac_errors();
+ }
+ PG_END_TRY();
+ }
/*
* The launcher will be notified of my death in ProcKill, *if* we managed
* to get a worker slot at all
*/
+exit:
/* All done, go away */
proc_exit(0);
}
@@ -2461,6 +2857,10 @@ do_autovacuum(void)
tab->at_datname, tab->at_nspname, tab->at_relname);
EmitErrorReport();
+ /* If we are the parallel index vacuum leader, we must shut it down. */
+ if (AmParallelIdxAutoVacLeader())
+ handle_parallel_idx_autovac_errors();
+
/* this resets ProcGlobal->statusFlags[i] too */
AbortOutOfAnyTransaction();
FlushErrorState();
@@ -3296,6 +3696,503 @@ AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId,
return result;
}
+/*
+ * Release the work item used for managing parallel index vacuum. Must be
+ * called only once, and only from the leader worker.
+ *
+ * If 'keep_lock' is true, then AutovacuumLock will not be released at the end
+ * of function execution.
+ */
+void
+AutoVacuumReleaseParallelWork(bool keep_lock)
+{
+ ParallelAutoVacuumWorkItem *workitem;
+
+ /*
+ * We might not have gotten the workitem from the launcher (in which case we
+ * must not be considered the leader), so just leave.
+ */
+ if (!AmParallelIdxAutoVacLeader())
+ return;
+
+ if (!LWLockHeldByMe(AutovacuumLock))
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ workitem = &AutoVacuumShmem->pav_workItem;
+
+ Assert(AmParallelIdxAutoVacLeader() &&
+ workitem->leader_proc_pid == MyProcPid);
+
+ workitem->leader_proc = NULL;
+ workitem->leader_proc_pid = 0;
+ workitem->active = false;
+
+ /* We are not leader anymore. */
+ MyWorkerInfo->wi_pcleanup = -1;
+
+ if (!keep_lock)
+ LWLockRelease(AutovacuumLock);
+}
+
+static bool
+start_sync_point_wakeup_cond(ParallelAutoVacuumWorkItem *item)
+{
+ bool need_wakeup = false;
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ if (AmParallelIdxAutoVacLeader())
+ {
+ /*
+ * In the normal case we should exit the sleep loop after the last launched
+ * supportive worker has passed the sync point (status == START_SYNC_POINT_PASSED).
+ * But if we are in SHUTDOWN mode, all launched workers will just exit the
+ * sync point without advancing the status. We can handle such a case if we
+ * check that n_participating == n_to_launch.
+ */
+ if (item->status == SHUTDOWN)
+ need_wakeup = (item->nworkers_participating == item->nworkers_to_launch);
+ else
+ need_wakeup = item->status == START_SYNC_POINT_PASSED;
+ }
+ else
+ need_wakeup = (item->status == START_SYNC_POINT_PASSED ||
+ item->status == SHUTDOWN);
+
+ LWLockRelease(AutovacuumLock);
+ return need_wakeup;
+}
+
+static bool
+end_sync_point_wakeup_cond(ParallelAutoVacuumWorkItem *item)
+{
+ bool need_wakeup = false;
+
+ Assert(AmParallelIdxAutoVacLeader());
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ need_wakeup = item->status == END_SYNC_POINT_PASSED;
+ LWLockRelease(AutovacuumLock);
+ return need_wakeup;
+}
+
+/*
+ * Waiting on the condition variable is a frequent operation, so it has been
+ * moved into a separate function. The caller must hold AutovacuumLock before
+ * calling it.
+ */
+static void
+CVSleep(ParallelAutoVacuumWorkItem *item, wakeup_condition wakeup_cond)
+{
+ ConditionVariablePrepareToSleep(&item->cv);
+
+ LWLockRelease(AutovacuumLock);
+ PG_TRY();
+ {
+ do
+ {
+ ConditionVariableSleep(&item->cv, PG_WAIT_IPC);
+ } while (!wakeup_cond(item));
+ }
+ PG_CATCH();
+ {
+ ConditionVariableCancelSleep();
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ ConditionVariableCancelSleep();
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+}
+
+/*
+ * This function is used to synchronize the leader with supportive workers
+ * during parallel index vacuuming. Each process will exit iff:
+ * Leader worker is ready to perform parallel vacuum &&
+ * All launched supportive workers are ready to perform parallel vacuum &&
+ * (Autovacuum launcher already launched all requested workers ||
+ * Autovacuum launcher cannot launch more workers)
+ *
+ * If 'keep_lock' is true, then AutovacuumLock will not be released at the end
+ * of function execution.
+ *
+ * NOTE: Some workers may call this function after the leader worker has
+ * decided to shut down parallel vacuuming. In this case '-1' is returned.
+ */
+static int
+parallel_autovacuum_start_sync_point(bool keep_lock)
+{
+ ParallelAutoVacuumWorkItem *workitem;
+ SyncType sync_type;
+ int num_participants;
+
+ if (!LWLockHeldByMe(AutovacuumLock))
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ workitem = &AutoVacuumShmem->pav_workItem;
+ Assert(workitem->active);
+ sync_type = workitem->sync_type;
+
+ if (AmParallelIdxAutoVacLeader())
+ {
+ Assert(workitem->leader_proc_pid == MyProcPid);
+
+ /* Wake up all sleeping supportive workers, if required ... */
+ if (sync_type == LEADER)
+ {
+ ConditionVariableBroadcast(&workitem->cv);
+
+ /*
+ * Advance status, because we are guaranteed to pass this
+ * sync point.
+ * Don't advance if we call this function from the error handling
+ * function (status == SHUTDOWN).
+ */
+ if (workitem->status != SHUTDOWN)
+ workitem->status = START_SYNC_POINT_PASSED;
+ }
+ /* ... otherwise, wait for somebody to wake us up */
+ else
+ {
+ workitem->leader_sleeping_on_ssp = true;
+ CVSleep(workitem, start_sync_point_wakeup_cond);
+ workitem->leader_sleeping_on_ssp = false;
+
+ /*
+ * A priori, we believe that in the end everyone should be awakened
+ * by the leader.
+ */
+ workitem->sync_type = LEADER;
+ }
+ }
+ else
+ {
+ workitem->nworkers_participating += 1;
+
+ /*
+ * If we know that the launcher will no longer attempt to launch more
+ * supportive workers for this item => we are LAST_WORKER for sure.
+ *
+ * Note that the launcher sets LAST_WORKER sync type without knowing the
+ * current status of the leader. So we also check that the leader is
+ * sleeping before waking everyone up. Otherwise, we must wait for the
+ * leader (and ask it to wake everyone up).
+ */
+ if (workitem->nworkers_participating == workitem->nworkers_to_launch &&
+ sync_type == LAST_WORKER && workitem->leader_sleeping_on_ssp)
+ {
+ ConditionVariableBroadcast(&workitem->cv);
+
+ /*
+ * We must not advance status if leader wants to shut down parallel
+ * execution (see checks below).
+ */
+ if (workitem->status != SHUTDOWN)
+ workitem->status = START_SYNC_POINT_PASSED;
+ }
+ else
+ {
+ if (workitem->nworkers_participating == workitem->nworkers_to_launch &&
+ sync_type == LAST_WORKER)
+ {
+ workitem->sync_type = LEADER;
+ }
+
+ workitem->nworkers_sleeping += 1;
+ CVSleep(workitem, start_sync_point_wakeup_cond);
+ workitem->nworkers_sleeping -= 1;
+ }
+ }
+
+ /* Tell caller that it must not participate in parallel index cleanup. */
+ if (workitem->status == SHUTDOWN)
+ num_participants = -1;
+ else
+ num_participants = workitem->nworkers_participating;
+
+ if (!keep_lock)
+ LWLockRelease(AutovacuumLock);
+
+ return num_participants;
+}
+
+/*
+ * Like the function above, but must be called by the leader and supportive
+ * workers when they have finished parallel index vacuuming.
+ *
+ * If 'keep_lock' is true, then AutovacuumLock will not be released at the end
+ * of function execution.
+ */
+void
+ParallelAutovacuumEndSyncPoint(bool keep_lock)
+{
+ ParallelAutoVacuumWorkItem *workitem;
+
+ if (!LWLockHeldByMe(AutovacuumLock))
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ workitem = &AutoVacuumShmem->pav_workItem;
+ Assert(workitem->active);
+
+ if (workitem->nworkers_participating == 0)
+ {
+ Assert(!AmParallelIdxAutoVacSupportive());
+
+ /*
+ * We have two cases when no supportive workers were launched:
+ * 1) Leader got workitem, but launcher didn't launch any
+ * workers => just advance status, because we don't need to wait
+ * for anybody.
+ * 2) Leader didn't get workitem, because it was already in use =>
+ * we must not touch it. Just leave.
+ */
+ if (AmParallelIdxAutoVacLeader())
+ {
+ Assert(workitem->leader_proc_pid == MyProcPid);
+ workitem->status = END_SYNC_POINT_PASSED;
+ }
+ else
+ Assert(workitem->leader_proc_pid != MyProcPid);
+
+ if (!keep_lock)
+ LWLockRelease(AutovacuumLock);
+
+ return;
+ }
+
+ if (AmParallelIdxAutoVacLeader())
+ {
+ Assert(workitem->leader_proc_pid == MyProcPid);
+ Assert(workitem->sync_type == LEADER);
+
+ /* Wait for all workers to finish (only last worker will wake us up) */
+ if (workitem->nfinished != workitem->nworkers_participating)
+ {
+ workitem->sync_type = LAST_WORKER;
+ workitem->leader_sleeping_on_esp = true;
+ CVSleep(workitem, end_sync_point_wakeup_cond);
+ workitem->leader_sleeping_on_esp = false;
+
+ Assert(workitem->nfinished == workitem->nworkers_participating);
+
+ /*
+ * Advance status, because we are guaranteed to pass this
+ * sync point.
+ */
+ workitem->status = END_SYNC_POINT_PASSED;
+ }
+ }
+ else
+ {
+ workitem->nfinished += 1;
+
+ /* If we are the last worker to finish, wake up the leader.
+ *
+ * If not, just leave, because this supportive worker has already
+ * finished all its work and must exit.
+ */
+ if (workitem->sync_type == LAST_WORKER &&
+ workitem->nfinished == workitem->nworkers_participating &&
+ workitem->leader_sleeping_on_esp)
+ {
+ ConditionVariableBroadcast(&workitem->cv);
+
+ /*
+ * Don't need to check SHUTDOWN status here - all supportive workers
+ * are about to finish anyway.
+ */
+ workitem->status = END_SYNC_POINT_PASSED;
+ }
+
+ /* We are not participating anymore. */
+ MyWorkerInfo->wi_pcleanup = -1;
+ }
+
+ if (!keep_lock)
+ LWLockRelease(AutovacuumLock);
+
+ return;
+}
+
+/*
+ * Get id of parallel index vacuum worker (counting from 0).
+ */
+int
+GetAutoVacuumParallelWorkerNumber(void)
+{
+ Assert(AmAutoVacuumWorkerProcess() && MyWorkerInfo->wi_pcleanup > 0);
+ return (MyWorkerInfo->wi_pcleanup - 1);
+}
+
+/*
+ * The leader autovacuum process can decide that it needs several helper
+ * workers to process a table in parallel mode. It must set up a parallel
+ * context and call LaunchParallelAutovacuumWorkers.
+ *
+ * In this function we do the following:
+ * 1) Signal the launcher, which starts 'supportive workers' in its work loop.
+ * 2) Wait for supportive workers to start.
+ *
+ * Returns the number of workers the launcher was able to launch (may be less
+ * than 'nworkers_to_launch'), 0 if the launcher could not be signaled, or -1
+ * if another leader is already using the work item.
+ */
+int
+LaunchParallelAutovacuumWorkers(Oid rel_id, int nworkers_to_launch,
+ dsm_handle handle)
+{
+ int nworkers_launched = 0;
+ ParallelAutoVacuumWorkItem *workitem;
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+
+ workitem = &AutoVacuumShmem->pav_workItem;
+
+ /*
+ * For now, there can be only one leader across the whole cluster.
+ * TODO: fix it in next versions
+ */
+ if (workitem->active && workitem->leader_proc_pid != MyProcPid)
+ {
+ LWLockRelease(AutovacuumLock);
+ return -1;
+ }
+
+ /* Notify autovacuum launcher that we need supportive workers */
+ if (AutoVacParallelWorkRequest())
+ {
+ /* OK, we can use this workitem entry. Init it. */
+ workitem->avw_database = MyDatabaseId;
+ workitem->avw_relation = rel_id;
+ workitem->handl = handle;
+ workitem->leader_proc = MyProc;
+ workitem->leader_proc_pid = MyProcPid;
+ workitem->nworkers_participating = 0;
+ workitem->nworkers_to_launch = nworkers_to_launch;
+ workitem->leader_sleeping_on_ssp = false;
+ workitem->leader_sleeping_on_esp = false;
+ workitem->nworkers_sleeping = 0;
+ workitem->nfinished = 0;
+ workitem->sync_type = LAUNCHER;
+ workitem->status = STARTUP;
+
+ workitem->active = true;
+ LWLockRelease(AutovacuumLock);
+
+ /* Become the leader */
+ MyWorkerInfo->wi_pcleanup = 0;
+
+ /* All created workers must get the same locks as the leader process */
+ BecomeLockGroupLeader();
+
+ /*
+ * Wait until all supportive workers are launched. Also retrieve the actual
+ * number of participants.
+ */
+
+ nworkers_launched = parallel_autovacuum_start_sync_point(false);
+ Assert(nworkers_launched >= 0);
+ }
+ else
+ {
+ /*
+ * If we (for any reason) cannot send a signal to the launcher, don't try
+ * to do index vacuuming in parallel.
+ */
+ LWLockRelease(AutovacuumLock);
+ return 0;
+ }
+
+ return nworkers_launched;
+}
+
+/*
+ * During parallel index vacuuming any worker (supportive or leader) can
+ * catch an error.
+ * In order to handle it in the right way, it must call this function.
+ */
+static void
+handle_parallel_idx_autovac_errors(void)
+{
+ ParallelAutoVacuumWorkItem *item;
+
+ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
+ item = &AutoVacuumShmem->pav_workItem;
+
+ if (AmParallelIdxAutoVacLeader())
+ {
+ if (item->status == START_SYNC_POINT_PASSED)
+ {
+ /*
+ * If the start sync point has already been passed, just wait for all
+ * supportive workers to finish and exit.
+ */
+ ParallelAutovacuumEndSyncPoint(true);
+ }
+ else if (item->status == STARTUP)
+ {
+ /*
+ * If no sync point has been passed, we can prevent supportive workers
+ * from performing their work: set SHUTDOWN status and wait until
+ * all workers see it.
+ */
+ item->status = SHUTDOWN;
+ parallel_autovacuum_start_sync_point(true);
+ }
+
+ AutoVacuumReleaseParallelWork(true);
+ }
+ else
+ {
+ Assert(AmParallelIdxAutoVacSupportive());
+
+ if (item->status == STARTUP || item->status == SHUTDOWN)
+ {
+ /*
+ * If no sync point has been passed, just exclude ourselves from the
+ * participants. Further parallel index vacuuming will take place
+ * as usual.
+ */
+ item->nworkers_to_launch -= 1;
+
+ if (item->nworkers_participating == item->nworkers_to_launch &&
+ item->sync_type == LAST_WORKER && item->leader_sleeping_on_ssp)
+ {
+ ConditionVariableBroadcast(&item->cv);
+
+ if (item->status != SHUTDOWN)
+ item->status = START_SYNC_POINT_PASSED;
+ }
+ }
+ else if (item->status == START_SYNC_POINT_PASSED)
+ {
+ /*
+ * If the start sync point has already been passed, we simulate the
+ * usual end of work (see ParallelAutovacuumEndSyncPoint).
+ */
+ item->nfinished += 1;
+
+ /*
+ * We check "!item->leader_sleeping_on_ssp" in order to handle an
+ * almost impossible situation, when the leader didn't have time to wake
+ * up after the start sync point (but the last worker already advanced
+ * status to START_SYNC_POINT_PASSED). In this case we should not
+ * advance status to END_SYNC_POINT_PASSED, so the leader can continue
+ * processing.
+ */
+ if (item->sync_type == LAST_WORKER &&
+ item->nfinished == item->nworkers_participating &&
+ !item->leader_sleeping_on_ssp)
+ {
+ ConditionVariableBroadcast(&item->cv);
+ item->status = END_SYNC_POINT_PASSED;
+ }
+ }
+ }
+
+ LWLockRelease(AutovacuumLock);
+}
+
/*
* autovac_init
* This is called at postmaster initialization.
@@ -3361,6 +4258,9 @@ AutoVacuumShmemInit(void)
AutoVacuumShmem->av_startingWorker = NULL;
memset(AutoVacuumShmem->av_workItems, 0,
sizeof(AutoVacuumWorkItem) * NUM_WORKITEMS);
+ memset(&AutoVacuumShmem->pav_workItem, 0,
+ sizeof(ParallelAutoVacuumWorkItem));
+ ConditionVariableInit(&AutoVacuumShmem->pav_workItem.cv);
worker = (WorkerInfo) ((char *) AutoVacuumShmem +
MAXALIGN(sizeof(AutoVacuumShmemStruct)));
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 2f8cbd86759..2e36921097a 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3647,6 +3647,36 @@ struct config_int ConfigureNamesInt[] =
check_autovacuum_work_mem, NULL, NULL
},
+ {
+ {"max_parallel_index_autovac_workers", PGC_POSTMASTER, VACUUM_AUTOVACUUM,
+ gettext_noop("Sets the maximum number of parallel autovacuum worker processes during parallel index vacuuming of a single table."),
+ NULL
+ },
+ &max_parallel_index_autovac_workers,
+ 0, 0, MAX_PARALLEL_WORKER_LIMIT,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"autovac_idx_parallel_min_rows", PGC_POSTMASTER, VACUUM_AUTOVACUUM,
+ gettext_noop("Sets the minimum number of dead tuples a table must have for its indexes to be processed in parallel during autovacuum."),
+ NULL
+ },
+ &autovac_idx_parallel_min_rows,
+ 0, 0, INT32_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"autovac_idx_parallel_min_indexes", PGC_POSTMASTER, VACUUM_AUTOVACUUM,
+ gettext_noop("Sets the minimum number of indexes a table must have for them to be processed in parallel during autovacuum."),
+ NULL
+ },
+ &autovac_idx_parallel_min_indexes,
+ 2, 2, INT32_MAX,
+ NULL, NULL, NULL
+ },
+
{
{"tcp_keepalives_idle", PGC_USERSET, CONN_AUTH_TCP,
gettext_noop("Time between issuing TCP keepalives."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 34826d01380..08869398039 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -146,6 +146,12 @@
#hash_mem_multiplier = 2.0 # 1-1000.0 multiplier on hash table work_mem
#maintenance_work_mem = 64MB # min 64kB
#autovacuum_work_mem = -1 # min 64kB, or -1 to use maintenance_work_mem
+#max_parallel_index_autovac_workers = 0 # 0 disables this feature (default)
+ # (change requires restart)
+#autovac_idx_parallel_min_rows = 0
+ # (change requires restart)
+#autovac_idx_parallel_min_indexes = 2
+ # (change requires restart)
#logical_decoding_work_mem = 64MB # min 64kB
#max_stack_depth = 2MB # min 100kB
#shared_memory_type = mmap # the default is the first option
diff --git a/src/include/postmaster/autovacuum.h b/src/include/postmaster/autovacuum.h
index e8135f41a1c..8647154437b 100644
--- a/src/include/postmaster/autovacuum.h
+++ b/src/include/postmaster/autovacuum.h
@@ -15,6 +15,8 @@
#define AUTOVACUUM_H
#include "storage/block.h"
+#include "storage/dsm_impl.h"
+#include "storage/lock.h"
/*
* Other processes can request specific work from autovacuum, identified by
@@ -25,12 +27,25 @@ typedef enum
AVW_BRINSummarizeRange,
} AutoVacuumWorkItemType;
+/*
+ * Magic number for parallel context TOC. Used for parallel index processing
+ * during autovacuum.
+ */
+#define AV_PARALLEL_MAGIC 0xaaaaaaaa
+
+/* Magic numbers for per-context parallel index processing state sharing. */
+#define AV_PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFF0000000000001)
+#define AV_PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFF0000000000002)
+
/* GUC variables */
extern PGDLLIMPORT bool autovacuum_start_daemon;
extern PGDLLIMPORT int autovacuum_worker_slots;
extern PGDLLIMPORT int autovacuum_max_workers;
extern PGDLLIMPORT int autovacuum_work_mem;
+extern PGDLLIMPORT int max_parallel_index_autovac_workers;
+extern PGDLLIMPORT int autovac_idx_parallel_min_rows;
+extern PGDLLIMPORT int autovac_idx_parallel_min_indexes;
extern PGDLLIMPORT int autovacuum_naptime;
extern PGDLLIMPORT int autovacuum_vac_thresh;
extern PGDLLIMPORT int autovacuum_vac_max_thresh;
@@ -60,10 +75,18 @@ extern void AutoVacWorkerFailed(void);
pg_noreturn extern void AutoVacLauncherMain(const void *startup_data, size_t startup_data_len);
pg_noreturn extern void AutoVacWorkerMain(const void *startup_data, size_t startup_data_len);
+/* called from autovac worker when it needs participants in parallel index cleanup */
+extern bool AutoVacParallelWorkRequest(void);
extern bool AutoVacuumRequestWork(AutoVacuumWorkItemType type,
Oid relationId, BlockNumber blkno);
+extern void AutoVacuumReleaseParallelWork(bool keep_lock);
+extern int AutoVacuumParallelWorkWaitForStart(void);
+extern void ParallelAutovacuumEndSyncPoint(bool keep_lock);
+extern int GetAutoVacuumParallelWorkerNumber(void);
+extern int LaunchParallelAutovacuumWorkers(Oid rel_id, int nworkers_to_launch,
+ dsm_handle handle);
/* shared memory stuff */
extern Size AutoVacuumShmemSize(void);
extern void AutoVacuumShmemInit(void);
diff --git a/src/test/modules/autovacuum/.gitignore b/src/test/modules/autovacuum/.gitignore
new file mode 100644
index 00000000000..0b54641bceb
--- /dev/null
+++ b/src/test/modules/autovacuum/.gitignore
@@ -0,0 +1 @@
+/tmp_check/
\ No newline at end of file
diff --git a/src/test/modules/autovacuum/Makefile b/src/test/modules/autovacuum/Makefile
new file mode 100644
index 00000000000..90c00ff350b
--- /dev/null
+++ b/src/test/modules/autovacuum/Makefile
@@ -0,0 +1,14 @@
+# src/test/modules/autovacuum/Makefile
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/autovacuum
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
\ No newline at end of file
diff --git a/src/test/modules/autovacuum/t/001_autovac_parallel.pl b/src/test/modules/autovacuum/t/001_autovac_parallel.pl
new file mode 100644
index 00000000000..ff07c33d867
--- /dev/null
+++ b/src/test/modules/autovacuum/t/001_autovac_parallel.pl
@@ -0,0 +1,137 @@
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $psql_out;
+
+my $node = PostgreSQL::Test::Cluster->new('node1');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+ autovacuum = off
+ max_wal_size = 4096
+});
+$node->start;
+
+my $indexes_num = 80;
+my $initial_rows_num = 1_000_000;
+
+# Create a big table and the specified number of b-tree indexes on it
+$node->safe_psql('postgres', qq{
+ CREATE TABLE test_autovac (
+ id SERIAL PRIMARY KEY,
+ col_1 INTEGER, col_2 INTEGER, col_3 INTEGER, col_4 INTEGER, col_5 INTEGER,
+ col_6 INTEGER, col_7 INTEGER, col_8 INTEGER, col_9 INTEGER, col_10 INTEGER,
+ col_11 INTEGER, col_12 INTEGER, col_13 INTEGER, col_14 INTEGER, col_15 INTEGER,
+ col_16 INTEGER, col_17 INTEGER, col_18 INTEGER, col_19 INTEGER, col_20 INTEGER,
+ col_21 INTEGER, col_22 INTEGER, col_23 INTEGER, col_24 INTEGER, col_25 INTEGER,
+ col_26 INTEGER, col_27 INTEGER, col_28 INTEGER, col_29 INTEGER, col_30 INTEGER,
+ col_31 INTEGER, col_32 INTEGER, col_33 INTEGER, col_34 INTEGER, col_35 INTEGER,
+ col_36 INTEGER, col_37 INTEGER, col_38 INTEGER, col_39 INTEGER, col_40 INTEGER,
+ col_41 INTEGER, col_42 INTEGER, col_43 INTEGER, col_44 INTEGER, col_45 INTEGER,
+ col_46 INTEGER, col_47 INTEGER, col_48 INTEGER, col_49 INTEGER, col_50 INTEGER,
+ col_51 INTEGER, col_52 INTEGER, col_53 INTEGER, col_54 INTEGER, col_55 INTEGER,
+ col_56 INTEGER, col_57 INTEGER, col_58 INTEGER, col_59 INTEGER, col_60 INTEGER,
+ col_61 INTEGER, col_62 INTEGER, col_63 INTEGER, col_64 INTEGER, col_65 INTEGER,
+ col_66 INTEGER, col_67 INTEGER, col_68 INTEGER, col_69 INTEGER, col_70 INTEGER,
+ col_71 INTEGER, col_72 INTEGER, col_73 INTEGER, col_74 INTEGER, col_75 INTEGER,
+ col_76 INTEGER, col_77 INTEGER, col_78 INTEGER, col_79 INTEGER, col_80 INTEGER,
+ col_81 INTEGER, col_82 INTEGER, col_83 INTEGER, col_84 INTEGER, col_85 INTEGER,
+ col_86 INTEGER, col_87 INTEGER, col_88 INTEGER, col_89 INTEGER, col_90 INTEGER,
+ col_91 INTEGER, col_92 INTEGER, col_93 INTEGER, col_94 INTEGER, col_95 INTEGER,
+ col_96 INTEGER, col_97 INTEGER, col_98 INTEGER, col_99 INTEGER, col_100 INTEGER
+ );
+
+ DO \$\$
+ DECLARE
+ i INTEGER;
+ BEGIN
+ FOR i IN 1..$indexes_num LOOP
+ EXECUTE format('CREATE INDEX idx_col_\%s ON test_autovac (col_\%s);', i, i);
+ END LOOP;
+ END \$\$;
+});
+
+$node->psql('postgres',
+ "SELECT COUNT(*) FROM pg_index i
+ JOIN pg_class c ON c.oid = i.indrelid
+ WHERE c.relname = 'test_autovac';",
+ stdout => \$psql_out
+);
+is($psql_out, $indexes_num + 1, "All indexes created successfully");
+
+$node->safe_psql('postgres', qq{
+ DO \$\$
+ DECLARE
+ i INTEGER;
+ BEGIN
+ FOR i IN 1..$initial_rows_num LOOP
+ INSERT INTO test_autovac (
+ col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10,
+ col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20,
+ col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30,
+ col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40,
+ col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49, col_50,
+ col_51, col_52, col_53, col_54, col_55, col_56, col_57, col_58, col_59, col_60,
+ col_61, col_62, col_63, col_64, col_65, col_66, col_67, col_68, col_69, col_70,
+ col_71, col_72, col_73, col_74, col_75, col_76, col_77, col_78, col_79, col_80,
+ col_81, col_82, col_83, col_84, col_85, col_86, col_87, col_88, col_89, col_90,
+ col_91, col_92, col_93, col_94, col_95, col_96, col_97, col_98, col_99, col_100
+ ) VALUES (
+ i, i + 1, i + 2, i + 3, i + 4, i + 5, i + 6, i + 7, i + 8, i + 9,
+ i + 10, i + 11, i + 12, i + 13, i + 14, i + 15, i + 16, i + 17, i + 18, i + 19,
+ i + 20, i + 21, i + 22, i + 23, i + 24, i + 25, i + 26, i + 27, i + 28, i + 29,
+ i + 30, i + 31, i + 32, i + 33, i + 34, i + 35, i + 36, i + 37, i + 38, i + 39,
+ i + 40, i + 41, i + 42, i + 43, i + 44, i + 45, i + 46, i + 47, i + 48, i + 49,
+ i + 50, i + 51, i + 52, i + 53, i + 54, i + 55, i + 56, i + 57, i + 58, i + 59,
+ i + 60, i + 61, i + 62, i + 63, i + 64, i + 65, i + 66, i + 67, i + 68, i + 69,
+ i + 70, i + 71, i + 72, i + 73, i + 74, i + 75, i + 76, i + 77, i + 78, i + 79,
+ i + 80, i + 81, i + 82, i + 83, i + 84, i + 85, i + 86, i + 87, i + 88, i + 89,
+ i + 90, i + 91, i + 92, i + 93, i + 94, i + 95, i + 96, i + 97, i + 98, i + 99
+ );
+ END LOOP;
+ END \$\$;
+});
+
+$node->psql('postgres',
+ "SELECT COUNT(*) FROM test_autovac;",
+ stdout => \$psql_out
+);
+is($psql_out, $initial_rows_num, "All data inserted into table successfully");
+
+$node->safe_psql('postgres', qq{
+ UPDATE test_autovac SET col_1 = 0 WHERE (col_1 % 3) = 0;
+ ANALYZE test_autovac;
+});
+
+my $dead_tuples_thresh = $initial_rows_num / 4;
+my $indexes_num_thresh = $indexes_num / 2;
+my $num_workers = 3;
+
+# Reduce autovacuum_work_mem, so the leader process will perform the parallel
+# index vacuum phase several times
+$node->append_conf('postgresql.conf', qq{
+ autovacuum_naptime = '1s'
+ autovacuum_work_mem = 2048
+ autovacuum_vacuum_threshold = 1
+ autovacuum_analyze_threshold = 1
+ autovacuum_vacuum_scale_factor = 0.1
+ autovacuum_analyze_scale_factor = 0.1
+ autovacuum_max_workers = 10
+ autovacuum = on
+ autovac_idx_parallel_min_rows = $dead_tuples_thresh
+ autovac_idx_parallel_min_indexes = $indexes_num_thresh
+ max_parallel_index_autovac_workers = $num_workers
+});
+
+$node->restart;
+
+# wait for autovacuum to reset datfrozenxid age to 0
+$node->poll_query_until('postgres', q{
+ SELECT count(*) = 0 FROM pg_database WHERE mxid_age(datfrozenxid) > 0
+}) or die "Timed out while waiting for autovacuum";
+
+ok(1, "There are no segfaults");
+
+$node->stop;
+done_testing();
--
2.43.0
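The wakeup hand-off between `eliminate_lock_conflicts()` and the supportive-worker branch of `parallel_autovacuum_start_sync_point()` can be illustrated with a small, single-threaded C model. This is a sketch under simplifying assumptions, not code from the patch. AutovacuumLock, the condition variable and the SHUTDOWN status are left out. The names `sync_item_t`, `assign_waker()` and `worker_arrives()` are hypothetical. The model only tracks which party is responsible for the final `ConditionVariableBroadcast()`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the patch's SyncType values. */
typedef enum { SYNC_LAUNCHER, SYNC_LEADER, SYNC_LAST_WORKER } sync_type_t;

/* Hypothetical subset of ParallelAutoVacuumWorkItem (no lock, no CV). */
typedef struct
{
    bool        leader_sleeping;        /* models leader_sleeping_on_ssp */
    int         nworkers_sleeping;      /* supportive workers asleep on the CV */
    int         nworkers_participating; /* workers that reached the sync point */
    int         nworkers_to_launch;     /* how many workers are expected */
    bool        passed;                 /* models status == START_SYNC_POINT_PASSED */
    sync_type_t sync_type;              /* who must broadcast the wakeup */
} sync_item_t;

/*
 * Model of eliminate_lock_conflicts(): the launcher has started
 * nworkers_launched workers; all_launched says whether that is everything
 * the leader requested.  Returns true if the launcher itself broadcasts.
 */
bool
assign_waker(sync_item_t *item, int nworkers_launched, bool all_launched)
{
    bool launcher_wakes = false;

    assert(nworkers_launched >= 0 &&
           nworkers_launched <= item->nworkers_to_launch);

    if (item->leader_sleeping &&
        item->nworkers_sleeping == nworkers_launched)
    {
        /* Everyone is asleep, so only the launcher can wake them. */
        launcher_wakes = true;
        item->passed = true;
    }
    else if (item->leader_sleeping &&
             item->nworkers_sleeping < nworkers_launched)
        item->sync_type = SYNC_LAST_WORKER;  /* last worker to arrive wakes all */
    else if (!item->leader_sleeping &&
             item->nworkers_sleeping == nworkers_launched)
        item->sync_type = SYNC_LEADER;       /* leader wakes all when ready */
    else
        item->sync_type = SYNC_LAST_WORKER;  /* may be handed back to the leader */

    /* Let the last worker recognize itself when fewer workers started. */
    if (!all_launched && item->sync_type == SYNC_LAST_WORKER)
        item->nworkers_to_launch = nworkers_launched;

    return launcher_wakes;
}

/*
 * Model of the supportive-worker branch of
 * parallel_autovacuum_start_sync_point().  Returns true if this worker
 * broadcasts the wakeup, false if it goes to sleep instead.
 */
bool
worker_arrives(sync_item_t *item)
{
    bool is_last;

    item->nworkers_participating += 1;
    is_last = (item->nworkers_participating == item->nworkers_to_launch);

    if (is_last && item->sync_type == SYNC_LAST_WORKER && item->leader_sleeping)
    {
        item->passed = true;
        return true;
    }

    /* Leader is not asleep yet: hand the wakeup duty back to it. */
    if (is_last && item->sync_type == SYNC_LAST_WORKER)
        item->sync_type = SYNC_LEADER;

    item->nworkers_sleeping += 1;
    return false;
}
```

For example, if the leader is still awake and the launcher could start only 2 of 3 requested workers, `assign_waker()` picks LAST_WORKER and lowers `nworkers_to_launch` to 2. The second worker to arrive then finds the leader awake and hands the wakeup back by switching to LEADER, as described in the comments of `eliminate_lock_conflicts()`.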