From 14abdef918a73e465900f758204de19982fc4224 Mon Sep 17 00:00:00 2001
From: Daniil Davidov <d.davydov@postgrespro.ru>
Date: Wed, 7 Jan 2026 16:03:20 +0700
Subject: [PATCH v18 5/5] Cost-based parameters propagation for parallel
 autovacuum

---
 src/backend/commands/vacuum.c                 |  26 +++-
 src/backend/commands/vacuumparallel.c         | 130 ++++++++++++++++++
 src/include/commands/vacuum.h                 |   2 +
 src/test/modules/test_autovacuum/Makefile     |   2 +
 .../modules/test_autovacuum/t/001_basic.pl    |  83 +++++++++++
 .../test_autovacuum/test_autovacuum--1.0.sql  |  12 ++
 .../modules/test_autovacuum/test_autovacuum.c |  75 ++++++++++
 7 files changed, 328 insertions(+), 2 deletions(-)

diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index aa4fbec143f..4c40a36523a 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -2430,8 +2430,24 @@ vacuum_delay_point(bool is_analyze)
 	/* Always check for interrupts */
 	CHECK_FOR_INTERRUPTS();
 
-	if (InterruptPending ||
-		(!VacuumCostActive && !ConfigReloadPending))
+	if (InterruptPending)
+		return;
+
+	if (!AmAutoVacuumWorkerProcess())
+	{
+		/*
+		 * If we are autovacuum parallel worker, check whether cost-based
+		 * parameters had changed in leader worker.
+		 * If so, vacuum_cost_delay and vacuum_cost_limit will be set to the
+		 * values which leader worker is operating on.
+		 *
+		 * Do it before checking VacuumCostActive, because its value might be
+		 * changed after leader's parameters consumption.
+		 */
+		parallel_vacuum_fix_cost_based_params();
+	}
+
+	if (!VacuumCostActive && !ConfigReloadPending)
 		return;
 
 	/*
@@ -2445,6 +2461,12 @@ vacuum_delay_point(bool is_analyze)
 		ConfigReloadPending = false;
 		ProcessConfigFile(PGC_SIGHUP);
 		VacuumUpdateCosts();
+
+		/*
+		 * If we are parallel autovacuum leader and some of cost-based
+		 * parameters had changed, let other parallel workers know.
+		 */
+		parallel_vacuum_propagate_cost_based_params();
 	}
 
 	/*
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index c2f0a37eef2..06ecffeec42 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -54,6 +54,22 @@
 #define PARALLEL_VACUUM_KEY_WAL_USAGE		4
 #define PARALLEL_VACUUM_KEY_INDEX_STATS		5
 
+/*
+ * Only autovacuum leader can reload config file. We use this structure in
+ * parallel autovacuum for keeping worker's parameters in sync with leader's
+ * parameters.
+ */
+typedef struct PVSharedCostParams
+{
+	slock_t		spinlock; /* protects all fields below */
+
+	/* Copies of corresponding parameters from autovacuum leader process */
+	double	cost_delay;
+	int		cost_limit;
+}		PVSharedCostParams;
+
+static PVSharedCostParams *pv_shared_cost_params = NULL;
+
 /*
  * Shared information among parallel workers.  So this is allocated in the DSM
  * segment.
@@ -123,6 +139,18 @@ typedef struct PVShared
 
 	/* Statistics of shared dead items */
 	VacDeadItemsInfo dead_items_info;
+
+	/*
+	 * If 'true' then we are running parallel autovacuum. Otherwise, we are
+	 * running parallel maintenence VACUUM.
+	 */
+	bool	am_parallel_autovacuum;
+
+	/*
+	 * Struct for syncing parameters between supportive parallel autovacuum
+	 * workers with leader worker.
+	 */
+	PVSharedCostParams	cost_params;
 } PVShared;
 
 /* Status used during parallel index vacuum or cleanup */
@@ -396,6 +424,17 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	pg_atomic_init_u32(&(shared->active_nworkers), 0);
 	pg_atomic_init_u32(&(shared->idx), 0);
 
+	shared->am_parallel_autovacuum = AmAutoVacuumWorkerProcess();
+
+	if (shared->am_parallel_autovacuum)
+	{
+		shared->cost_params.cost_delay = vacuum_cost_delay;
+		shared->cost_params.cost_limit = vacuum_cost_limit;
+		SpinLockInit(&shared->cost_params.spinlock);
+
+		pv_shared_cost_params = &(shared->cost_params);
+	}
+
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	pvs->shared = shared;
 
@@ -538,6 +577,53 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup
 	parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wusage);
 }
 
+/*
+ * Function to be called from parallel autovacuum worker in order to sync
+ * some cost-based delay parameter with the leader worker.
+ */
+bool
+parallel_vacuum_fix_cost_based_params(void)
+{
+	/* Check whether we are running parallel autovacuum */
+	if (pv_shared_cost_params == NULL)
+		return false;
+
+	Assert(IsParallelWorker() && !AmAutoVacuumWorkerProcess());
+
+	SpinLockAcquire(&pv_shared_cost_params->spinlock);
+
+	vacuum_cost_delay = pv_shared_cost_params->cost_delay;
+	vacuum_cost_limit = pv_shared_cost_params->cost_limit;
+
+	SpinLockRelease(&pv_shared_cost_params->spinlock);
+
+	if (vacuum_cost_delay > 0 && !VacuumFailsafeActive)
+		VacuumCostActive = true;
+
+	return true;
+}
+
+/*
+ * Function to be called from parallel autovacuum leader in order to propagate
+ * some cost-based parameters to the supportive workers.
+ */
+void
+parallel_vacuum_propagate_cost_based_params(void)
+{
+	/* Check whether we are running parallel autovacuum */
+	if (pv_shared_cost_params == NULL)
+		return;
+
+	Assert(AmAutoVacuumWorkerProcess());
+
+	SpinLockAcquire(&pv_shared_cost_params->spinlock);
+
+	pv_shared_cost_params->cost_delay = vacuum_cost_delay;
+	pv_shared_cost_params->cost_limit = vacuum_cost_limit;
+
+	SpinLockRelease(&pv_shared_cost_params->spinlock);
+}
+
 /*
  * Compute the number of parallel worker processes to request.  Both index
  * vacuum and index cleanup can be executed with parallel workers.
@@ -763,12 +849,26 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
 	/* Vacuum the indexes that can be processed by only leader process */
 	parallel_vacuum_process_unsafe_indexes(pvs);
 
+	/*
+	 * To be able to exercise whether leader parallel autovacuum worker can
+	 * propagate cost-based params to parallel workers, wait here until
+	 * configuration is changed...
+	 */
+	INJECTION_POINT("av-leader-before-reload-conf", NULL);
+
 	/*
 	 * Join as a parallel worker.  The leader vacuums alone processes all
 	 * parallel-safe indexes in the case where no workers are launched.
 	 */
 	parallel_vacuum_process_safe_indexes(pvs);
 
+	/*
+	 * ...and then wait until leader guaranteed to propagate new parameters
+	 * values to the workers. I.e. tests are expecting, that during processing
+	 * of parallel safe index we have called vacuum_delay_point,
+	 */
+	INJECTION_POINT("av-leader-after-reload-conf", NULL);
+
 	/*
 	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
 	 * to finish, or we might get incomplete data.)
@@ -1104,6 +1204,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	VacuumSharedCostBalance = &(shared->cost_balance);
 	VacuumActiveNWorkers = &(shared->active_nworkers);
 
+	if (shared->am_parallel_autovacuum)
+		pv_shared_cost_params = &(shared->cost_params);
+
 	/* Set parallel vacuum state */
 	pvs.indrels = indrels;
 	pvs.nindexes = nindexes;
@@ -1131,6 +1234,33 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	/* Prepare to track buffer usage during parallel execution */
 	InstrStartParallelQuery();
 
+#ifdef USE_INJECTION_POINTS
+	if (shared->am_parallel_autovacuum)
+	{
+		Assert(VacuumActiveNWorkers != NULL);
+
+		/*
+		 * To be able to exercise whether leader parallel autovacuum worker can
+		 * propagate cost-based params to parallel workers, wait here until
+		 * configuration is changed and leader workers had updated shared state.
+		 */
+		INJECTION_POINT("av-worker-before-reload-conf", NULL);
+
+		/* Simulate config reload during normal processing */
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+		vacuum_delay_point(false);
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+
+		/*
+		 * Wait until worker guaranteed to consume new parameters values from
+		 * the leader and save new value in injection point state.
+		 */
+		INJECTION_POINT("autovacuum-set-cost-based-parameter",
+						&vacuum_cost_delay);
+		INJECTION_POINT("av-worker-after-reload-conf", NULL);
+	}
+#endif
+
 	/* Process indexes to perform vacuum/cleanup */
 	parallel_vacuum_process_safe_indexes(&pvs);
 
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index ec5d70aacdc..73125439bed 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -411,6 +411,8 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs,
 												int num_index_scans,
 												bool estimated_count,
 												PVWorkersUsage *wusage);
+extern bool parallel_vacuum_fix_cost_based_params(void);
+extern void parallel_vacuum_propagate_cost_based_params(void);
 extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in commands/analyze.c */
diff --git a/src/test/modules/test_autovacuum/Makefile b/src/test/modules/test_autovacuum/Makefile
index 4cf7344b2ac..32254c53a5d 100644
--- a/src/test/modules/test_autovacuum/Makefile
+++ b/src/test/modules/test_autovacuum/Makefile
@@ -12,6 +12,8 @@ DATA = test_autovacuum--1.0.sql
 
 TAP_TESTS = 1
 
+EXTRA_INSTALL = src/test/modules/injection_points
+
 export enable_injection_points
 
 ifdef USE_PGXS
diff --git a/src/test/modules/test_autovacuum/t/001_basic.pl b/src/test/modules/test_autovacuum/t/001_basic.pl
index 8bf153d132c..eec0f41b6a6 100644
--- a/src/test/modules/test_autovacuum/t/001_basic.pl
+++ b/src/test/modules/test_autovacuum/t/001_basic.pl
@@ -28,6 +28,11 @@ $node->append_conf('postgresql.conf', qq{
 });
 $node->start;
 
+if (!$node->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
 my $indexes_num = 4;
 my $initial_rows_num = 10_000;
 my $autovacuum_parallel_workers = 2;
@@ -73,6 +78,9 @@ $node->safe_psql('postgres', qq{
 	CREATE EXTENSION test_autovacuum;
 	SELECT inj_set_free_workers_attach();
 	SELECT inj_leader_failure_attach();
+	SELECT inj_check_av_param_attach();
+
+	CREATE EXTENSION injection_points;
 });
 
 # Test 1 :
@@ -166,5 +174,80 @@ $node->safe_psql('postgres', qq{
 	SELECT inj_leader_failure_detach();
 });
 
+# Test 4:
+# Check whether parallel autovacuum leader can propagate cost-based parameters
+# to parallel workers.
+
+# Disable autovacuum on table during preparation for the next test
+$node->safe_psql('postgres', qq{
+	ALTER TABLE test_autovac SET (autovacuum_enabled = false);
+});
+
+# Create more dead tuples
+$node->safe_psql('postgres', qq{
+	UPDATE test_autovac SET col_3 = 0 WHERE (col_4 % 3) = 0;
+	ANALYZE test_autovac;
+});
+
+$node->safe_psql('postgres', qq{
+	SELECT injection_points_attach('av-leader-before-reload-conf', 'wait');
+	SELECT injection_points_attach('av-leader-after-reload-conf', 'wait');
+	SELECT injection_points_attach('av-worker-before-reload-conf', 'wait');
+	SELECT injection_points_attach('av-worker-after-reload-conf', 'wait');
+});
+
+$node->safe_psql('postgres', qq{
+	ALTER TABLE test_autovac SET (autovacuum_enabled = true);
+});
+
+# Wait until leader parallel worker get to the point before vacuum_delay_point
+# and change cost-based config parameter.
+
+$node->wait_for_event('autovacuum worker', 'av-leader-before-reload-conf');
+$node->psql('postgres', qq{
+	ALTER SYSTEM SET autovacuum_vacuum_cost_delay = 10;
+	SELECT pg_reload_conf();
+});
+$node->psql('postgres', qq{
+	SELECT injection_points_wakeup('av-leader-before-reload-conf');
+});
+
+# Wait until leader worker propagates new patameter's value to the other
+# workers and let them to call vacuum_delay_point
+
+$node->wait_for_event('autovacuum worker', 'av-leader-after-reload-conf');
+$node->safe_psql('postgres', qq{
+	SELECT injection_points_wakeup('av-leader-after-reload-conf');
+	SELECT injection_points_wakeup('av-worker-before-reload-conf');
+});
+
+# Check whether parallel worker has consume new parameter's value from the
+# leader.
+# Aactually, it can happen before worker gets to the injection point, but we
+# want to make everything as deterministic as possible.
+
+$node->wait_for_event('parallel worker', 'av-worker-after-reload-conf');
+$node->psql('postgres',
+	"SELECT get_parallel_autovacuum_worker_param_value('vacuum_cost_delay');",
+	stdout => \$psql_out,
+);
+is($psql_out, 10.0, 'Leader successfully propagated parameter value');
+
+$node->safe_psql('postgres', qq{
+	SELECT injection_points_wakeup('av-worker-after-reload-conf');
+});
+
+# Cleanup
+$node->safe_psql('postgres', qq{
+	SELECT injection_points_detach('av-leader-before-reload-conf');
+	SELECT injection_points_detach('av-leader-after-reload-conf');
+	SELECT injection_points_detach('av-worker-before-reload-conf');
+	SELECT injection_points_detach('av-worker-after-reload-conf');
+	SELECT inj_check_av_param_detach();
+
+	DROP EXTENSION test_autovacuum;
+	DROP EXTENSION injection_points;
+});
+
 $node->stop;
 done_testing();
diff --git a/src/test/modules/test_autovacuum/test_autovacuum--1.0.sql b/src/test/modules/test_autovacuum/test_autovacuum--1.0.sql
index 017d5da85ea..cb0407952d7 100644
--- a/src/test/modules/test_autovacuum/test_autovacuum--1.0.sql
+++ b/src/test/modules/test_autovacuum/test_autovacuum--1.0.sql
@@ -14,6 +14,10 @@ CREATE FUNCTION trigger_leader_failure(failure_type text)
 RETURNS VOID STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION get_parallel_autovacuum_worker_param_value(param_name text)
+RETURNS FLOAT8 STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 /*
  * Injection point related functions
  */
@@ -32,3 +36,11 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 CREATE FUNCTION inj_leader_failure_detach()
 RETURNS VOID STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_check_av_param_attach()
+RETURNS VOID STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_check_av_param_detach()
+RETURNS VOID STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_autovacuum/test_autovacuum.c b/src/test/modules/test_autovacuum/test_autovacuum.c
index 7948f4858ae..e96cfda7ae9 100644
--- a/src/test/modules/test_autovacuum/test_autovacuum.c
+++ b/src/test/modules/test_autovacuum/test_autovacuum.c
@@ -38,6 +38,9 @@ typedef struct InjPointState
 
 	bool		enabled_leader_failure;
 	AVLeaderFaulureType ftype;
+
+	bool		enabled_check_av_param;
+	double		vacuum_cost_delay;
 }			InjPointState;
 
 static InjPointState * inj_point_state;
@@ -92,6 +95,12 @@ test_autovacuum_shmem_startup(void)
 							 "inj_trigger_leader_failure",
 							 NULL,
 							 0);
+
+		InjectionPointAttach("autovacuum-set-cost-based-parameter",
+							 "test_autovacuum",
+							 "inj_set_av_parameter",
+							 NULL,
+							 0);
 	}
 
 	LWLockRelease(AddinShmemInitLock);
@@ -109,6 +118,9 @@ _PG_init(void)
 	shmem_startup_hook = test_autovacuum_shmem_startup;
 }
 
+extern PGDLLEXPORT void inj_set_av_parameter(const char *name,
+											 const void *private_data,
+											 void *arg);
 extern PGDLLEXPORT void inj_set_free_workers(const char *name,
 											 const void *private_data,
 											 void *arg);
@@ -205,6 +217,45 @@ trigger_leader_failure(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/*
+ * Set current setting of "vacuum_cost_delay" parameter.
+ *
+ * Function is called from parallel autovacuum worker.
+ */
+void
+inj_set_av_parameter(const char *name, const void *private_data, void *arg)
+{
+	ereport(LOG,
+			errmsg("set autovacuum parameter injection point called"),
+			errhidestmt(true), errhidecontext(true));
+
+	if (inj_point_state->enabled_check_av_param)
+	{
+		Assert(arg != NULL);
+		inj_point_state->vacuum_cost_delay = *(double *) arg;
+	}
+}
+
+PG_FUNCTION_INFO_V1(get_parallel_autovacuum_worker_param_value);
+Datum
+get_parallel_autovacuum_worker_param_value(PG_FUNCTION_ARGS)
+{
+	const char *param_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	double		value = 0.0;
+
+#ifndef USE_INJECTION_POINTS
+	elog(ERROR, "injection points not supported");
+#endif
+
+	if (strcmp(param_name, "vacuum_cost_delay") == 0)
+		value = inj_point_state->vacuum_cost_delay;
+	else
+		elog(ERROR,
+			 "cannot retrieve parameter %s from injection point", param_name);
+
+	PG_RETURN_FLOAT8((float8) value);
+}
+
 PG_FUNCTION_INFO_V1(inj_set_free_workers_attach);
 Datum
 inj_set_free_workers_attach(PG_FUNCTION_ARGS)
@@ -253,3 +304,27 @@ inj_leader_failure_detach(PG_FUNCTION_ARGS)
 #endif
 	PG_RETURN_VOID();
 }
+
+PG_FUNCTION_INFO_V1(inj_check_av_param_attach);
+Datum
+inj_check_av_param_attach(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_point_state->enabled_check_av_param = true;
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_check_av_param_detach);
+Datum
+inj_check_av_param_detach(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_point_state->enabled_check_av_param = false;
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+	PG_RETURN_VOID();
+}
-- 
2.43.0

