public inbox for [email protected]  
help / color / mirror / Atom feed
[PATCH v3 08/10] WIP: read stream: Split decision about look ahead for AIO and combining
2+ messages / 1 participants
[nested] [flat]

* [PATCH v3 08/10] WIP: read stream: Split decision about look ahead for AIO and combining
@ 2026-03-30 16:25 Andres Freund <[email protected]>
  0 siblings, 0 replies; 2+ messages in thread

From: Andres Freund @ 2026-03-30 16:25 UTC (permalink / raw)

Previous commits caused a regression due to this conflation. This is a
first attempt at fixing the problem.  Needs significant reordering and
splitting if it works out.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/aio/read_stream.c | 242 +++++++++++++++++++++-----
 1 file changed, 195 insertions(+), 47 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 144b3613c92..1c375edad1d 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -98,10 +98,14 @@ struct ReadStream
 	int16		max_pinned_buffers;
 	int16		forwarded_buffers;
 	int16		pinned_buffers;
-	int16		distance;
+	/* limit of how far to read ahead for IO combining */
+	int16		combine_distance;
+	/* limit of how far to read ahead for starting IO early */
+	int16		readahead_distance;
 	uint16		distance_decay_holdoff;
 	int16		initialized_buffers;
-	int16		resume_distance;
+	int16		resume_readahead_distance;
+	int16		resume_combine_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -332,8 +336,8 @@ read_stream_start_pending_read(ReadStream *stream)
 
 		/* Shrink distance: no more look-ahead until buffers are released. */
 		new_distance = stream->pinned_buffers + buffer_limit;
-		if (stream->distance > new_distance)
-			stream->distance = new_distance;
+		if (stream->readahead_distance > new_distance)
+			stream->readahead_distance = new_distance;
 
 		/* Unless we have nothing to give the consumer, stop here. */
 		if (stream->pinned_buffers > 0)
@@ -374,12 +378,23 @@ read_stream_start_pending_read(ReadStream *stream)
 		 * perform IO asynchronously when starting out with a small look-ahead
 		 * distance.
 		 */
-		if (stream->distance > 1 && stream->ios_in_progress == 0)
+		if (stream->ios_in_progress == 0)
 		{
-			if (stream->distance_decay_holdoff == 0)
-				stream->distance--;
-			else
+			if (stream->distance_decay_holdoff > 0)
 				stream->distance_decay_holdoff--;
+			else
+			{
+				if (stream->readahead_distance > 1)
+					stream->readahead_distance--;
+
+				/*
+				 * XXX: Should we actually reduce this at any time other than
+				 * a reset? For now we have to, as this is also a condition
+				 * for re-enabling fast_path.
+				 */
+				if (stream->combine_distance > 1)
+					stream->combine_distance--;
+			}
 		}
 	}
 	else
@@ -440,6 +455,101 @@ read_stream_start_pending_read(ReadStream *stream)
 	return true;
 }
 
+/*
+ * Should we continue to perform look ahead?  The look ahead may allow us to
+ * make the pending IO larger via IO combining or to issue more read ahead.
+ */
+static bool
+read_stream_should_look_ahead(ReadStream *stream)
+{
+	/* never start more IOs than our cap */
+	if (stream->ios_in_progress >= stream->max_ios)
+		return false;
+
+	/* If the callback has signaled end-of-stream, we're done */
+	if (stream->readahead_distance <= 0)
+		return false;
+
+	/* never pin more buffers than allowed */
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->max_pinned_buffers)
+		return false;
+
+	/*
+	 * Allow looking further ahead if we are in the process of building a
+	 * larger IO, the IO is not yet big enough and we don't yet have IO in
+	 * flight.  Note that this is allowed even if we are reaching the
+	 * readahead limit (but not the buffer pin limit).
+	 *
+	 * This is important for cases where either effective_io_concurrency is
+	 * low or we never need to wait for IO and thus are not increasing the
+	 * distance. Without this we would end up with lots of small IOs.
+	 */
+	if (stream->pending_read_nblocks > 0 &&
+		stream->pinned_buffers == 0 &&
+		stream->pending_read_nblocks < stream->combine_distance)
+		return true;
+
+	/*
+	 * Don't start more readahead if that'd put us over the limit for doing
+	 * readahead.
+	 */
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
+		return false;
+
+	return true;
+}
+
+
+/*
+ * We don't start the pending read just because we've hit the distance limit,
+ * preferring to give it another chance to grow to full io_combine_limit size
+ * once more buffers have been consumed.  But this is not desirable in all
+ * situations - see below.
+ */
+static bool
+read_stream_should_issue_now(ReadStream *stream)
+{
+	int16		pending_read_nblocks = stream->pending_read_nblocks;
+
+	/* no IO to issue */
+	if (pending_read_nblocks == 0)
+		return false;
+
+	/* never start more IOs than our cap */
+	if (stream->ios_in_progress >= stream->max_ios)
+		return false;
+
+	/*
+	 * If the callback has signaled end-of-stream, start the read
+	 * immediately. There's no deferring it for later.
+	 */
+	if (stream->readahead_distance <= 0)
+		return true;
+
+	/*
+	 * If we've already reached io_combine_limit, there's no chance of growing
+	 * the read further.
+	 */
+	if (pending_read_nblocks >= stream->io_combine_limit)
+		return true;
+
+	/* same if capped not by io_combine_limit but combine_distance */
+	if (stream->combine_distance > 0 &&
+		pending_read_nblocks >= stream->combine_distance)
+		return true;
+
+	/*
+	 * If we currently have no reads in flight or prepared, issue the IO once
+	 * we stopped looking ahead. This ensures there's always at least one IO
+	 * prepared.
+	 */
+	if (stream->pinned_buffers == 0 &&
+		!read_stream_should_look_ahead(stream))
+		return true;
+
+	return false;
+}
+
 static void
 read_stream_look_ahead(ReadStream *stream)
 {
@@ -452,14 +562,13 @@ read_stream_look_ahead(ReadStream *stream)
 	if (stream->batch_mode)
 		pgaio_enter_batchmode();
 
-	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+	while (read_stream_should_look_ahead(stream))
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
 		void	   *per_buffer_data;
 
-		if (stream->pending_read_nblocks == stream->io_combine_limit)
+		if (read_stream_should_issue_now(stream))
 		{
 			read_stream_start_pending_read(stream);
 			continue;
@@ -479,7 +588,8 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
+			stream->combine_distance = 0;
 			break;
 		}
 
@@ -511,21 +621,13 @@ read_stream_look_ahead(ReadStream *stream)
 	}
 
 	/*
-	 * We don't start the pending read just because we've hit the distance
-	 * limit, preferring to give it another chance to grow to full
-	 * io_combine_limit size once more buffers have been consumed.  However,
-	 * if we've already reached io_combine_limit, or we've reached the
-	 * distance limit and there isn't anything pinned yet, or the callback has
-	 * signaled end-of-stream, we start the read immediately.  Note that the
-	 * pending read can exceed the distance goal, if the latter was reduced
-	 * after hitting the per-backend buffer limit.
+	 * Check if the pending read should be issued now, or if we should give it
+	 * another chance to grow to the full size.
+	 *
+	 * Note that the pending read can exceed the distance goal, if the latter
+	 * was reduced after hitting the per-backend buffer limit.
 	 */
-	if (stream->pending_read_nblocks > 0 &&
-		(stream->pending_read_nblocks == stream->io_combine_limit ||
-		 (stream->pending_read_nblocks >= stream->distance &&
-		  stream->pinned_buffers == 0) ||
-		 stream->distance <= 0) &&
-		stream->ios_in_progress < stream->max_ios)
+	if (read_stream_should_issue_now(stream))
 		read_stream_start_pending_read(stream);
 
 	/*
@@ -534,7 +636,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->distance <= 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance <= 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -724,10 +826,17 @@ read_stream_begin_impl(int flags,
 	 * doing full io_combine_limit sized reads.
 	 */
 	if (flags & READ_STREAM_FULL)
-		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+	{
+		stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+		stream->combine_distance = stream->io_combine_limit;
+	}
 	else
-		stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	{
+		stream->readahead_distance = 1;
+		stream->combine_distance = 1;
+	}
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -826,7 +935,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios_in_progress == 0);
 		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
-		Assert(stream->distance == 1);
+		Assert(stream->readahead_distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
 		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -900,7 +1009,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		else
 		{
 			/* No more blocks, end of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
 			stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -916,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
 		/* End of stream reached?  */
-		if (stream->distance <= 0)
+		if (stream->readahead_distance <= 0)
 			return InvalidBuffer;
 
 		/*
@@ -930,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
 		{
-			Assert(stream->distance <= 0);
+			Assert(stream->readahead_distance <= 0);
 			return InvalidBuffer;
 		}
 	}
@@ -951,7 +1060,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-		int32		distance;	/* wider temporary value, clamped below */
 		bool		needed_wait;
 
 		/* Sanity check that we still agree on the buffers. */
@@ -962,7 +1070,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 * If the stream has been reset, don't even wait for the IO, just
 		 * discard it.
 		 */
-		if (stream->distance < 0)
+		if (stream->readahead_distance < 0)
 		{
 			if (pgaio_wref_valid(&stream->ios[io_index].op.io_wref) &&
 				!stream->ios[io_index].op.foreign_io)
@@ -1011,11 +1119,38 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 * the stream, as stream->distance == 0 is used to keep track of
 		 * having reached the end.
 		 */
-		if (stream->distance > 0 && needed_wait)
+		if (stream->readahead_distance > 0 && needed_wait)
 		{
-			distance = stream->distance * 2;
-			distance = Min(distance, stream->max_pinned_buffers);
-			stream->distance = distance;
+			/* wider temporary value, due to overflow risk */
+			int32		readahead_distance;
+
+			readahead_distance = stream->readahead_distance * 2;
+			readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+			stream->readahead_distance = readahead_distance;
+		}
+
+		/*
+		 * Whether we needed to wait or not, allow for more IO combining if we
+		 * needed to do IO. The reason to do so independent of needing to wait
+		 * is that when the data is resident in the kernel page cache, IO
+		 * combining reduces the syscall / dispatch overhead, making it
+		 * worthwhile regardless of needing to wait.
+		 *
+		 * It is also important with io_uring as it will never signal the need
+		 * to wait for reads if all the data is in the page cache. There are
+		 * heuristics to deal with that in method_io_uring.c, but they only
+		 * work when the IO gets large enough.
+		 */
+		if (stream->combine_distance > 0 &&
+			stream->combine_distance < stream->io_combine_limit)
+		{
+			/* wider temporary value, due to overflow risk */
+			int32		combine_distance;
+
+			combine_distance = stream->combine_distance * 2;
+			combine_distance = Min(combine_distance, stream->io_combine_limit);
+			combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+			stream->combine_distance = combine_distance;
 		}
 
 		/*
@@ -1094,10 +1229,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
+	/*
+	 * FIXME: It's way too easy to wrongly fast path. I'm pretty sure there's
+	 * several pre-existing cases where it triggers because we are not issuing
+	 * additional prefetching (e.g. because of a small
+	 * effective_io_concurrency) and thus stream->pinned_buffers stays at 1
+	 * after read_stream_look_ahead().
+	 */
 	if (stream->ios_in_progress == 0 &&
 		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
-		stream->distance == 1 &&
+		stream->readahead_distance == 1 &&
+		stream->combine_distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
 		stream->per_buffer_data_size == 0)
 	{
@@ -1143,8 +1286,9 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 BlockNumber
 read_stream_pause(ReadStream *stream)
 {
-	stream->resume_distance = stream->distance;
-	stream->distance = 0;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
+	stream->readahead_distance = 0;
 	return InvalidBlockNumber;
 }
 
@@ -1156,7 +1300,8 @@ read_stream_pause(ReadStream *stream)
 void
 read_stream_resume(ReadStream *stream)
 {
-	stream->distance = stream->resume_distance;
+	stream->readahead_distance = stream->resume_readahead_distance;
+	stream->combine_distance = stream->resume_combine_distance;
 }
 
 /*
@@ -1172,7 +1317,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->distance = -1;
+	stream->readahead_distance = -1;
+	stream->combine_distance = -1;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
@@ -1204,8 +1350,10 @@ read_stream_reset(ReadStream *stream)
 	Assert(stream->ios_in_progress == 0);
 
 	/* Start off assuming data is cached. */
-	stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	stream->readahead_distance = 1;
+	stream->combine_distance = 1;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 	stream->distance_decay_holdoff = 0;
 }
 
-- 
2.53.0.1.gb2826b52eb


--biispz4aupeya6ku--





^ permalink  raw  reply  [nested|flat] 2+ messages in thread

* [PATCH v4 06/14] WIP: read stream: Split decision about look ahead for AIO and combining
@ 2026-03-31 16:45 Andres Freund <[email protected]>
  0 siblings, 0 replies; 2+ messages in thread

From: Andres Freund @ 2026-03-31 16:45 UTC (permalink / raw)

"read_stream: Only increase distance when waiting for IO" caused a regression
due to this conflation, because we ended up not allowing IO combining when
never needing to wait for IO (as the distance ended up too small to allow for
that), which increased CPU overhead.

If we go this way, it probably should be moved ahead of the aforementioned
commit.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/aio/read_stream.c | 147 ++++++++++++++++++++------
 1 file changed, 115 insertions(+), 32 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 5d7bfb8fa00..9d22a119bef 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -98,10 +98,14 @@ struct ReadStream
 	int16		max_pinned_buffers;
 	int16		forwarded_buffers;
 	int16		pinned_buffers;
-	int16		distance;
+	/* limit of how far to look ahead for IO combining */
+	int16		combine_distance;
+	/* limit of how far to look ahead for starting IO early */
+	int16		readahead_distance;
 	uint16		distance_decay_holdoff;
 	int16		initialized_buffers;
-	int16		resume_distance;
+	int16		resume_readahead_distance;
+	int16		resume_combine_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -332,8 +336,8 @@ read_stream_start_pending_read(ReadStream *stream)
 
 		/* Shrink distance: no more look-ahead until buffers are released. */
 		new_distance = stream->pinned_buffers + buffer_limit;
-		if (stream->distance > new_distance)
-			stream->distance = new_distance;
+		if (stream->readahead_distance > new_distance)
+			stream->readahead_distance = new_distance;
 
 		/* Unless we have nothing to give the consumer, stop here. */
 		if (stream->pinned_buffers > 0)
@@ -374,12 +378,23 @@ read_stream_start_pending_read(ReadStream *stream)
 		 * perform IO asynchronously when starting out with a small look-ahead
 		 * distance.
 		 */
-		if (stream->distance > 1 && stream->ios_in_progress == 0)
+		if (stream->ios_in_progress == 0)
 		{
-			if (stream->distance_decay_holdoff == 0)
-				stream->distance--;
-			else
+			if (stream->distance_decay_holdoff > 0)
 				stream->distance_decay_holdoff--;
+			else
+			{
+				if (stream->readahead_distance > 1)
+					stream->readahead_distance--;
+
+				/*
+				 * XXX: Should we actually reduce this at any time other than
+				 * a reset? For now we have to, as this is also a condition
+				 * for re-enabling fast_path.
+				 */
+				if (stream->combine_distance > 1)
+					stream->combine_distance--;
+			}
 		}
 	}
 	else
@@ -452,18 +467,33 @@ read_stream_should_look_ahead(ReadStream *stream)
 		return false;
 
 	/* If the callback has signaled end-of-stream, we're done */
-	if (stream->distance == 0)
+	if (stream->readahead_distance == 0)
 		return false;
 
 	/* never pin more buffers than allowed */
 	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->max_pinned_buffers)
 		return false;
 
+	/*
+	 * Allow looking further ahead if we are in the process of building a
+	 * larger IO, the IO is not yet big enough and we don't yet have IO in
+	 * flight.  Note that this is allowed even if we are reaching the
+	 * read-ahead limit (but not the buffer pin limit).
+	 *
+	 * This is important for cases where either effective_io_concurrency is
+	 * low or we never need to wait for IO and thus are not increasing the
+	 * distance. Without this we would end up with lots of small IOs.
+	 */
+	if (stream->pending_read_nblocks > 0 &&
+		stream->pinned_buffers == 0 &&
+		stream->pending_read_nblocks < stream->combine_distance)
+		return true;
+
 	/*
 	 * Don't start more read-ahead if that'd put us over the distance limit
 	 * for doing read-ahead.
 	 */
-	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance)
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
 		return false;
 
 	return true;
@@ -492,7 +522,7 @@ read_stream_should_issue_now(ReadStream *stream)
 	 * If the callback has signaled end-of-stream, start the pending read
 	 * immediately. There is no further potential for IO combining.
 	 */
-	if (stream->distance == 0)
+	if (stream->readahead_distance == 0)
 		return true;
 
 	/*
@@ -502,6 +532,11 @@ read_stream_should_issue_now(ReadStream *stream)
 	if (pending_read_nblocks >= stream->io_combine_limit)
 		return true;
 
+	/* same if capped not by io_combine_limit but combine_distance */
+	if (stream->combine_distance > 0 &&
+		pending_read_nblocks >= stream->combine_distance)
+		return true;
+
 	/*
 	 * If we currently have no reads in flight or prepared, issue the IO once
 	 * we are not looking ahead further. This ensures there's always at least
@@ -552,7 +587,8 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
+			stream->combine_distance = 0;
 			break;
 		}
 
@@ -599,7 +635,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -789,10 +825,17 @@ read_stream_begin_impl(int flags,
 	 * doing full io_combine_limit sized reads.
 	 */
 	if (flags & READ_STREAM_FULL)
-		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+	{
+		stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+		stream->combine_distance = stream->io_combine_limit;
+	}
 	else
-		stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	{
+		stream->readahead_distance = 1;
+		stream->combine_distance = 1;
+	}
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -891,7 +934,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios_in_progress == 0);
 		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
-		Assert(stream->distance == 1);
+		Assert(stream->readahead_distance == 1);
+		Assert(stream->combine_distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
 		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -965,7 +1009,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		else
 		{
 			/* No more blocks, end of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
 			stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -981,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
 		/* End of stream reached?  */
-		if (stream->distance == 0)
+		if (stream->readahead_distance == 0)
 			return InvalidBuffer;
 
 		/*
@@ -995,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
 		{
-			Assert(stream->distance == 0);
+			Assert(stream->readahead_distance == 0);
 			return InvalidBuffer;
 		}
 	}
@@ -1016,7 +1060,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-		int32		distance;	/* wider temporary value, clamped below */
 		bool		needed_wait;
 
 		/* Sanity check that we still agree on the buffers. */
@@ -1056,11 +1099,38 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 * the stream, as stream->distance == 0 is used to keep track of
 		 * having reached the end.
 		 */
-		if (stream->distance > 0 && needed_wait)
+		if (stream->readahead_distance > 0 && needed_wait)
 		{
-			distance = stream->distance * 2;
-			distance = Min(distance, stream->max_pinned_buffers);
-			stream->distance = distance;
+			/* wider temporary value, due to overflow risk */
+			int32		readahead_distance;
+
+			readahead_distance = stream->readahead_distance * 2;
+			readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+			stream->readahead_distance = readahead_distance;
+		}
+
+		/*
+		 * Whether we needed to wait or not, allow for more IO combining if we
+		 * needed to do IO. The reason to do so independent of needing to wait
+		 * is that when the data is resident in the kernel page cache, IO
+		 * combining reduces the syscall / dispatch overhead, making it
+		 * worthwhile regardless of needing to wait.
+		 *
+		 * It is also important with io_uring as it will never signal the need
+		 * to wait for reads if all the data is in the page cache. There are
+		 * heuristics to deal with that in method_io_uring.c, but they only
+		 * work when the IO gets large enough.
+		 */
+		if (stream->combine_distance > 0 &&
+			stream->combine_distance < stream->io_combine_limit)
+		{
+			/* wider temporary value, due to overflow risk */
+			int32		combine_distance;
+
+			combine_distance = stream->combine_distance * 2;
+			combine_distance = Min(combine_distance, stream->io_combine_limit);
+			combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+			stream->combine_distance = combine_distance;
 		}
 
 		/*
@@ -1139,10 +1209,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
+	/*
+	 * FIXME: It's way too easy to wrongly fast path. I'm pretty sure there's
+	 * several pre-existing cases where it triggers because we are not issuing
+	 * additional prefetching (e.g. because of a small
+	 * effective_io_concurrency) and thus stream->pinned_buffers stays at 1
+	 * after read_stream_look_ahead().
+	 */
 	if (stream->ios_in_progress == 0 &&
 		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
-		stream->distance == 1 &&
+		stream->readahead_distance == 1 &&
+		stream->combine_distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
 		stream->per_buffer_data_size == 0)
 	{
@@ -1188,8 +1266,9 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 BlockNumber
 read_stream_pause(ReadStream *stream)
 {
-	stream->resume_distance = stream->distance;
-	stream->distance = 0;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
+	stream->readahead_distance = 0;
 	return InvalidBlockNumber;
 }
 
@@ -1201,7 +1280,8 @@ read_stream_pause(ReadStream *stream)
 void
 read_stream_resume(ReadStream *stream)
 {
-	stream->distance = stream->resume_distance;
+	stream->readahead_distance = stream->resume_readahead_distance;
+	stream->combine_distance = stream->resume_combine_distance;
 }
 
 /*
@@ -1217,7 +1297,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->distance = 0;
+	stream->readahead_distance = 0;
+	stream->combine_distance = 0;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
@@ -1249,8 +1330,10 @@ read_stream_reset(ReadStream *stream)
 	Assert(stream->ios_in_progress == 0);
 
 	/* Start off assuming data is cached. */
-	stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	stream->readahead_distance = 1;
+	stream->combine_distance = 1;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 	stream->distance_decay_holdoff = 0;
 }
 
-- 
2.53.0.1.gb2826b52eb


--ik3l34cft6pr4f53
Content-Type: text/x-diff; charset=us-ascii
Content-Disposition: attachment;
	filename="v4-0007-Hacky-implementation-of-making-read_stream_reset-.patch"



^ permalink  raw  reply  [nested|flat] 2+ messages in thread


end of thread, other threads:[~2026-03-31 16:45 UTC | newest]

Thread overview: 2+ messages (download: mbox mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2026-03-30 16:25 [PATCH v3 08/10] WIP: read stream: Split decision about look ahead for AIO and combining Andres Freund <[email protected]>
2026-03-31 16:45 [PATCH v4 06/14] WIP: read stream: Split decision about look ahead for AIO and combining Andres Freund <[email protected]>

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox