From: Thomas Munro <[email protected]>
To: Dmitry Dolgov <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Subject: Re: Automatically sizing the IO worker pool
Date: Sat, 12 Jul 2025 17:08:29 +1200
Message-ID: <CA+hUKGJLe-0E-wZ-is78CEHhjbC=ihMVCQLoN1dmD-j05s9qRg@mail.gmail.com>
In-Reply-To: <CA+q6zcW0qh2aPKg0z58mp-Ba8avp7MWkS00ADrOOv=CBzJJMLA@mail.gmail.com>
References: <CA+hUKG+m4xV0LMoH2c=oRAdEXuCnh+tGBTWa7uFeFMGgTLAw+Q@mail.gmail.com>
<it7fexpclowjku57bsdh4uqr366wa2fxtq5ahzxczoxonmbh5s@g2f5oesiakzq>
<CA+hUKGJULUTkT2LpeHTSt3KHbJrYNBT-kj1-OhMRV_PnUQ_57A@mail.gmail.com>
<CA+q6zcW0qh2aPKg0z58mp-Ba8avp7MWkS00ADrOOv=CBzJJMLA@mail.gmail.com>
On Wed, May 28, 2025 at 5:55 AM Dmitry Dolgov <[email protected]> wrote:
> I probably had to start with a statement that I find the current approach reasonable, and I'm only curious if there is more to get out of it. I haven't benchmarked the patch yet (plan getting to it when I'll get back), and can imagine practical considerations significantly impacting any potential solution.
Here's a rebase.
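For anyone skimming 0002 before reading the diff: the idle-worker selection there treats the pool as a 64-bit set, and a worker only wakes peers numbered no lower than itself. The following is a minimal standalone sketch of that selection rule, not code from the patch. The demo_* names are made up for illustration, and the portable bit-scan loop is an assumed stand-in for pg_rightmost_one_pos64().

```c
#include <assert.h>
#include <stdint.h>

/* Bit representing worker ID "worker" (0..63) in a 64-bit set. */
static uint64_t
demo_worker_mask(int worker)
{
	assert(worker >= 0 && worker < 64);
	return UINT64_C(1) << worker;
}

/* Position of the lowest set bit; portable stand-in for a bit-scan helper. */
static int
demo_lowest(uint64_t set)
{
	int			pos = 0;

	assert(set != 0);
	while ((set & 1) == 0)
	{
		set >>= 1;
		pos++;
	}
	return pos;
}

/*
 * Pick the lowest-numbered idle worker and clear its idle bit.
 *
 * A worker passes its own ID as "self" and only considers IDs >= self; a
 * regular backend passes -1 and considers every idle worker.  Returns -1 if
 * no candidate is idle.
 */
static int
demo_choose_idle(uint64_t *idle_mask, int self)
{
	uint64_t	candidates = *idle_mask;
	int			worker;

	/* Mask off all bits below our own ID. */
	if (self != -1)
		candidates &= ~(demo_worker_mask(self) - 1);

	if (candidates == 0)
		return -1;

	worker = demo_lowest(candidates);
	*idle_mask &= ~demo_worker_mask(worker);
	return worker;
}
```

With workers 1 and 3 idle, a regular backend (self = -1) gets worker 1, while worker 2 skips worker 1 and gets worker 3.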
Attachments:
[text/x-patch] v2-0001-aio-Try-repeatedly-to-give-batched-IOs-to-workers.patch (1.9K, 2-v2-0001-aio-Try-repeatedly-to-give-batched-IOs-to-workers.patch)
From fa7aac1bc9c0a47fbdbd9459424f08fa61b71ce2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Fri, 11 Apr 2025 21:17:26 +1200
Subject: [PATCH v2 1/2] aio: Try repeatedly to give batched IOs to workers.
Previously, when the submission queue was full we'd run all remaining
IOs in a batched submission synchronously. Andres rightly pointed out
that we should really try again between synchronous IOs, since the
workers might have made progress in draining the queue.
Suggested-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
src/backend/storage/aio/method_worker.c | 30 ++++++++++++++++++++++---
1 file changed, 27 insertions(+), 3 deletions(-)
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index bf8f77e6ff6..9a82d5f847d 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -282,12 +282,36 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
SetLatch(wakeup);
/* Run whatever is left synchronously. */
- if (nsync > 0)
+ for (int i = 0; i < nsync; ++i)
{
- for (int i = 0; i < nsync; ++i)
+ wakeup = NULL;
+
+ /*
+ * Between synchronous IO operations, try again to enqueue as many as
+ * we can.
+ */
+ if (i > 0)
{
- pgaio_io_perform_synchronously(synchronous_ios[i]);
+ wakeup = NULL;
+
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ while (i < nsync &&
+ pgaio_worker_submission_queue_insert(synchronous_ios[i]))
+ {
+ if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
+ wakeup = io_worker_control->workers[worker].latch;
+ i++;
+ }
+ LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ if (wakeup)
+ SetLatch(wakeup);
+
+ if (i == nsync)
+ break;
}
+
+ pgaio_io_perform_synchronously(synchronous_ios[i]);
}
}
--
2.47.2
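To illustrate the retry behaviour in 0001 outside the tree, here is a minimal standalone sketch. It is a simplified model under stated assumptions, not the patch itself: the queue is reduced to a counter, DEMO_QUEUE_SIZE and the demo_* functions are hypothetical, and worker progress is simulated by draining a fixed number of entries after each synchronous IO.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_QUEUE_SIZE 4		/* hypothetical capacity, for illustration */

/* The submission queue, modelled only by how many entries it holds. */
typedef struct DemoQueue
{
	size_t		count;
} DemoQueue;

/* Try to enqueue one IO; returns false if the queue is full. */
static bool
demo_queue_insert(DemoQueue *q)
{
	if (q->count == DEMO_QUEUE_SIZE)
		return false;
	q->count++;
	return true;
}

/* Stand-in for workers making progress: remove up to n queued IOs. */
static void
demo_workers_drain(DemoQueue *q, size_t n)
{
	q->count = (n >= q->count) ? 0 : q->count - n;
}

/*
 * Handle nios IOs that did not fit in the queue.  Before every synchronous
 * IO except the first, try again to enqueue as many as possible.  Returns
 * the number of IOs that ended up being performed synchronously.
 */
static int
demo_run_leftovers(DemoQueue *q, int nios, size_t drain_per_sync)
{
	int			nsync_done = 0;

	assert(nios >= 0);
	for (int i = 0; i < nios; ++i)
	{
		if (i > 0)
		{
			/* Workers may have drained the queue in the meantime. */
			while (i < nios && demo_queue_insert(q))
				i++;
			if (i == nios)
				break;
		}

		/* Queue still full: perform this IO ourselves. */
		nsync_done++;

		/* Simulate workers consuming entries while we were busy. */
		demo_workers_drain(q, drain_per_sync);
	}
	return nsync_done;
}
```

Starting from a full queue with six leftover IOs and workers draining two entries per synchronous IO, this model runs only two IOs synchronously; if the workers make no progress, all six are run synchronously, which matches the old behaviour.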
[text/x-patch] v2-0002-aio-Adjust-IO-worker-pool-size-automatically.patch (33.9K, 3-v2-0002-aio-Adjust-IO-worker-pool-size-automatically.patch)
From a0a5fff1f1d21c002bf68d36de9aff21bdf61783 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 22 Mar 2025 00:36:49 +1300
Subject: [PATCH v2 2/2] aio: Adjust IO worker pool size automatically.
Replace the simple io_workers setting with:
io_min_workers=1
io_max_workers=8
io_worker_idle_timeout=60s
io_worker_launch_interval=500ms
The pool is automatically sized within the configured range according
to demand.
Discussion: https://postgr.es/m/CA%2BhUKG%2Bm4xV0LMoH2c%3DoRAdEXuCnh%2BtGBTWa7uFeFMGgTLAw%2BQ%40mail.gmail.com
---
doc/src/sgml/config.sgml | 70 ++-
src/backend/postmaster/postmaster.c | 64 ++-
src/backend/storage/aio/method_worker.c | 445 ++++++++++++++----
.../utils/activity/wait_event_names.txt | 1 +
src/backend/utils/misc/guc_tables.c | 46 +-
src/backend/utils/misc/postgresql.conf.sample | 5 +-
src/include/storage/io_worker.h | 9 +-
src/include/storage/lwlocklist.h | 1 +
src/include/storage/pmsignal.h | 1 +
src/test/modules/test_aio/t/002_io_workers.pl | 15 +-
10 files changed, 535 insertions(+), 122 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c7acc0f182f..98532e55041 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2787,16 +2787,76 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
- <varlistentry id="guc-io-workers" xreflabel="io_workers">
- <term><varname>io_workers</varname> (<type>integer</type>)
+ <varlistentry id="guc-io-min-workers" xreflabel="io_min_workers">
+ <term><varname>io_min_workers</varname> (<type>integer</type>)
<indexterm>
- <primary><varname>io_workers</varname> configuration parameter</primary>
+ <primary><varname>io_min_workers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
- Selects the number of I/O worker processes to use. The default is
- 3. This parameter can only be set in the
+ Sets the minimum number of I/O worker processes to use. The default is
+ 1. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-max-workers" xreflabel="io_max_workers">
+ <term><varname>io_max_workers</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_max_workers</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the maximum number of I/O worker processes to use. The default is
+ 8. This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-idle-timeout" xreflabel="io_worker_idle_timeout">
+ <term><varname>io_worker_idle_timeout</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_worker_idle_timeout</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the time after which idle I/O worker processes will exit, reducing the
+ size of the I/O worker pool towards the minimum. The default
+ is 1 minute.
+ This parameter can only be set in the
+ <filename>postgresql.conf</filename> file or on the server command
+ line.
+ </para>
+ <para>
+ Only has an effect if <xref linkend="guc-io-method"/> is set to
+ <literal>worker</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry id="guc-io-worker-launch-interval" xreflabel="io_worker_launch_interval">
+ <term><varname>io_worker_launch_interval</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>io_worker_launch_interval</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the minimum time between launching new I/O workers. This can be used to avoid
+ sudden bursts of new I/O workers. The default is 500ms.
+ This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index cca9b946e53..a5438fa079d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -408,6 +408,7 @@ static DNSServiceRef bonjour_sdref = NULL;
#endif
/* State for IO worker management. */
+static TimestampTz io_worker_launch_delay_until = 0;
static int io_worker_count = 0;
static PMChild *io_worker_children[MAX_IO_WORKERS];
@@ -1569,6 +1570,15 @@ DetermineSleepTime(void)
if (StartWorkerNeeded)
return 0;
+ /* If we need a new IO worker, defer until launch delay expires. */
+ if (pgaio_worker_test_new_worker_needed() &&
+ io_worker_count < io_max_workers)
+ {
+ if (io_worker_launch_delay_until == 0)
+ return 0;
+ next_wakeup = io_worker_launch_delay_until;
+ }
+
if (HaveCrashedWorker)
{
dlist_mutable_iter iter;
@@ -3750,6 +3760,15 @@ process_pm_pmsignal(void)
StartWorkerNeeded = true;
}
+ /* Process IO worker start requests. */
+ if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE))
+ {
+ /*
+ * No local flag, as the state is exposed through pgaio_worker_*()
+ * functions. This signal is received on potentially actionable level
+ * changes, so that maybe_adjust_io_workers() will run.
+ */
+ }
/* Process background worker state changes. */
if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
{
@@ -4355,8 +4374,9 @@ maybe_reap_io_worker(int pid)
/*
* Start or stop IO workers, to close the gap between the number of running
* workers and the number of configured workers. Used to respond to change of
- * the io_workers GUC (by increasing and decreasing the number of workers), as
- * well as workers terminating in response to errors (by starting
+ * the io_{min,max}_workers GUCs (by increasing and decreasing the number of
+ * workers) and requests to start a new one due to submission queue backlog,
+ * as well as workers terminating in response to errors (by starting
* "replacement" workers).
*/
static void
@@ -4385,8 +4405,16 @@ maybe_adjust_io_workers(void)
Assert(pmState < PM_WAIT_IO_WORKERS);
- /* Not enough running? */
- while (io_worker_count < io_workers)
+ /* Cancel the launch delay when it expires to minimize clock access. */
+ if (io_worker_launch_delay_until != 0 &&
+ io_worker_launch_delay_until <= GetCurrentTimestamp())
+ io_worker_launch_delay_until = 0;
+
+ /* Not enough workers running? */
+ while (io_worker_launch_delay_until == 0 &&
+ io_worker_count < io_max_workers &&
+ ((io_worker_count < io_min_workers ||
+ pgaio_worker_clear_new_worker_needed())))
{
PMChild *child;
int i;
@@ -4400,6 +4428,16 @@ maybe_adjust_io_workers(void)
if (i == MAX_IO_WORKERS)
elog(ERROR, "could not find a free IO worker slot");
+ /*
+ * Apply launch delay even for failures to avoid retrying too fast on
+ * fork() failure, but not while we're still building the minimum pool
+ * size.
+ */
+ if (io_worker_count >= io_min_workers)
+ io_worker_launch_delay_until =
+ TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ io_worker_launch_interval);
+
/* Try to launch one. */
child = StartChildProcess(B_IO_WORKER);
if (child != NULL)
@@ -4411,19 +4449,11 @@ maybe_adjust_io_workers(void)
break; /* try again next time */
}
- /* Too many running? */
- if (io_worker_count > io_workers)
- {
- /* ask the IO worker in the highest slot to exit */
- for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
- {
- if (io_worker_children[i] != NULL)
- {
- kill(io_worker_children[i]->pid, SIGUSR2);
- break;
- }
- }
- }
+ /*
+ * If there are too many running because io_max_workers changed, that will
+ * be handled by the IO workers themselves so they can shut down in
+ * preferred order.
+ */
}
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 9a82d5f847d..6d3f5289e18 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -11,9 +11,10 @@
* infrastructure for reopening the file, and must processed synchronously by
* the client code when submitted.
*
- * So that the submitter can make just one system call when submitting a batch
- * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
- * could be improved by using futexes instead of latches to wake N waiters.
+ * When a batch of IOs is submitted, the lowest numbered idle worker is woken
+ * up. If it sees more work in the queue it wakes a peer to help, and so on
+ * in a chain. When a backlog is detected, the pool size is increased; when
+ * the highest numbered worker times out after inactivity, it is decreased.
*
* This method of AIO is available in all builds on all operating systems, and
* is the default.
@@ -40,6 +41,8 @@
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/injection_point.h"
@@ -47,10 +50,8 @@
#include "utils/ps_status.h"
#include "utils/wait_event.h"
-
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
+/* Saturation for stats counters used to estimate wakeup:work ratio. */
+#define PGAIO_WORKER_STATS_MAX 64
typedef struct PgAioWorkerSubmissionQueue
{
@@ -63,17 +64,25 @@ typedef struct PgAioWorkerSubmissionQueue
typedef struct PgAioWorkerSlot
{
- Latch *latch;
- bool in_use;
+ ProcNumber proc_number;
} PgAioWorkerSlot;
typedef struct PgAioWorkerControl
{
+ /* Seen by postmaster */
+ volatile bool new_worker_needed;
+
+ /* Protected by AioWorkerSubmissionQueueLock. */
uint64 idle_worker_mask;
+
+ /* Protected by AioWorkerControlLock. */
+ uint64 worker_set;
+ int nworkers;
+
+ /* Protected by AioWorkerControlLock. */
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
-
static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);
@@ -91,11 +100,14 @@ const IoMethodOps pgaio_worker_ops = {
/* GUCs */
-int io_workers = 3;
+int io_min_workers = 1;
+int io_max_workers = 8;
+int io_worker_idle_timeout = 60000;
+int io_worker_launch_interval = 500;
static int io_worker_queue_size = 64;
-static int MyIoWorkerId;
+static int MyIoWorkerId = -1;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
@@ -152,37 +164,172 @@ pgaio_worker_shmem_init(bool first_time)
&found);
if (!found)
{
+ io_worker_control->new_worker_needed = false;
+ io_worker_control->worker_set = 0;
io_worker_control->idle_worker_mask = 0;
for (int i = 0; i < MAX_IO_WORKERS; ++i)
- {
- io_worker_control->workers[i].latch = NULL;
- io_worker_control->workers[i].in_use = false;
- }
+ io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
}
}
+static void
+pgaio_worker_consider_new_worker(uint32 queue_depth)
+{
+ /*
+ * This is called from sites that don't hold AioWorkerControlLock, but it
+ * changes infrequently and an up to date value is not required for this
+ * heuristic purpose.
+ */
+ if (!io_worker_control->new_worker_needed &&
+ queue_depth >= io_worker_control->nworkers)
+ {
+ io_worker_control->new_worker_needed = true;
+ SendPostmasterSignal(PMSIGNAL_IO_WORKER_CHANGE);
+ }
+}
+
+/*
+ * Called by a worker when the queue is empty, to try to prevent a delayed
+ * reaction to a brief burst. This races against the postmaster acting on the
+ * old value if it was recently set to true, but that's OK, the ordering would
+ * be indeterminate anyway even if we could use locks in the postmaster.
+ */
+static void
+pgaio_worker_cancel_new_worker(void)
+{
+ io_worker_control->new_worker_needed = false;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed.
+ */
+bool
+pgaio_worker_test_new_worker_needed(void)
+{
+ return io_worker_control->new_worker_needed;
+}
+
+/*
+ * Called by the postmaster to check if a new worker is needed when it's ready
+ * to launch one, and clear the flag.
+ */
+bool
+pgaio_worker_clear_new_worker_needed(void)
+{
+ bool result;
+
+ result = io_worker_control->new_worker_needed;
+ if (result)
+ io_worker_control->new_worker_needed = false;
+
+ return result;
+}
+
+static uint64
+pgaio_worker_mask(int worker)
+{
+ return UINT64_C(1) << worker;
+}
+
+static void
+pgaio_worker_add(uint64 *set, int worker)
+{
+ *set |= pgaio_worker_mask(worker);
+}
+
+static void
+pgaio_worker_remove(uint64 *set, int worker)
+{
+ *set &= ~pgaio_worker_mask(worker);
+}
+
+#ifdef USE_ASSERT_CHECKING
+static bool
+pgaio_worker_in(uint64 set, int worker)
+{
+ return (set & pgaio_worker_mask(worker)) != 0;
+}
+#endif
+
+static uint64
+pgaio_worker_highest(uint64 set)
+{
+ return pg_leftmost_one_pos64(set);
+}
+
+static uint64
+pgaio_worker_lowest(uint64 set)
+{
+ return pg_rightmost_one_pos64(set);
+}
+
+static int
+pgaio_worker_pop(uint64 *set)
+{
+ int worker;
+
+ Assert(*set != 0);
+ worker = pgaio_worker_lowest(*set);
+ pgaio_worker_remove(set, worker);
+ return worker;
+}
+
static int
pgaio_worker_choose_idle(void)
{
+ uint64 idle_worker_mask;
int worker;
- if (io_worker_control->idle_worker_mask == 0)
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
+ /*
+ * Workers only wake higher numbered workers, to try to encourage an
+ * ordering of wakeup:work ratios, reducing spurious wakeups in lower
+ * numbered workers.
+ */
+ idle_worker_mask = io_worker_control->idle_worker_mask;
+ if (MyIoWorkerId != -1)
+ idle_worker_mask &= ~(pgaio_worker_mask(MyIoWorkerId) - 1);
+
+ if (idle_worker_mask == 0)
return -1;
/* Find the lowest bit position, and clear it. */
- worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
- Assert(io_worker_control->workers[worker].in_use);
+ worker = pgaio_worker_lowest(idle_worker_mask);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask, worker);
return worker;
}
+/*
+ * Try to wake a worker by setting its latch, to tell it there are IOs to
+ * process in the submission queue.
+ */
+static void
+pgaio_worker_wake(int worker)
+{
+ ProcNumber proc_number;
+
+ /*
+ * If the selected worker is concurrently exiting, then pgaio_worker_die()
+ * had not yet removed it as of when we saw it in idle_worker_mask. That's
+ * OK, because it will wake all remaining workers to close wakeup-vs-exit
+ * races: *someone* will see the queued IO. If there are no workers
+ * running, the postmaster will start a new one.
+ */
+ proc_number = io_worker_control->workers[worker].proc_number;
+ if (proc_number != INVALID_PROC_NUMBER)
+ SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
+}
+
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
new_head = (queue->head + 1) & (queue->size - 1);
if (new_head == queue->tail)
@@ -204,6 +351,8 @@ pgaio_worker_submission_queue_consume(void)
PgAioWorkerSubmissionQueue *queue;
uint32 result;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return UINT32_MAX; /* empty */
@@ -220,6 +369,8 @@ pgaio_worker_submission_queue_depth(void)
uint32 head;
uint32 tail;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
head = io_worker_submission_queue->head;
tail = io_worker_submission_queue->tail;
@@ -244,9 +395,9 @@ static void
pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+ uint32 queue_depth;
+ int worker = -1;
int nsync = 0;
- Latch *wakeup = NULL;
- int worker;
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
@@ -261,51 +412,48 @@ pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
* we can to workers, to maximize concurrency.
*/
synchronous_ios[nsync++] = staged_ios[i];
- continue;
}
-
- if (wakeup == NULL)
+ else if (worker == -1)
{
/* Choose an idle worker to wake up if we haven't already. */
worker = pgaio_worker_choose_idle();
- if (worker >= 0)
- wakeup = io_worker_control->workers[worker].latch;
pgaio_debug_io(DEBUG4, staged_ios[i],
"choosing worker %d",
worker);
}
}
+ queue_depth = pgaio_worker_submission_queue_depth();
LWLockRelease(AioWorkerSubmissionQueueLock);
- if (wakeup)
- SetLatch(wakeup);
+ if (worker != -1)
+ pgaio_worker_wake(worker);
+ else
+ pgaio_worker_consider_new_worker(queue_depth);
/* Run whatever is left synchronously. */
for (int i = 0; i < nsync; ++i)
{
- wakeup = NULL;
-
/*
* Between synchronous IO operations, try again to enqueue as many as
* we can.
*/
if (i > 0)
{
- wakeup = NULL;
+ worker = -1;
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
while (i < nsync &&
pgaio_worker_submission_queue_insert(synchronous_ios[i]))
{
- if (wakeup == NULL && (worker = pgaio_worker_choose_idle()) >= 0)
- wakeup = io_worker_control->workers[worker].latch;
+ if (worker == -1)
+ worker = pgaio_worker_choose_idle();
i++;
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- if (wakeup)
- SetLatch(wakeup);
+ if (worker != -1)
+ pgaio_worker_wake(worker);
if (i == nsync)
break;
@@ -337,14 +485,27 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static void
pgaio_worker_die(int code, Datum arg)
{
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- Assert(io_worker_control->workers[MyIoWorkerId].in_use);
- Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+ uint64 notify_set;
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].in_use = false;
- io_worker_control->workers[MyIoWorkerId].latch = NULL;
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask, MyIoWorkerId);
LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
+ io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
+ Assert(pgaio_worker_in(io_worker_control->worker_set, MyIoWorkerId));
+ pgaio_worker_remove(&io_worker_control->worker_set, MyIoWorkerId);
+ notify_set = io_worker_control->worker_set;
+ Assert(io_worker_control->nworkers > 0);
+ io_worker_control->nworkers--;
+ Assert(pg_popcount64(io_worker_control->worker_set) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /* Notify other workers on pool change. */
+ while (notify_set != 0)
+ pgaio_worker_wake(pgaio_worker_pop(&notify_set));
}
/*
@@ -354,33 +515,37 @@ pgaio_worker_die(int code, Datum arg)
static void
pgaio_worker_register(void)
{
- MyIoWorkerId = -1;
+ uint64 worker_set_inverted;
+ uint64 old_worker_set;
- /*
- * XXX: This could do with more fine-grained locking. But it's also not
- * very common for the number of workers to change at the moment...
- */
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ MyIoWorkerId = -1;
- for (int i = 0; i < MAX_IO_WORKERS; ++i)
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ worker_set_inverted = ~io_worker_control->worker_set;
+ if (worker_set_inverted != 0)
{
- if (!io_worker_control->workers[i].in_use)
- {
- Assert(io_worker_control->workers[i].latch == NULL);
- io_worker_control->workers[i].in_use = true;
- MyIoWorkerId = i;
- break;
- }
- else
- Assert(io_worker_control->workers[i].latch != NULL);
+ MyIoWorkerId = pgaio_worker_lowest(worker_set_inverted);
+ if (MyIoWorkerId >= MAX_IO_WORKERS)
+ MyIoWorkerId = -1;
}
-
if (MyIoWorkerId == -1)
elog(ERROR, "couldn't find a free worker slot");
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
- LWLockRelease(AioWorkerSubmissionQueueLock);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
+ INVALID_PROC_NUMBER);
+ io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
+
+ old_worker_set = io_worker_control->worker_set;
+ Assert(!pgaio_worker_in(old_worker_set, MyIoWorkerId));
+ pgaio_worker_add(&io_worker_control->worker_set, MyIoWorkerId);
+ io_worker_control->nworkers++;
+ Assert(pg_popcount64(io_worker_control->worker_set) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /* Notify other workers on pool change. */
+ while (old_worker_set != 0)
+ pgaio_worker_wake(pgaio_worker_pop(&old_worker_set));
on_shmem_exit(pgaio_worker_die, 0);
}
@@ -406,14 +571,47 @@ pgaio_worker_error_callback(void *arg)
errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}
+/*
+ * Check if this backend is allowed to time out, and thus should use a
+ * non-infinite sleep time. Only the highest-numbered worker is allowed to
+ * time out, and only if the pool is above io_min_workers. Serializing
+ * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
+ * io_min_workers.
+ *
+ * The result is only instantaneously true and may be temporarily inconsistent
+ * in different workers around transitions, but all workers are woken up on
+ * pool size or GUC changes making the result eventually consistent.
+ */
+static bool
+pgaio_worker_can_timeout(void)
+{
+ uint64 worker_set;
+
+ /* Serialize against pool size changes. */
+ LWLockAcquire(AioWorkerControlLock, LW_SHARED);
+ worker_set = io_worker_control->worker_set;
+ LWLockRelease(AioWorkerControlLock);
+
+ if (MyIoWorkerId != pgaio_worker_highest(worker_set))
+ return false;
+ if (MyIoWorkerId < io_min_workers)
+ return false;
+
+ return true;
+}
+
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
+ TimestampTz idle_timeout_abs = 0;
+ int timeout_guc_used = 0;
PgAioHandle *volatile error_ioh = NULL;
ErrorContextCallback errcallback = {0};
volatile int error_errno = 0;
char cmd[128];
+ int ios = 0;
+ int wakeups = 0;
MyBackendType = B_IO_WORKER;
AuxiliaryProcessMainCommon();
@@ -482,10 +680,8 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
while (!ShutdownRequestPending)
{
uint32 io_index;
- Latch *latches[IO_WORKER_WAKEUP_FANOUT];
- int nlatches = 0;
- int nwakeups = 0;
- int worker;
+ uint32 queue_depth;
+ int worker = -1;
/*
* Try to get a job to do.
@@ -494,40 +690,48 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
* to ensure that we don't see an outdated data in the handle.
*/
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+ io_index = pgaio_worker_submission_queue_consume();
+ queue_depth = pgaio_worker_submission_queue_depth();
+ if (io_index == UINT32_MAX)
{
- /*
- * Nothing to do. Mark self idle.
- *
- * XXX: Invent some kind of back pressure to reduce useless
- * wakeups?
- */
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+ /* Nothing to do. Mark self idle. */
+ pgaio_worker_add(&io_worker_control->idle_worker_mask,
+ MyIoWorkerId);
}
else
{
/* Got one. Clear idle flag. */
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+ pgaio_worker_remove(&io_worker_control->idle_worker_mask,
+ MyIoWorkerId);
- /* See if we can wake up some peers. */
- nwakeups = Min(pgaio_worker_submission_queue_depth(),
- IO_WORKER_WAKEUP_FANOUT);
- for (int i = 0; i < nwakeups; ++i)
- {
- if ((worker = pgaio_worker_choose_idle()) < 0)
- break;
- latches[nlatches++] = io_worker_control->workers[worker].latch;
- }
+ /*
+ * See if we should wake up a peer. Only do this if this worker
+ * is not experiencing spurious wakeups itself, to end a chain of
+ * wasted scheduling.
+ */
+ if (queue_depth > 0 && wakeups <= ios)
+ worker = pgaio_worker_choose_idle();
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- for (int i = 0; i < nlatches; ++i)
- SetLatch(latches[i]);
+ /* Propagate wakeups. */
+ if (worker != -1)
+ pgaio_worker_wake(worker);
+ else if (wakeups <= ios)
+ pgaio_worker_consider_new_worker(queue_depth);
if (io_index != UINT32_MAX)
{
PgAioHandle *ioh = NULL;
+ /* Cancel timeout and update wakeup:work ratio. */
+ idle_timeout_abs = 0;
+ if (++ios == PGAIO_WORKER_STATS_MAX)
+ {
+ ios /= 2;
+ wakeups /= 2;
+ }
+
ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh;
errcallback.arg = ioh;
@@ -593,8 +797,69 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
else
{
- WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
- WAIT_EVENT_IO_WORKER_MAIN);
+ int timeout_ms;
+
+ /* Cancel new worker if pending. */
+ pgaio_worker_cancel_new_worker();
+
+ /* Compute the remaining allowed idle time. */
+ if (io_worker_idle_timeout == -1)
+ {
+ /* Never time out. */
+ timeout_ms = -1;
+ }
+ else
+ {
+ TimestampTz now = GetCurrentTimestamp();
+
+ /* If the GUC changes, reset timer. */
+ if (idle_timeout_abs != 0 &&
+ io_worker_idle_timeout != timeout_guc_used)
+ idle_timeout_abs = 0;
+
+ /* On first sleep, compute absolute timeout. */
+ if (idle_timeout_abs == 0)
+ {
+ idle_timeout_abs =
+ TimestampTzPlusMilliseconds(now,
+ io_worker_idle_timeout);
+ timeout_guc_used = io_worker_idle_timeout;
+ }
+
+ /*
+ * All workers maintain the absolute timeout value, but only
+ * the highest worker can actually time out and only if
+ * io_min_workers is exceeded. All others wait only for
+ * explicit wakeups caused by queue insertion, wakeup
+ * propagation, change of pool size (possibly making them
+ * highest), or GUC reload.
+ */
+ if (pgaio_worker_can_timeout())
+ timeout_ms =
+ TimestampDifferenceMilliseconds(now,
+ idle_timeout_abs);
+ else
+ timeout_ms = -1;
+ }
+
+ if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
+ timeout_ms,
+ WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
+ {
+ /* WL_TIMEOUT */
+ if (pgaio_worker_can_timeout())
+ if (GetCurrentTimestamp() >= idle_timeout_abs)
+ break;
+ }
+ else
+ {
+ /* WL_LATCH_SET */
+ if (++wakeups == PGAIO_WORKER_STATS_MAX)
+ {
+ ios /= 2;
+ wakeups /= 2;
+ }
+ }
ResetLatch(MyLatch);
}
@@ -604,6 +869,10 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ /* If io_max_workers has been decreased, exit highest first. */
+ if (MyIoWorkerId >= io_max_workers)
+ break;
}
}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..c6c8107fe33 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -352,6 +352,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry."
InjectionPoint "Waiting to read or update information related to injection points."
SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state."
AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue."
+AioWorkerControl "Waiting to update AIO worker information."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..ecb16facb67 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3306,14 +3306,52 @@ struct config_int ConfigureNamesInt[] =
},
{
- {"io_workers",
+ {"io_max_workers",
PGC_SIGHUP,
RESOURCES_IO,
- gettext_noop("Number of IO worker processes, for io_method=worker."),
+ gettext_noop("Maximum number of IO worker processes, for io_method=worker."),
NULL,
},
- &io_workers,
- 3, 1, MAX_IO_WORKERS,
+ &io_max_workers,
+ 8, 1, MAX_IO_WORKERS,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_min_workers",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Minimum number of IO worker processes, for io_method=worker."),
+ NULL,
+ },
+ &io_min_workers,
+ 1, 1, MAX_IO_WORKERS,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_worker_idle_timeout",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Maximum idle time before IO workers exit, for io_method=worker."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &io_worker_idle_timeout,
+ 60 * 1000, -1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"io_worker_launch_interval",
+ PGC_SIGHUP,
+ RESOURCES_IO,
+ gettext_noop("Minimum time between launching IO workers, for io_method=worker."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &io_worker_launch_interval,
+ 500, 0, INT_MAX,
NULL, NULL, NULL
},
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a9d8293474a..1da6345ad7a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -210,7 +210,10 @@
# can execute simultaneously
# -1 sets based on shared_buffers
# (change requires restart)
-#io_workers = 3 # 1-32;
+#io_min_workers = 1 # 1-32;
+#io_max_workers = 8 # 1-32;
+#io_worker_idle_timeout = 60s # min -1
+#io_worker_launch_interval = 500ms # min 0ms
# - Worker Processes -
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index 7bde7e89c8a..de9c80109e0 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -17,6 +17,13 @@
pg_noreturn extern void IoWorkerMain(const void *startup_data, size_t startup_data_len);
-extern PGDLLIMPORT int io_workers;
+extern PGDLLIMPORT int io_min_workers;
+extern PGDLLIMPORT int io_max_workers;
+extern PGDLLIMPORT int io_worker_idle_timeout;
+extern PGDLLIMPORT int io_worker_launch_interval;
+
+/* Interfaces visible to the postmaster. */
+extern bool pgaio_worker_test_new_worker_needed(void);
+extern bool pgaio_worker_clear_new_worker_needed(void);
#endif /* IO_WORKER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index a9681738146..c1801d08833 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
PG_LWLOCK(51, InjectionPoint)
PG_LWLOCK(52, SerialControl)
PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, AioWorkerControl)
diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h
index 428aa3fd68a..2859a636349 100644
--- a/src/include/storage/pmsignal.h
+++ b/src/include/storage/pmsignal.h
@@ -38,6 +38,7 @@ typedef enum
PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
+ PMSIGNAL_IO_WORKER_CHANGE, /* IO worker pool change */
PMSIGNAL_BACKGROUND_WORKER_CHANGE, /* background worker state change */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_ADVANCE_STATE_MACHINE, /* advance postmaster's state machine */
diff --git a/src/test/modules/test_aio/t/002_io_workers.pl b/src/test/modules/test_aio/t/002_io_workers.pl
index af5fae15ea7..a0252857798 100644
--- a/src/test/modules/test_aio/t/002_io_workers.pl
+++ b/src/test/modules/test_aio/t/002_io_workers.pl
@@ -14,6 +14,9 @@ $node->init();
$node->append_conf(
'postgresql.conf', qq(
io_method=worker
+io_worker_idle_timeout=0ms
+io_worker_launch_interval=0ms
+io_max_workers=32
));
$node->start();
@@ -31,7 +34,7 @@ sub test_number_of_io_workers_dynamic
{
my $node = shift;
- my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_workers');
+ my $prev_worker_count = $node->safe_psql('postgres', 'SHOW io_min_workers');
# Verify that worker count can't be set to 0
change_number_of_io_workers($node, 0, $prev_worker_count, 1);
@@ -62,23 +65,23 @@ sub change_number_of_io_workers
my ($result, $stdout, $stderr);
($result, $stdout, $stderr) =
- $node->psql('postgres', "ALTER SYSTEM SET io_workers = $worker_count");
+ $node->psql('postgres', "ALTER SYSTEM SET io_min_workers = $worker_count");
$node->safe_psql('postgres', 'SELECT pg_reload_conf()');
if ($expect_failure)
{
ok( $stderr =~
- /$worker_count is outside the valid range for parameter "io_workers"/,
- "updating number of io_workers to $worker_count failed, as expected"
+ /$worker_count is outside the valid range for parameter "io_min_workers"/,
+ "updating number of io_min_workers to $worker_count failed, as expected"
);
return $prev_worker_count;
}
else
{
- is( $node->safe_psql('postgres', 'SHOW io_workers'),
+ is( $node->safe_psql('postgres', 'SHOW io_min_workers'),
$worker_count,
- "updating number of io_workers from $prev_worker_count to $worker_count"
+ "updating number of io_min_workers from $prev_worker_count to $worker_count"
);
check_io_worker_count($node, $worker_count);
--
2.47.2
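
For anyone wanting to try the patch, the new settings from the guc_tables.c hunk above could be spelled out in postgresql.conf roughly as follows. This is only a sketch: the values are the defaults from the patch, the four io_* worker settings exist only with the patch applied, and the comments paraphrase the GUC descriptions rather than any documented behaviour.

```
io_method = worker
io_min_workers = 1                  # minimum number of IO workers (1-32)
io_max_workers = 8                  # maximum number of IO workers (1-32)
io_worker_idle_timeout = 60s        # idle time before an IO worker exits
io_worker_launch_interval = 500ms   # spacing between IO worker launches
```

All four are PGC_SIGHUP in the patch, so a reload (SELECT pg_reload_conf(), as the TAP test does) should be enough to change them.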