public inbox for [email protected]  
help / color / mirror / Atom feed
[PATCH v3 07/10] Hacky implementation of making read_stream_reset()/end() not wait for IO
2+ messages / 1 participants
[nested] [flat]

* [PATCH v3 07/10] Hacky implementation of making read_stream_reset()/end() not wait for IO
@ 2026-03-20 03:06 Andres Freund <[email protected]>
  0 siblings, 0 replies; 2+ messages in thread

From: Andres Freund @ 2026-03-20 03:06 UTC (permalink / raw)

Not waiting for IO during read_stream_reset() can be important for performance
in cases where read streams are frequently reset before the end is
reached. Current users do not commonly do that, but the upcoming work to use a
read stream to prefetch table blocks as part of index scans can do so
frequently in some query patterns. E.g. if there is an index scan on the inner
side of a nested loop.

FIXME: This implementation is problematic though, as there is nothing forcing
the discarded IOs to ever be completed if the backend goes idle. That's at the
very least problematic because it leads to the underlying buffers continuing
to be pinned and the IOs showing up in the pg_aios view.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/aio.h             |  1 +
 src/backend/storage/aio/aio.c         | 27 ++++++++++++++++++++++
 src/backend/storage/aio/read_stream.c | 32 ++++++++++++++++++++++-----
 3 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index ec543b78409..c184e97a977 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -328,6 +328,7 @@ extern int	pgaio_wref_get_id(PgAioWaitRef *iow);
 extern void pgaio_wref_wait(PgAioWaitRef *iow);
 extern bool pgaio_wref_check_done(PgAioWaitRef *iow);
 
+extern void pgaio_wref_discard_result(PgAioWaitRef *iow);
 
 
 /* --------------------------------------------------------------------------------
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 4e742038d02..49eb677ad06 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -1055,6 +1055,33 @@ pgaio_wref_check_done(PgAioWaitRef *iow)
 	return false;
 }
 
+void
+pgaio_wref_discard_result(PgAioWaitRef *iow)
+{
+	uint64		ref_generation;
+	bool		am_owner;
+	PgAioHandle *ioh;
+	PgAioHandleState state;
+
+	ioh = pgaio_io_from_wref(iow, &ref_generation);
+
+	am_owner = ioh->owner_procno == MyProcNumber;
+
+	if (!am_owner)
+		elog(ERROR, "not you");
+
+	if (pgaio_io_was_recycled(ioh, ref_generation, &state))
+		return;
+
+	pgaio_debug_io(DEBUG2, ioh,
+				   "discarding result %p",
+				   ioh->report_return);
+
+	if (ioh->resowner)
+		pgaio_io_release_resowner(&ioh->resowner_node, false);
+}
+
+
 
 
 /* --------------------------------------------------------------------------------
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 49971833ddc..144b3613c92 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -524,7 +524,7 @@ read_stream_look_ahead(ReadStream *stream)
 		(stream->pending_read_nblocks == stream->io_combine_limit ||
 		 (stream->pending_read_nblocks >= stream->distance &&
 		  stream->pinned_buffers == 0) ||
-		 stream->distance == 0) &&
+		 stream->distance <= 0) &&
 		stream->ios_in_progress < stream->max_ios)
 		read_stream_start_pending_read(stream);
 
@@ -534,7 +534,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+	Assert(stream->pinned_buffers > 0 || stream->distance <= 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -916,7 +916,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
 		/* End of stream reached?  */
-		if (stream->distance == 0)
+		if (stream->distance <= 0)
 			return InvalidBuffer;
 
 		/*
@@ -930,7 +930,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
 		{
-			Assert(stream->distance == 0);
+			Assert(stream->distance <= 0);
 			return InvalidBuffer;
 		}
 	}
@@ -958,7 +958,27 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios[io_index].op.buffers ==
 			   &stream->buffers[oldest_buffer_index]);
 
-		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+		/*
+		 * If the stream has been reset, don't even wait for the IO, just
+		 * discard it.
+		 */
+		if (stream->distance < 0)
+		{
+			if (pgaio_wref_valid(&stream->ios[io_index].op.io_wref) &&
+				!stream->ios[io_index].op.foreign_io)
+			{
+				pgaio_wref_discard_result(&stream->ios[io_index].op.io_wref);
+				pgaio_wref_clear(&stream->ios[io_index].op.io_wref);
+			}
+			else
+				WaitReadBuffers(&stream->ios[io_index].op);
+
+			needed_wait = false;
+		}
+		else
+		{
+			needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+		}
 
 		Assert(stream->ios_in_progress > 0);
 		stream->ios_in_progress--;
@@ -1152,7 +1172,7 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->distance = 0;
+	stream->distance = -1;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
-- 
2.53.0.1.gb2826b52eb


--biispz4aupeya6ku
Content-Type: text/x-diff; charset=us-ascii
Content-Disposition: attachment;
	filename="v3-0008-WIP-read-stream-Split-decision-about-look-ahead-f.patch"



^ permalink  raw  reply  [nested|flat] 2+ messages in thread

* [PATCH v4 07/14] Hacky implementation of making read_stream_reset()/end() not wait for IO
@ 2026-03-31 17:27 Andres Freund <[email protected]>
  0 siblings, 0 replies; 2+ messages in thread

From: Andres Freund @ 2026-03-31 17:27 UTC (permalink / raw)

Not waiting for IO during read_stream_reset() can be important for performance
in cases where read streams are frequently reset before the end is
reached. Current users do not commonly do that, but the upcoming work to use a
read stream to prefetch table blocks as part of index scans can do so
frequently in some query patterns. E.g. if there is an index scan on the inner
side of a nested loop.

FIXME: This implementation is problematic though, as there is nothing forcing
the discarded IOs to ever be completed if the backend goes idle. That's at the
very least problematic because it leads to the underlying buffers continuing
to be pinned and the IOs showing up in the pg_aios view.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/aio.h             |  1 +
 src/backend/storage/aio/aio.c         | 26 +++++++++++++++++++
 src/backend/storage/aio/read_stream.c | 36 +++++++++++++++++++++------
 3 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index ec543b78409..c184e97a977 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -328,6 +328,7 @@ extern int	pgaio_wref_get_id(PgAioWaitRef *iow);
 extern void pgaio_wref_wait(PgAioWaitRef *iow);
 extern bool pgaio_wref_check_done(PgAioWaitRef *iow);
 
+extern void pgaio_wref_discard_result(PgAioWaitRef *iow);
 
 
 /* --------------------------------------------------------------------------------
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 8f7e26607b9..438d5911126 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -1050,6 +1050,32 @@ pgaio_wref_check_done(PgAioWaitRef *iow)
 	return false;
 }
 
+void
+pgaio_wref_discard_result(PgAioWaitRef *iow)
+{
+	uint64		ref_generation;
+	bool		am_owner;
+	PgAioHandle *ioh;
+	PgAioHandleState state;
+
+	ioh = pgaio_io_from_wref(iow, &ref_generation);
+
+	am_owner = ioh->owner_procno == MyProcNumber;
+
+	if (!am_owner)
+		elog(ERROR, "not you");
+
+	if (pgaio_io_was_recycled(ioh, ref_generation, &state))
+		return;
+
+	pgaio_debug_io(DEBUG2, ioh,
+				   "discarding result %p",
+				   ioh->report_return);
+
+	if (ioh->resowner)
+		pgaio_io_release_resowner(&ioh->resowner_node, false);
+}
+
 
 
 /* --------------------------------------------------------------------------------
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 9d22a119bef..397e6576689 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -467,7 +467,7 @@ read_stream_should_look_ahead(ReadStream *stream)
 		return false;
 
 	/* If the callback has signaled end-of-stream, we're done */
-	if (stream->readahead_distance == 0)
+	if (stream->readahead_distance <= 0)
 		return false;
 
 	/* never pin more buffers than allowed */
@@ -522,7 +522,7 @@ read_stream_should_issue_now(ReadStream *stream)
 	 * If the callback has signaled end-of-stream, start the pending read
 	 * immediately. There is no further potential for IO combining.
 	 */
-	if (stream->readahead_distance == 0)
+	if (stream->readahead_distance <= 0)
 		return true;
 
 	/*
@@ -635,7 +635,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance <= 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -1025,7 +1025,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
 		/* End of stream reached?  */
-		if (stream->readahead_distance == 0)
+		if (stream->readahead_distance <= 0)
 			return InvalidBuffer;
 
 		/*
@@ -1039,7 +1039,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
 		{
-			Assert(stream->readahead_distance == 0);
+			Assert(stream->readahead_distance <= 0);
 			return InvalidBuffer;
 		}
 	}
@@ -1066,7 +1066,27 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios[io_index].op.buffers ==
 			   &stream->buffers[oldest_buffer_index]);
 
-		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+		/*
+		 * If the stream has been reset, don't even wait for the IO, just
+		 * discard it.
+		 */
+		if (stream->readahead_distance < 0)
+		{
+			if (pgaio_wref_valid(&stream->ios[io_index].op.io_wref) &&
+				!stream->ios[io_index].op.foreign_io)
+			{
+				pgaio_wref_discard_result(&stream->ios[io_index].op.io_wref);
+				pgaio_wref_clear(&stream->ios[io_index].op.io_wref);
+			}
+			else
+				WaitReadBuffers(&stream->ios[io_index].op);
+
+			needed_wait = false;
+		}
+		else
+		{
+			needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+		}
 
 		Assert(stream->ios_in_progress > 0);
 		stream->ios_in_progress--;
@@ -1297,8 +1317,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->readahead_distance = 0;
-	stream->combine_distance = 0;
+	stream->readahead_distance = -1;
+	stream->combine_distance = -1;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
-- 
2.53.0.1.gb2826b52eb


--ik3l34cft6pr4f53--





^ permalink  raw  reply  [nested|flat] 2+ messages in thread


end of thread, other threads:[~2026-03-31 17:27 UTC | newest]

Thread overview: 2+ messages (download: mbox mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2026-03-20 03:06 [PATCH v3 07/10] Hacky implementation of making read_stream_reset()/end() not wait for IO Andres Freund <[email protected]>
2026-03-31 17:27 [PATCH v4 07/14] Hacky implementation of making read_stream_reset()/end() not wait for IO Andres Freund <[email protected]>

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox