From d21af1ece0ec3e27c8228bbe15f2d03270d317c2 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Wed, 4 Mar 2026 22:30:38 +0100 Subject: [PATCH v12 21/23] Conditional locking in pgaio_worker_submit_internal If AioWorkerSubmissionQueueLock can't be acquired without waiting, perform the I/O synchronously instead. There's a single submission queue, and in cases with high I/O concurrency (e.g. multiple backends executing large index scans), it was possible to end up with most time spent waiting for the single LWLock. While at it, optimize the case when we can't add entries into a full queue. Instead of adding them to the array of sync I/Os one by one, interrupt the loop on the first failure and handle all remaining items synchronously. --- src/backend/storage/aio/method_worker.c | 58 +++++++++++++++---------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index d9617c20e..18fde63d6 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -242,40 +242,54 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh) static void pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios) { - PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE]; + PgAioHandle **synchronous_ios = NULL; int nsync = 0; Latch *wakeup = NULL; int worker; Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE); - LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); - for (int i = 0; i < num_staged_ios; ++i) + if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE)) { - Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i])); - if (!pgaio_worker_submission_queue_insert(staged_ios[i])) + for (int i = 0; i < num_staged_ios; ++i) { - /* - * We'll do it synchronously, but only after we've sent as many as - * we can to workers, to maximize concurrency. 
- */ - synchronous_ios[nsync++] = staged_ios[i]; - continue; - } + Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i])); + if (!pgaio_worker_submission_queue_insert(staged_ios[i])) + { + /* + * Do the rest synchronously. If the queue is full, it's unlikely + * that space will free up while this tight loop is running. + */ + synchronous_ios = &staged_ios[i]; + nsync = (num_staged_ios - i); - if (wakeup == NULL) - { - /* Choose an idle worker to wake up if we haven't already. */ - worker = pgaio_worker_choose_idle(); - if (worker >= 0) - wakeup = io_worker_control->workers[worker].latch; + /* + * No extra wakeup logic is needed here: if we put anything in + * the queue, we already tried to choose an idle worker for it. + */ + break; + } - pgaio_debug_io(DEBUG4, staged_ios[i], - "choosing worker %d", - worker); + if (wakeup == NULL) + { + /* Choose an idle worker to wake up if we haven't already. */ + worker = pgaio_worker_choose_idle(); + if (worker >= 0) + wakeup = io_worker_control->workers[worker].latch; + + pgaio_debug_io(DEBUG4, staged_ios[i], + "choosing worker %d", + worker); + } } + LWLockRelease(AioWorkerSubmissionQueueLock); + } + else + { + /* do everything synchronously, no wakeup needed */ + synchronous_ios = staged_ios; + nsync = num_staged_ios; } - LWLockRelease(AioWorkerSubmissionQueueLock); if (wakeup) SetLatch(wakeup); -- 2.53.0