public inbox for [email protected]
help / color / mirror / Atom feed
From: Melanie Plageman <[email protected]>
Subject: [PATCH v8 7/9] Restructure AsyncReadBuffers()
Date: Wed, 18 Mar 2026 11:09:58 -0400
Restructure AsyncReadBuffers() to use an early return when the head buffer
is already valid, instead of using a did_start_io flag and if/else
branches. Also move some of the code closer to where it is used. This is
a refactor only.
Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
src/backend/storage/buffer/bufmgr.c | 171 ++++++++++++++--------------
1 file changed, 88 insertions(+), 83 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fa85570a791..e212f6110f2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1920,7 +1920,18 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
void *io_pages[MAX_IO_COMBINE_LIMIT];
IOContext io_context;
IOObject io_object;
- bool did_start_io;
+ instr_time io_start;
+
+ if (persistence == RELPERSISTENCE_TEMP)
+ {
+ io_context = IOCONTEXT_NORMAL;
+ io_object = IOOBJECT_TEMP_RELATION;
+ }
+ else
+ {
+ io_context = IOContextForStrategy(operation->strategy);
+ io_object = IOOBJECT_RELATION;
+ }
/*
* When this IO is executed synchronously, either because the caller will
@@ -1931,16 +1942,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
ioh_flags |= PGAIO_HF_SYNCHRONOUS;
if (persistence == RELPERSISTENCE_TEMP)
- {
- io_context = IOCONTEXT_NORMAL;
- io_object = IOOBJECT_TEMP_RELATION;
ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
- }
- else
- {
- io_context = IOContextForStrategy(operation->strategy);
- io_object = IOOBJECT_RELATION;
- }
/*
* If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
@@ -1992,7 +1994,6 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
if (unlikely(!ioh))
{
pgaio_submit_staged();
-
ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
}
@@ -2017,91 +2018,95 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
pgaio_io_release(ioh);
pgaio_wref_clear(&operation->io_wref);
- did_start_io = false;
/*
* Report and track this as a 'hit' for this backend, even though it
* must have started out as a miss in PinBufferForBlock(). The other
* backend will track this as a 'read'.
*/
- TrackBufferHit(io_object, io_context, operation->rel, persistence,
- operation->smgr, forknum, blocknum);
+ TrackBufferHit(io_object, io_context,
+ operation->rel, operation->persistence,
+ operation->smgr, operation->forknum,
+ blocknum);
+ return false;
}
- else
+
+ Assert(io_buffers[0] == buffers[nblocks_done]);
+ io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
+ io_buffers_len = 1;
+
+ /*
+ * NB: As little code as possible should be added between the
+ * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
+ * below and the smgrstartreadv(), as some of the buffers are now marked
+ * as IO_IN_PROGRESS and will thus cause other backends to wait.
+ */
+
+ /*
+ * How many neighboring-on-disk blocks can we scatter-read into other
+ * buffers at the same time? In this case we don't wait if we see an I/O
+ * already in progress (see comment above).
+ */
+ for (int i = nblocks_done + 1; i < operation->nblocks; i++)
{
- instr_time io_start;
+ /* Must be consecutive block numbers. */
+ Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+ BufferGetBlockNumber(buffers[i]) - 1);
- /* We found a buffer that we need to read in. */
- Assert(io_buffers[0] == buffers[nblocks_done]);
- io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
- io_buffers_len = 1;
+ if (!ReadBuffersCanStartIO(buffers[i], true))
+ break;
- /*
- * How many neighboring-on-disk blocks can we scatter-read into other
- * buffers at the same time? In this case we don't wait if we see an
- * I/O already in progress. We already set BM_IO_IN_PROGRESS for the
- * head block, so we should get on with that I/O as soon as possible.
- */
- for (int i = nblocks_done + 1; i < operation->nblocks; i++)
- {
- if (!ReadBuffersCanStartIO(buffers[i], true))
- break;
- /* Must be consecutive block numbers. */
- Assert(BufferGetBlockNumber(buffers[i - 1]) ==
- BufferGetBlockNumber(buffers[i]) - 1);
- Assert(io_buffers[io_buffers_len] == buffers[i]);
+ Assert(io_buffers[io_buffers_len] == buffers[i]);
- io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
- }
-
- /* get a reference to wait for in WaitReadBuffers() */
- pgaio_io_get_wref(ioh, &operation->io_wref);
-
- /* provide the list of buffers to the completion callbacks */
- pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
-
- pgaio_io_register_callbacks(ioh,
- persistence == RELPERSISTENCE_TEMP ?
- PGAIO_HCB_LOCAL_BUFFER_READV :
- PGAIO_HCB_SHARED_BUFFER_READV,
- flags);
-
- pgaio_io_set_flag(ioh, ioh_flags);
-
- /* ---
- * Even though we're trying to issue IO asynchronously, track the time
- * in smgrstartreadv():
- * - if io_method == IOMETHOD_SYNC, we will always perform the IO
- * immediately
- * - the io method might not support the IO (e.g. worker IO for a temp
- * table)
- * ---
- */
- io_start = pgstat_prepare_io_time(track_io_timing);
- smgrstartreadv(ioh, operation->smgr, forknum,
- blocknum,
- io_pages, io_buffers_len);
- pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
- io_start, 1, io_buffers_len * BLCKSZ);
-
- if (persistence == RELPERSISTENCE_TEMP)
- pgBufferUsage.local_blks_read += io_buffers_len;
- else
- pgBufferUsage.shared_blks_read += io_buffers_len;
-
- /*
- * Track vacuum cost when issuing IO, not after waiting for it.
- * Otherwise we could end up issuing a lot of IO in a short timespan,
- * despite a low cost limit.
- */
- if (VacuumCostActive)
- VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
-
- *nblocks_progress = io_buffers_len;
- did_start_io = true;
+ io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
}
- return did_start_io;
+ /* get a reference to wait for in WaitReadBuffers() */
+ pgaio_io_get_wref(ioh, &operation->io_wref);
+
+ /* provide the list of buffers to the completion callbacks */
+ pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+
+ pgaio_io_register_callbacks(ioh,
+ persistence == RELPERSISTENCE_TEMP ?
+ PGAIO_HCB_LOCAL_BUFFER_READV :
+ PGAIO_HCB_SHARED_BUFFER_READV,
+ flags);
+
+ pgaio_io_set_flag(ioh, ioh_flags);
+
+ /* ---
+ * Even though we're trying to issue IO asynchronously, track the time
+ * in smgrstartreadv():
+ * - if io_method == IOMETHOD_SYNC, we will always perform the IO
+ * immediately
+ * - the io method might not support the IO (e.g. worker IO for a temp
+ * table)
+ * ---
+ */
+ io_start = pgstat_prepare_io_time(track_io_timing);
+ smgrstartreadv(ioh, operation->smgr, forknum,
+ blocknum,
+ io_pages, io_buffers_len);
+ pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+ io_start, 1, io_buffers_len * BLCKSZ);
+
+ if (persistence == RELPERSISTENCE_TEMP)
+ pgBufferUsage.local_blks_read += io_buffers_len;
+ else
+ pgBufferUsage.shared_blks_read += io_buffers_len;
+
+ /*
+ * Track vacuum cost when issuing IO, not after waiting for it. Otherwise
+ * we could end up issuing a lot of IO in a short timespan, despite a low
+ * cost limit.
+ */
+ if (VacuumCostActive)
+ VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+
+ *nblocks_progress = io_buffers_len;
+
+ return true;
}
/*
--
2.53.0.1.gb2826b52eb
--zousfcpkuk57oxzf
Content-Type: text/x-diff; charset=us-ascii
Content-Disposition: attachment;
filename="v8-0008-bufmgr-Improve-StartBufferIO-interface.patch"
view thread (2+ messages)
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH v8 7/9] Restructure AsyncReadBuffers()
In-Reply-To: <no-message-id-724651@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox.