public inbox for [email protected]
help / color / mirror / Atom feed
From: Andres Freund <[email protected]>
Subject: [PATCH v3 08/10] WIP: read stream: Split decision about look ahead for AIO and combining
Date: Mon, 30 Mar 2026 12:25:07 -0400
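Previous commits caused a regression by conflating the look-ahead distance
used for starting IO early (AIO) with the one used for IO combining. This is a
first attempt at fixing the problem by tracking the two distances separately.
Needs significant reordering and splitting if it works out.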
Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
src/backend/storage/aio/read_stream.c | 242 +++++++++++++++++++++-----
1 file changed, 195 insertions(+), 47 deletions(-)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 144b3613c92..1c375edad1d 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -98,10 +98,14 @@ struct ReadStream
int16 max_pinned_buffers;
int16 forwarded_buffers;
int16 pinned_buffers;
- int16 distance;
+ /* limit of how far to read ahead for IO combining */
+ int16 combine_distance;
+ /* limit of how far to read ahead for starting IO early */
+ int16 readahead_distance;
uint16 distance_decay_holdoff;
int16 initialized_buffers;
- int16 resume_distance;
+ int16 resume_readahead_distance;
+ int16 resume_combine_distance;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
@@ -332,8 +336,8 @@ read_stream_start_pending_read(ReadStream *stream)
/* Shrink distance: no more look-ahead until buffers are released. */
new_distance = stream->pinned_buffers + buffer_limit;
- if (stream->distance > new_distance)
- stream->distance = new_distance;
+ if (stream->readahead_distance > new_distance)
+ stream->readahead_distance = new_distance;
/* Unless we have nothing to give the consumer, stop here. */
if (stream->pinned_buffers > 0)
@@ -374,12 +378,23 @@ read_stream_start_pending_read(ReadStream *stream)
* perform IO asynchronously when starting out with a small look-ahead
* distance.
*/
- if (stream->distance > 1 && stream->ios_in_progress == 0)
+ if (stream->ios_in_progress == 0)
{
- if (stream->distance_decay_holdoff == 0)
- stream->distance--;
- else
+ if (stream->distance_decay_holdoff > 0)
stream->distance_decay_holdoff--;
+ else
+ {
+ if (stream->readahead_distance > 1)
+ stream->readahead_distance--;
+
+ /*
+ * XXX: Should we actually reduce this at any time other than
+ * a reset? For now we have to, as this is also a condition
+ * for re-enabling fast_path.
+ */
+ if (stream->combine_distance > 1)
+ stream->combine_distance--;
+ }
}
}
else
@@ -440,6 +455,101 @@ read_stream_start_pending_read(ReadStream *stream)
return true;
}
+/*
+ * Should we continue to perform look ahead? The look ahead may allow us to
+ * make the pending IO larger via IO combining or to issue more read ahead.
+ *
+ * Returns false once a hard limit is hit: max_ios IOs in progress,
+ * readahead_distance <= 0, or pinned plus pending blocks reaching
+ * max_pinned_buffers. Otherwise returns true if the pending read is still
+ * smaller than combine_distance and nothing is pinned yet, or if pinned plus
+ * pending blocks are still below readahead_distance.
+ *
+ * The order of the checks matters: the IO-combining exception is evaluated
+ * after the buffer pin cap but before the readahead_distance cap, so it may
+ * exceed the latter but never the former.
+ */
+static bool
+read_stream_should_look_ahead(ReadStream *stream)
+{
+ /* never start more IOs than our cap */
+ if (stream->ios_in_progress >= stream->max_ios)
+ return false;
+
+ /*
+ * If the callback has signaled end-of-stream, we're done. Note that
+ * readahead_distance is also set to 0 by read_stream_pause() and to -1
+ * while read_stream_reset() runs, which stops look ahead the same way.
+ */
+ if (stream->readahead_distance <= 0)
+ return false;
+
+ /* never pin more buffers than allowed */
+ if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->max_pinned_buffers)
+ return false;
+
+ /*
+ * Allow looking further ahead if we are in the process of building a
+ * larger IO, the IO is not yet big enough (below combine_distance) and
+ * nothing is pinned yet, i.e. we don't yet have IO in flight. Note that
+ * this is allowed even if we are reaching the readahead limit (but not
+ * the buffer pin limit, which was checked above).
+ *
+ * This is important for cases where either effective_io_concurrency is
+ * low or we never need to wait for IO and thus are not increasing the
+ * distance. Without this we would end up with lots of small IOs.
+ */
+ if (stream->pending_read_nblocks > 0 &&
+ stream->pinned_buffers == 0 &&
+ stream->pending_read_nblocks < stream->combine_distance)
+ return true;
+
+ /*
+ * Don't start more readahead if that'd put us at or over the limit for
+ * doing readahead.
+ */
+ if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
+ return false;
+
+ return true;
+}
+
+
+/*
+ * Decide whether the pending read should be started now rather than
+ * deferred.
+ *
+ * We don't start the pending read just because we've hit the distance limit,
+ * preferring to give it another chance to grow to full io_combine_limit size
+ * once more buffers have been consumed. But this is not desirable in all
+ * situations - see below.
+ *
+ * Returns false if there is nothing pending or max_ios IOs are already in
+ * progress (the read then stays pending). Returns true if look ahead has
+ * ended or been paused, if the read reached io_combine_limit or the current
+ * combine_distance, or if nothing is pinned and look ahead has stopped.
+ */
+static bool
+read_stream_should_issue_now(ReadStream *stream)
+{
+ int16 pending_read_nblocks = stream->pending_read_nblocks;
+
+ /* no IO to issue */
+ if (pending_read_nblocks == 0)
+ return false;
+
+ /* never start more IOs than our cap */
+ if (stream->ios_in_progress >= stream->max_ios)
+ return false;
+
+ /*
+ * If the callback has signaled end-of-stream, start the read
+ * immediately. There's no deferring it for later. read_stream_pause()
+ * also sets readahead_distance to 0, so a pending read is issued then
+ * too.
+ */
+ if (stream->readahead_distance <= 0)
+ return true;
+
+ /*
+ * If we've already reached io_combine_limit, there's no chance of growing
+ * the read further.
+ */
+ if (pending_read_nblocks >= stream->io_combine_limit)
+ return true;
+
+ /*
+ * Likewise if the read reached combine_distance, the current (adaptive)
+ * cap on IO combining. The > 0 test skips the end-of-stream (0) and reset
+ * (-1) values of combine_distance.
+ *
+ * NOTE(review): with READ_STREAM_FULL, combine_distance starts at
+ * io_combine_limit without being clamped to max_pinned_buffers (unlike
+ * when it is grown). If max_pinned_buffers is smaller, this test cannot
+ * fire and the pinned_buffers == 0 fallback below decides instead.
+ */
+ if (stream->combine_distance > 0 &&
+ pending_read_nblocks >= stream->combine_distance)
+ return true;
+
+ /*
+ * If we currently have no reads in flight or prepared, issue the IO once
+ * we stopped looking ahead. This ensures there's always at least one IO
+ * prepared. Inside the look-ahead loop, read_stream_should_look_ahead()
+ * has just returned true, so this only fires once that loop has exited.
+ */
+ if (stream->pinned_buffers == 0 &&
+ !read_stream_should_look_ahead(stream))
+ return true;
+
+ return false;
+}
+
static void
read_stream_look_ahead(ReadStream *stream)
{
@@ -452,14 +562,13 @@ read_stream_look_ahead(ReadStream *stream)
if (stream->batch_mode)
pgaio_enter_batchmode();
- while (stream->ios_in_progress < stream->max_ios &&
- stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+ while (read_stream_should_look_ahead(stream))
{
BlockNumber blocknum;
int16 buffer_index;
void *per_buffer_data;
- if (stream->pending_read_nblocks == stream->io_combine_limit)
+ if (read_stream_should_issue_now(stream))
{
read_stream_start_pending_read(stream);
continue;
@@ -479,7 +588,8 @@ read_stream_look_ahead(ReadStream *stream)
if (blocknum == InvalidBlockNumber)
{
/* End of stream. */
- stream->distance = 0;
+ stream->readahead_distance = 0;
+ stream->combine_distance = 0;
break;
}
@@ -511,21 +621,13 @@ read_stream_look_ahead(ReadStream *stream)
}
/*
- * We don't start the pending read just because we've hit the distance
- * limit, preferring to give it another chance to grow to full
- * io_combine_limit size once more buffers have been consumed. However,
- * if we've already reached io_combine_limit, or we've reached the
- * distance limit and there isn't anything pinned yet, or the callback has
- * signaled end-of-stream, we start the read immediately. Note that the
- * pending read can exceed the distance goal, if the latter was reduced
- * after hitting the per-backend buffer limit.
+ * Check if the pending read should be issued now, or if we should give it
+ * another chance to grow to the full size.
+ *
+ * Note that the pending read can exceed the distance goal, if the latter
+ * was reduced after hitting the per-backend buffer limit.
*/
- if (stream->pending_read_nblocks > 0 &&
- (stream->pending_read_nblocks == stream->io_combine_limit ||
- (stream->pending_read_nblocks >= stream->distance &&
- stream->pinned_buffers == 0) ||
- stream->distance <= 0) &&
- stream->ios_in_progress < stream->max_ios)
+ if (read_stream_should_issue_now(stream))
read_stream_start_pending_read(stream);
/*
@@ -534,7 +636,7 @@ read_stream_look_ahead(ReadStream *stream)
* stream. In the worst case we can always make progress one buffer at a
* time.
*/
- Assert(stream->pinned_buffers > 0 || stream->distance <= 0);
+ Assert(stream->pinned_buffers > 0 || stream->readahead_distance <= 0);
if (stream->batch_mode)
pgaio_exit_batchmode();
@@ -724,10 +826,17 @@ read_stream_begin_impl(int flags,
* doing full io_combine_limit sized reads.
*/
if (flags & READ_STREAM_FULL)
- stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+ {
+ stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+ stream->combine_distance = stream->io_combine_limit;
+ }
else
- stream->distance = 1;
- stream->resume_distance = stream->distance;
+ {
+ stream->readahead_distance = 1;
+ stream->combine_distance = 1;
+ }
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
/*
* Since we always access the same relation, we can initialize parts of
@@ -826,7 +935,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Assert(stream->ios_in_progress == 0);
Assert(stream->forwarded_buffers == 0);
Assert(stream->pinned_buffers == 1);
- Assert(stream->distance == 1);
+ Assert(stream->readahead_distance == 1);
Assert(stream->pending_read_nblocks == 0);
Assert(stream->per_buffer_data_size == 0);
Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -900,7 +1009,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
else
{
/* No more blocks, end of stream. */
- stream->distance = 0;
+ stream->readahead_distance = 0;
stream->oldest_buffer_index = stream->next_buffer_index;
stream->pinned_buffers = 0;
stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -916,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
Assert(stream->oldest_buffer_index == stream->next_buffer_index);
/* End of stream reached? */
- if (stream->distance <= 0)
+ if (stream->readahead_distance <= 0)
return InvalidBuffer;
/*
@@ -930,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
/* End of stream reached? */
if (stream->pinned_buffers == 0)
{
- Assert(stream->distance <= 0);
+ Assert(stream->readahead_distance <= 0);
return InvalidBuffer;
}
}
@@ -951,7 +1060,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
{
int16 io_index = stream->oldest_io_index;
- int32 distance; /* wider temporary value, clamped below */
bool needed_wait;
/* Sanity check that we still agree on the buffers. */
@@ -962,7 +1070,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
* If the stream has been reset, don't even wait for the IO, just
* discard it.
*/
- if (stream->distance < 0)
+ if (stream->readahead_distance < 0)
{
if (pgaio_wref_valid(&stream->ios[io_index].op.io_wref) &&
!stream->ios[io_index].op.foreign_io)
@@ -1011,11 +1119,38 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
* the stream, as stream->distance == 0 is used to keep track of
* having reached the end.
*/
- if (stream->distance > 0 && needed_wait)
+ if (stream->readahead_distance > 0 && needed_wait)
{
- distance = stream->distance * 2;
- distance = Min(distance, stream->max_pinned_buffers);
- stream->distance = distance;
+ /* wider temporary value, due to oveflow risk */
+ int32 readahead_distance;
+
+ readahead_distance = stream->readahead_distance * 2;
+ readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+ stream->readahead_distance = readahead_distance;
+ }
+
+ /*
+ * Whether we needed to wait or not, allow for more IO combining if we
+ * needed to do IO. The reason to do so independent of needing to wait
+ * is that when the data is resident in the kernel page cache, IO
+ * combining reduces the syscall / dispatch overhead, making it
+ * worthwhile regardless of needing to wait.
+ *
+ * It is also important with io_uring as it will never signal the need
+ * to wait for reads if all the data is in the page cache. There are
+ * heuristics to deal with that in method_io_uring.c, but they only
+ * work when the IO gets large enough.
+ */
+ if (stream->combine_distance > 0 &&
+ stream->combine_distance < stream->io_combine_limit)
+ {
+ /* wider temporary value, due to oveflow risk */
+ int32 combine_distance;
+
+ combine_distance = stream->combine_distance * 2;
+ combine_distance = Min(combine_distance, stream->io_combine_limit);
+ combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+ stream->combine_distance = combine_distance;
}
/*
@@ -1094,10 +1229,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
#ifndef READ_STREAM_DISABLE_FAST_PATH
/* See if we can take the fast path for all-cached scans next time. */
+ /*
+ * FIXME: It's way too easy to wrongly fast path. I'm pretty sure there's
+ * several pre-existing cases where it triggers because we are not issuing
+ * additional prefetching (e.g. because of a small
+ * effective_io_concurrency) and thus stream->pinned_buffers stays at 1
+ * after read_stream_look_ahead().
+ */
if (stream->ios_in_progress == 0 &&
stream->forwarded_buffers == 0 &&
stream->pinned_buffers == 1 &&
- stream->distance == 1 &&
+ stream->readahead_distance == 1 &&
+ stream->combine_distance == 1 &&
stream->pending_read_nblocks == 0 &&
stream->per_buffer_data_size == 0)
{
@@ -1143,8 +1286,9 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
BlockNumber
read_stream_pause(ReadStream *stream)
{
- stream->resume_distance = stream->distance;
- stream->distance = 0;
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
+ stream->readahead_distance = 0;
return InvalidBlockNumber;
}
@@ -1156,7 +1300,8 @@ read_stream_pause(ReadStream *stream)
void
read_stream_resume(ReadStream *stream)
{
- stream->distance = stream->resume_distance;
+ stream->readahead_distance = stream->resume_readahead_distance;
+ stream->combine_distance = stream->resume_combine_distance;
}
/*
@@ -1172,7 +1317,8 @@ read_stream_reset(ReadStream *stream)
Buffer buffer;
/* Stop looking ahead. */
- stream->distance = -1;
+ stream->readahead_distance = -1;
+ stream->combine_distance = -1;
/* Forget buffered block number and fast path state. */
stream->buffered_blocknum = InvalidBlockNumber;
@@ -1204,8 +1350,10 @@ read_stream_reset(ReadStream *stream)
Assert(stream->ios_in_progress == 0);
/* Start off assuming data is cached. */
- stream->distance = 1;
- stream->resume_distance = stream->distance;
+ stream->readahead_distance = 1;
+ stream->combine_distance = 1;
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
stream->distance_decay_holdoff = 0;
}
--
2.53.0.1.gb2826b52eb
--biispz4aupeya6ku--
view thread (2+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH v3 08/10] WIP: read stream: Split decision about look ahead for AIO and combining
In-Reply-To: <no-message-id-725183@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox