From 1b1d59f667c23a6da8323b903634c0b5c3bd08db Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 15 Aug 2025 11:01:52 -0400 Subject: [PATCH v6 3/4] bufmgr: aio: Prototype for not waiting for already-in-progress IO Author: Reviewed-by: Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv Backpatch: --- src/include/storage/bufmgr.h | 1 + src/backend/storage/buffer/bufmgr.c | 150 ++++++++++++++++++++++++++-- 2 files changed, 142 insertions(+), 9 deletions(-) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 715ae96f0..0b6fa848a 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -147,6 +147,7 @@ struct ReadBuffersOperation int flags; int16 nblocks; int16 nblocks_done; + bool foreign_io; PgAioWaitRef io_wref; PgAioReturn io_return; }; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index a036c2aa2..d3dd64808 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1632,6 +1632,46 @@ ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); } +/* + * Check if the buffer is already undergoing read AIO. If it is, assign the + * IO's wait reference to operation->io_wref, thereby allowing the caller to + * wait for that IO. + */ +static inline bool +ReadBuffersIOAlreadyInProgress(ReadBuffersOperation *operation, Buffer buffer) +{ + BufferDesc *desc; + uint32 buf_state; + PgAioWaitRef iow; + + pgaio_wref_clear(&iow); + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u32(&desc->state); + if ((buf_state & BM_IO_IN_PROGRESS) && !(buf_state & BM_VALID)) + iow = desc->io_wref; + } + else + { + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + + if ((buf_state & BM_IO_IN_PROGRESS) && !(buf_state & BM_VALID)) + iow = desc->io_wref; + UnlockBufHdr(desc); + } + + if (pgaio_wref_valid(&iow)) + { + operation->io_wref = iow; + return true; + } + + return false; +} + /* * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. */ @@ -1764,7 +1804,7 @@ WaitReadBuffers(ReadBuffersOperation *operation) * * we first check if we already know the IO is complete. */ - if (aio_ret->result.status == PGAIO_RS_UNKNOWN && + if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) && !pgaio_wref_check_done(&operation->io_wref)) { instr_time io_start = pgstat_prepare_io_time(track_io_timing); @@ -1783,11 +1823,66 @@ WaitReadBuffers(ReadBuffersOperation *operation) Assert(pgaio_wref_check_done(&operation->io_wref)); } - /* - * We now are sure the IO completed. Check the results. This - * includes reporting on errors if there were any. - */ - ProcessReadBuffersResult(operation); + if (unlikely(operation->foreign_io)) + { + Buffer buffer = operation->buffers[operation->nblocks_done]; + BufferDesc *desc; + uint32 buf_state; + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u32(&desc->state); + } + else + { + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + UnlockBufHdr(desc); + } + + if (buf_state & BM_VALID) + { + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + /* + * Report and track this as a 'hit' for this backend, even + * though it must have started out as a miss in + * PinBufferForBlock(). The other backend (or ourselves, + * as part of a read started earlier) will track this as a + * 'read'. + */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(operation->forknum, + operation->blocknum + operation->nblocks_done, + operation->smgr->smgr_rlocator.locator.spcOid, + operation->smgr->smgr_rlocator.locator.dbOid, + operation->smgr->smgr_rlocator.locator.relNumber, + operation->smgr->smgr_rlocator.backend, + true); + + if (BufferIsLocal(buffer)) + pgBufferUsage.local_blks_hit += 1; + else + pgBufferUsage.shared_blks_hit += 1; + + if (operation->rel) + pgstat_count_buffer_hit(operation->rel); + + pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); + + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageHit; + } + } + else + { + /* + * We now are sure the IO completed. Check the results. This + * includes reporting on errors if there were any. + */ + ProcessReadBuffersResult(operation); + } } /* @@ -1873,6 +1968,43 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) io_object = IOOBJECT_RELATION; } + /* + * If AIO is in progress, be it in this backend or another backend, we + * just associate the wait reference with the operation and wait in + * WaitReadBuffers(). This turns out to be important for performance in + * two workloads: + * + * 1) A read stream that has to read the same block multiple times within + * the readahead distance. This can happen e.g. for the table accesses of + * an index scan. + * + * 2) Concurrent scans by multiple backends on the same relation. + * + * If we were to synchronously wait for the in-progress IO, we'd not be + * able to keep enough I/O in flight. + * + * If we do find there is ongoing I/O for the buffer, we set up a 1-block + * ReadBuffersOperation that WaitReadBuffers then can wait on. + * + * It's possible that another backend starts IO on the buffer between this + * check and the ReadBuffersCanStartIO(nowait = false) below. In that case + * we will synchronously wait for the IO below, but the window for that is + * small enough that it won't happen often enough to have a significant + * performance impact. + */ + if (ReadBuffersIOAlreadyInProgress(operation, buffers[nblocks_done])) + { + *nblocks_progress = 1; + operation->foreign_io = true; + + CheckReadBuffersOperation(operation, false); + + + return true; + } + + operation->foreign_io = false; + /* * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR * flag. The reason for that is that, hopefully, zero_damaged_pages isn't @@ -1930,9 +2062,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) /* * Check if we can start IO on the first to-be-read buffer. * - * If an I/O is already in progress in another backend, we want to wait - * for the outcome: either done, or something went wrong and we will - * retry. + * If a synchronous I/O is in progress in another backend (it can't be + * this backend), we want to wait for the outcome: either done, or + * something went wrong and we will retry. */ if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) { -- 2.51.0