public inbox for [email protected]
help / color / mirror / Atom feed
From: Andres Freund <[email protected]>
Subject: [PATCH v4 06/14] WIP: read stream: Split decision about look ahead for AIO and combining
Date: Tue, 31 Mar 2026 12:45:41 -0400
"read_stream: Only increase distance when waiting for IO" caused a regression
due to this conflation, because we ended up not allowing IO combining when
never needing to wait for IO (as the distance ended up too small to allow for
that), which increased CPU overhead.
If we go this way, it probably should be moved ahead of the aforementioned
commit.
Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
src/backend/storage/aio/read_stream.c | 147 ++++++++++++++++++++------
1 file changed, 115 insertions(+), 32 deletions(-)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 5d7bfb8fa00..9d22a119bef 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -98,10 +98,14 @@ struct ReadStream
int16 max_pinned_buffers;
int16 forwarded_buffers;
int16 pinned_buffers;
- int16 distance;
+ /* limit of how far to look ahead for IO combining */
+ int16 combine_distance;
+ /* limit of how far to look ahead for starting IO early */
+ int16 readahead_distance;
uint16 distance_decay_holdoff;
int16 initialized_buffers;
- int16 resume_distance;
+ int16 resume_readahead_distance;
+ int16 resume_combine_distance;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
@@ -332,8 +336,8 @@ read_stream_start_pending_read(ReadStream *stream)
/* Shrink distance: no more look-ahead until buffers are released. */
new_distance = stream->pinned_buffers + buffer_limit;
- if (stream->distance > new_distance)
- stream->distance = new_distance;
+ if (stream->readahead_distance > new_distance)
+ stream->readahead_distance = new_distance;
/* Unless we have nothing to give the consumer, stop here. */
if (stream->pinned_buffers > 0)
@@ -374,12 +378,23 @@ read_stream_start_pending_read(ReadStream *stream)
* perform IO asynchronously when starting out with a small look-ahead
* distance.
*/
- if (stream->distance > 1 && stream->ios_in_progress == 0)
+ if (stream->ios_in_progress == 0)
{
- if (stream->distance_decay_holdoff == 0)
- stream->distance--;
- else
+ if (stream->distance_decay_holdoff > 0)
stream->distance_decay_holdoff--;
+ else
+ {
+ if (stream->readahead_distance > 1)
+ stream->readahead_distance--;
+
+ /*
+ * XXX: Should we actually reduce this at any time other than
+ * a reset? For now we have to, as this is also a condition
+ * for re-enabling fast_path.
+ */
+ if (stream->combine_distance > 1)
+ stream->combine_distance--;
+ }
}
}
else
@@ -452,18 +467,33 @@ read_stream_should_look_ahead(ReadStream *stream)
return false;
/* If the callback has signaled end-of-stream, we're done */
- if (stream->distance == 0)
+ if (stream->readahead_distance == 0)
return false;
/* never pin more buffers than allowed */
if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->max_pinned_buffers)
return false;
+ /*
+ * Allow looking further ahead if we are in the process of building a
+ * larger IO, the IO is not yet big enough and we don't yet have IO in
+ * flight. Note that this is allowed even if we are reaching the
+ * read-ahead limit (but not the buffer pin limit).
+ *
+ * This is important for cases where either effective_io_concurrency is
+ * low or we never need to wait for IO and thus are not increasing the
+ * distance. Without this we would end up with lots of small IOs.
+ */
+ if (stream->pending_read_nblocks > 0 &&
+ stream->pinned_buffers == 0 &&
+ stream->pending_read_nblocks < stream->combine_distance)
+ return true;
+
/*
* Don't start more read-ahead if that'd put us over the distance limit
* for doing read-ahead.
*/
- if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance)
+ if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
return false;
return true;
@@ -492,7 +522,7 @@ read_stream_should_issue_now(ReadStream *stream)
* If the callback has signaled end-of-stream, start the pending read
* immediately. There is no further potential for IO combining.
*/
- if (stream->distance == 0)
+ if (stream->readahead_distance == 0)
return true;
/*
@@ -502,6 +532,11 @@ read_stream_should_issue_now(ReadStream *stream)
if (pending_read_nblocks >= stream->io_combine_limit)
return true;
+ /* same if capped not by io_combine_limit but combine_distance */
+ if (stream->combine_distance > 0 &&
+ pending_read_nblocks >= stream->combine_distance)
+ return true;
+
/*
* If we currently have no reads in flight or prepared, issue the IO once
* we are not looking ahead further. This ensures there's always at least
@@ -552,7 +587,8 @@ read_stream_look_ahead(ReadStream *stream)
if (blocknum == InvalidBlockNumber)
{
/* End of stream. */
- stream->distance = 0;
+ stream->readahead_distance = 0;
+ stream->combine_distance = 0;
break;
}
@@ -599,7 +635,7 @@ read_stream_look_ahead(ReadStream *stream)
* stream. In the worst case we can always make progress one buffer at a
* time.
*/
- Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+ Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
if (stream->batch_mode)
pgaio_exit_batchmode();
@@ -789,10 +825,17 @@ read_stream_begin_impl(int flags,
* doing full io_combine_limit sized reads.
*/
if (flags & READ_STREAM_FULL)
- stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+ {
+ stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+ stream->combine_distance = stream->io_combine_limit;
+ }
else
- stream->distance = 1;
- stream->resume_distance = stream->distance;
+ {
+ stream->readahead_distance = 1;
+ stream->combine_distance = 1;
+ }
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
/*
* Since we always access the same relation, we can initialize parts of
@@ -891,7 +934,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Assert(stream->ios_in_progress == 0);
Assert(stream->forwarded_buffers == 0);
Assert(stream->pinned_buffers == 1);
- Assert(stream->distance == 1);
+ Assert(stream->readahead_distance == 1);
+ Assert(stream->combine_distance == 1);
Assert(stream->pending_read_nblocks == 0);
Assert(stream->per_buffer_data_size == 0);
Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -965,7 +1009,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
else
{
/* No more blocks, end of stream. */
- stream->distance = 0;
+ stream->readahead_distance = 0;
stream->oldest_buffer_index = stream->next_buffer_index;
stream->pinned_buffers = 0;
stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -981,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Assert(stream->oldest_buffer_index == stream->next_buffer_index);
/* End of stream reached? */
- if (stream->distance == 0)
+ if (stream->readahead_distance == 0)
return InvalidBuffer;
/*
@@ -995,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
/* End of stream reached? */
if (stream->pinned_buffers == 0)
{
- Assert(stream->distance == 0);
+ Assert(stream->readahead_distance == 0);
return InvalidBuffer;
}
}
@@ -1016,7 +1060,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
{
int16 io_index = stream->oldest_io_index;
- int32 distance; /* wider temporary value, clamped below */
bool needed_wait;
/* Sanity check that we still agree on the buffers. */
@@ -1056,11 +1099,38 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
* the stream, as stream->distance == 0 is used to keep track of
* having reached the end.
*/
- if (stream->distance > 0 && needed_wait)
+ if (stream->readahead_distance > 0 && needed_wait)
{
- distance = stream->distance * 2;
- distance = Min(distance, stream->max_pinned_buffers);
- stream->distance = distance;
+ /* wider temporary value, due to overflow risk */
+ int32 readahead_distance;
+
+ readahead_distance = stream->readahead_distance * 2;
+ readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+ stream->readahead_distance = readahead_distance;
+ }
+
+ /*
+ * Whether we needed to wait or not, allow for more IO combining if we
+ * needed to do IO. The reason to do so independent of needing to wait
+ * is that when the data is resident in the kernel page cache, IO
+ * combining reduces the syscall / dispatch overhead, making it
+ * worthwhile regardless of needing to wait.
+ *
+ * It is also important with io_uring as it will never signal the need
+ * to wait for reads if all the data is in the page cache. There are
+ * heuristics to deal with that in method_io_uring.c, but they only
+ * work when the IO gets large enough.
+ */
+ if (stream->combine_distance > 0 &&
+ stream->combine_distance < stream->io_combine_limit)
+ {
+ /* wider temporary value, due to overflow risk */
+ int32 combine_distance;
+
+ combine_distance = stream->combine_distance * 2;
+ combine_distance = Min(combine_distance, stream->io_combine_limit);
+ combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+ stream->combine_distance = combine_distance;
}
/*
@@ -1139,10 +1209,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
#ifndef READ_STREAM_DISABLE_FAST_PATH
/* See if we can take the fast path for all-cached scans next time. */
+ /*
+ * FIXME: It's way too easy to wrongly fast path. I'm pretty sure there are
+ * several pre-existing cases where it triggers because we are not issuing
+ * additional prefetching (e.g. because of a small
+ * effective_io_concurrency) and thus stream->pinned_buffers stays at 1
+ * after read_stream_look_ahead().
+ */
if (stream->ios_in_progress == 0 &&
stream->forwarded_buffers == 0 &&
stream->pinned_buffers == 1 &&
- stream->distance == 1 &&
+ stream->readahead_distance == 1 &&
+ stream->combine_distance == 1 &&
stream->pending_read_nblocks == 0 &&
stream->per_buffer_data_size == 0)
{
@@ -1188,8 +1266,9 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
BlockNumber
read_stream_pause(ReadStream *stream)
{
- stream->resume_distance = stream->distance;
- stream->distance = 0;
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
+ stream->readahead_distance = 0;
return InvalidBlockNumber;
}
@@ -1201,7 +1280,8 @@ read_stream_pause(ReadStream *stream)
void
read_stream_resume(ReadStream *stream)
{
- stream->distance = stream->resume_distance;
+ stream->readahead_distance = stream->resume_readahead_distance;
+ stream->combine_distance = stream->resume_combine_distance;
}
/*
@@ -1217,7 +1297,8 @@ read_stream_reset(ReadStream *stream)
Buffer buffer;
/* Stop looking ahead. */
- stream->distance = 0;
+ stream->readahead_distance = 0;
+ stream->combine_distance = 0;
/* Forget buffered block number and fast path state. */
stream->buffered_blocknum = InvalidBlockNumber;
@@ -1249,8 +1330,10 @@ read_stream_reset(ReadStream *stream)
Assert(stream->ios_in_progress == 0);
/* Start off assuming data is cached. */
- stream->distance = 1;
- stream->resume_distance = stream->distance;
+ stream->readahead_distance = 1;
+ stream->combine_distance = 1;
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
stream->distance_decay_holdoff = 0;
}
--
2.53.0.1.gb2826b52eb
--ik3l34cft6pr4f53
Content-Type: text/x-diff; charset=us-ascii
Content-Disposition: attachment;
filename="v4-0007-Hacky-implementation-of-making-read_stream_reset-.patch"
view thread (2+ messages)
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH v4 06/14] WIP: read stream: Split decision about look ahead for AIO and combining
In-Reply-To: <no-message-id-601363@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox