From e8e43a8eca48f317e7c76cc6cbf1470660dd0a31 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Tue, 13 Jan 2026 20:44:14 +0100 Subject: [PATCH v9 04/10] Introduce read_stream_{pause,resume,yield}(). Author: Thomas Munro Discussion: https://postgr.es/m/CA%2BhUKGJLT2JvWLEiBXMbkSSc5so_Y7%3DN%2BS2ce7npjLw8QL3d5w%40mail.gmail.com --- src/include/storage/read_stream.h | 3 + src/backend/storage/aio/read_stream.c | 50 ++++- src/test/modules/test_aio/Makefile | 3 +- src/test/modules/test_aio/meson.build | 1 + src/test/modules/test_aio/t/001_aio.pl | 30 +++ src/test/modules/test_aio/test_aio--1.0.sql | 13 ++ src/test/modules/test_aio/test_read_stream.c | 181 +++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 8 files changed, 280 insertions(+), 2 deletions(-) create mode 100644 src/test/modules/test_aio/test_read_stream.c diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index f2a0cc79c..e3b6bb2f3 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -99,6 +99,9 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags, ReadStreamBlockNumberCB callback, void *callback_private_data, size_t per_buffer_data_size); +extern BlockNumber read_stream_pause(ReadStream *stream); +extern void read_stream_resume(ReadStream *stream); +extern BlockNumber read_stream_yield(ReadStream *stream); extern void read_stream_reset(ReadStream *stream); extern void read_stream_end(ReadStream *stream); diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 88717c2ff..0dbec2abb 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -100,11 +100,13 @@ struct ReadStream int16 pinned_buffers; int16 distance; int16 initialized_buffers; + int16 resume_distance; int read_buffers_flags; bool sync_mode; /* using io_method=sync */ bool batch_mode; /* READ_STREAM_USE_BATCHING */ bool advice_enabled; bool temporary; + bool yielded; /* * One-block buffer to support 'ungetting' a block number, to resolve flow @@ -879,7 +881,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? */ if (stream->distance == 0) - return InvalidBuffer; + { + if (!stream->yielded) + return InvalidBuffer; + + /* The callback yielded. Resume. */ + stream->yielded = false; + read_stream_resume(stream); + Assert(stream->distance != 0); + } /* * The usual order of operations is that we look ahead at the bottom @@ -1034,6 +1044,44 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) return read_stream_get_block(stream, NULL); } +/* + * Temporarily stop consuming block numbers from the block number callback. If + * called inside the block number callback, its return value should be + * returned by the callback. + */ +BlockNumber +read_stream_pause(ReadStream *stream) +{ + stream->resume_distance = stream->distance; + stream->distance = 0; + return InvalidBlockNumber; +} + +/* + * Resume looking ahead after the block number callback reported end-of-stream. + * This is useful for streams of self-referential blocks, after a buffer needed + * to be consumed and examined to find more block numbers. + */ +void +read_stream_resume(ReadStream *stream) +{ + stream->distance = stream->resume_distance; +} + +/* + * Called from inside a block number callback, to return control to the caller + * of read_stream_next_buffer() without looking further ahead. Its return + * value should be returned by the callback. This is equivalent to pausing and + * resuming automatically at the next call to read_stream_next_buffer(). + */ +BlockNumber +read_stream_yield(ReadStream *stream) +{ + read_stream_pause(stream); + stream->yielded = true; + return InvalidBlockNumber; +} + /* * Reset a read stream by releasing any queued up buffers, allowing the stream * to be used again for different blocks. This can be used to clear an diff --git a/src/test/modules/test_aio/Makefile b/src/test/modules/test_aio/Makefile index f53cc6467..465eb09ee 100644 --- a/src/test/modules/test_aio/Makefile +++ b/src/test/modules/test_aio/Makefile @@ -5,7 +5,8 @@ PGFILEDESC = "test_aio - test code for AIO" MODULE_big = test_aio OBJS = \ $(WIN32RES) \ - test_aio.o + test_aio.o \ + test_read_stream.o EXTENSION = test_aio DATA = test_aio--1.0.sql diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index fefa25bc5..d13fda219 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -2,6 +2,7 @@ test_aio_sources = files( 'test_aio.c', + 'test_read_stream.c', ) if host_system == 'windows' diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index 5c634ec3c..5af558bf4 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -1489,6 +1489,35 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);), $psql->quit(); } +# Read stream tests +sub test_read_stream +{ + my $io_method = shift; + my $node = shift; + my ($ret, $output); + + my $psql = $node->background_psql('postgres', on_error_stop => 0); + + $psql->query_safe( + qq( +CREATE TEMPORARY TABLE tmp_read_stream(data int not null); +INSERT INTO tmp_read_stream SELECT generate_series(1, 10000); +SELECT test_read_stream_resume('tmp_read_stream', 0); +DROP TABLE tmp_read_stream; +)); + + $psql->query_safe( + qq( +CREATE TEMPORARY TABLE tmp_read_stream(data int not null); +INSERT INTO tmp_read_stream SELECT generate_series(1, 10000); +SELECT test_read_stream_yield('tmp_read_stream', 0); +DROP TABLE tmp_read_stream; +)); + + $psql->quit(); +} + + # Run all tests that are supported for all io_methods sub test_generic @@ -1525,6 +1554,7 @@ CHECKPOINT; test_checksum($io_method, $node); test_ignore_checksum($io_method, $node); test_checksum_createdb($io_method, $node); + test_read_stream($io_method, $node); SKIP: { diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c4..e37810b72 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -106,3 +106,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_reopen_detach() RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; + + + +/* + * Read stream related functions + */ +CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4) +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION test_read_stream_yield(rel regclass, blockno int4) +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_aio/test_read_stream.c b/src/test/modules/test_aio/test_read_stream.c new file mode 100644 index 000000000..d1d436a90 --- /dev/null +++ b/src/test/modules/test_aio/test_read_stream.c @@ -0,0 +1,181 @@ +/*------------------------------------------------------------------------- + * + * test_read_stream.c + * Helpers to write tests for read_stream.c + * + * Copyright (c) 2020-2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_aio/test_read_stream.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/relation.h" +#include "fmgr.h" +#include "storage/bufmgr.h" +#include "storage/read_stream.h" + +typedef struct +{ + BlockNumber blkno; + int count; +} test_read_stream_resume_state; + +static BlockNumber +test_read_stream_resume_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + test_read_stream_resume_state *state = callback_private_data; + + /* Periodic end-of-stream. */ + if (++state->count % 3 == 0) + return read_stream_pause(stream); + + return state->blkno; +} + +/* + * Test read_stream_resume(), allowing a stream to end temporarily and then + * continue where it left off. + */ +PG_FUNCTION_INFO_V1(test_read_stream_resume); +Datum +test_read_stream_resume(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + BlockNumber blkno = PG_GETARG_UINT32(1); + Relation rel; + Buffer buf; + ReadStream *stream; + test_read_stream_resume_state state = {.blkno = blkno}; + + rel = relation_open(relid, AccessShareLock); + stream = read_stream_begin_relation(READ_STREAM_DEFAULT, + NULL, + rel, + MAIN_FORKNUM, + test_read_stream_resume_cb, + &state, + 0); + + for (int i = 0; i < 3; ++i) + { + /* Same block twice. */ + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + + /* End-of-stream. */ + buf = read_stream_next_buffer(stream, NULL); + Assert(buf == InvalidBuffer); + buf = read_stream_next_buffer(stream, NULL); + Assert(buf == InvalidBuffer); + + /* Resume. */ + read_stream_resume(stream); + } + + read_stream_end(stream); + relation_close(rel, NoLock); + + PG_RETURN_VOID(); +} + +typedef struct +{ + BlockNumber blkno; + int count; + int yields; + int blocks; +} test_read_stream_yield_state; + +static BlockNumber +test_read_stream_yield_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + test_read_stream_yield_state *state = callback_private_data; + + /* Yield every third call. */ + if (++state->count % 3 == 2) + { + state->yields++; + return read_stream_yield(stream); + } + + state->blocks++; + return state->blkno; +} + +/* + * Test read_stream_yield(), allowing control to be yielded temporarily from + * the lookahead loop and returned to the caller of read_stream_next_buffer(). + */ +PG_FUNCTION_INFO_V1(test_read_stream_yield); +Datum +test_read_stream_yield(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + BlockNumber blkno = PG_GETARG_UINT32(1); + Relation rel; + Buffer buf; + ReadStream *stream; + test_read_stream_yield_state state = {.blkno = blkno}; + + rel = relation_open(relid, AccessShareLock); + stream = read_stream_begin_relation(READ_STREAM_DEFAULT, + NULL, + rel, + MAIN_FORKNUM, + test_read_stream_yield_cb, + &state, + 0); + + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + Assert(state.blocks == 1); + Assert(state.yields == 1); + + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + Assert(state.blocks == 3); + Assert(state.yields == 1); + + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + Assert(state.blocks == 3); + Assert(state.yields == 2); + + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + Assert(state.blocks == 5); + Assert(state.yields == 2); + + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + Assert(state.blocks == 5); + Assert(state.yields == 3); + + buf = read_stream_next_buffer(stream, NULL); + Assert(BufferGetBlockNumber(buf) == blkno); + ReleaseBuffer(buf); + Assert(state.blocks == 7); + Assert(state.yields == 3); + + read_stream_end(stream); + relation_close(rel, NoLock); + + PG_RETURN_VOID(); +} diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3ee86449a..6011be763 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -4200,6 +4200,7 @@ teSection temp_tablespaces_extra test128 test_re_flags +test_read_stream_resume_state test_regex_ctx test_shm_mq_header test_spec -- 2.51.0