public inbox for [email protected]
help / color / mirror / Atom feed
From: Melanie Plageman <[email protected]>
To: Thomas Munro <[email protected]>
Cc: Peter Geoghegan <[email protected]>
Cc: Nazir Bilal Yavuz <[email protected]>
Cc: Jonathan S. Katz <[email protected]>
Cc: pgsql-hackers <[email protected]>
Cc: Andres Freund <[email protected]>
Subject: Re: Trying out read streams in pgvector (an extension)
Date: Wed, 8 Apr 2026 14:06:04 -0400
Message-ID: <CAAKRu_aimYfrOJ1G=6sja=5n-9t7j8JHqX07HUJxfMJqo059jQ@mail.gmail.com> (raw)
In-Reply-To: <CAAKRu_ZHvQ=ZFJPzEMS-iPnN_yFmhy1YvpZMRxoDE-eRt6T4-A@mail.gmail.com>
References: <CA+hUKGJ_7NKd46nx1wbyXWriuZSNzsTfm+rhEuvU6nxZi3-KVw@mail.gmail.com>
<[email protected]>
<CA+hUKG+x2BcqWzBC77cN0ewhzMF0kYhC6c4G_T2gJLPbqYQ6Ow@mail.gmail.com>
<CA+hUKGL-3mBtkA9RTbLFHuSS5cviuv0ko7nBhCg9KM7Q-GSEkw@mail.gmail.com>
<CAAKRu_ZVxzwRRbxedgb_LtkFaGf78XAbTO9uExvadV2DzaE=Jg@mail.gmail.com>
<CA+hUKG+zLmkD9zus=JOjjC+j5p9R1+CSXNZgd5=exZ01ZTaKoA@mail.gmail.com>
<CA+hUKGJx6FNqzsxfSOGH0nJZJq1MBc+t7NBKtAmy6zj4HD86tA@mail.gmail.com>
<CAN55FZ16TEhgYbK=qSEbkO8utz+u232NksCEmJMC1G4iZvnbvA@mail.gmail.com>
<CA+hUKGL7-Dx8KiUo=G91Y5tfFpwDUFFQJ6=9D8Gr1n=DZxGh+w@mail.gmail.com>
<CAAKRu_ZGhnWZXOyEyZ2r47g-F7U8asMRA6U8YZw3h=2rR=m_hQ@mail.gmail.com>
<CAN55FZ0tgjF1beJSRXw3rgkbzwPZ7ngChJkPZm9aJkPuaF=dmg@mail.gmail.com>
<CAAKRu_Zwj83zCJhahhMO578-+JdfTbqMV_ktxr-XjiE8BHLo9g@mail.gmail.com>
<CA+hUKGJLT2JvWLEiBXMbkSSc5so_Y7=N+S2ce7npjLw8QL3d5w@mail.gmail.com>
<CAAKRu_ZHvQ=ZFJPzEMS-iPnN_yFmhy1YvpZMRxoDE-eRt6T4-A@mail.gmail.com>
On Tue, Dec 9, 2025 at 4:42 PM Melanie Plageman
<[email protected]> wrote:
>
> > 1. read_stream_resume() as before, but with a new explicit
> > read_stream_pause(): if a block number callback would like to report a
> > temporary lack of information, it should return
> > read_stream_pause(stream), not InvalidBlockNumber. Then after
> > read_stream_resume(stream) is called, the next
> > read_stream_next_buffer() enters the lookahead loop again. While
> > paused, if the consumer drains all the existing buffers in the stream
> > and then one more, it will receive InvalidBuffer, but if the _resume()
> > call is made sooner, the consumer won't ever know about the temporary
> > lack of buffers in the stream.
I ended up committing read_stream_resume() in 38229cb905165fe but
without the tests because 1f6f200cab67e6, which added other read
stream tests, was imminent. I'd like to add the read_stream_resume()
test back now -- especially because we didn't end up adding another
user of read_stream_resume() in this release.
Attached 0001 is the test Thomas wrote, ported over to the new
004_read_stream.pl. It uses asserts instead of comparing output of
the SQL function to expected output, so I included a potential
alternative version of it in 0002 that uses that pattern. Note that
0002 is a diff from 0001, not an independent alternative patch. I
think the test needs more work either way, but I wanted to get the
ball rolling.
- Melanie
Attachments:
[text/x-patch] 0001-Add-test-for-read_stream_resume.patch (4.4K, 2-0001-Add-test-for-read_stream_resume.patch)
download | inline diff:
From 36668cc39dd8a6e249ece3ed53201cb3ea17073e Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 8 Apr 2026 13:40:16 -0400
Subject: [PATCH 1/2] Add test for read_stream_resume()
---
.../modules/test_aio/t/004_read_stream.pl | 22 ++++++
src/test/modules/test_aio/test_aio--1.0.sql | 9 +++
src/test/modules/test_aio/test_aio.c | 71 +++++++++++++++++++
src/tools/pgindent/typedefs.list | 1 +
4 files changed, 103 insertions(+)
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
index 32311c07ac0..28aedd7d163 100644
--- a/src/test/modules/test_aio/t/004_read_stream.pl
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -115,6 +115,27 @@ sub test_repeated_blocks
}
+# Run the C function test_read_stream_resume() against block 0 of a freshly
+# populated temporary table.  The actual checks live in that C function;
+# ok(1) below asserts nothing by itself, it only records that execution got
+# past query_safe().
+sub test_read_stream_resume
+{
+ my ($io_method, $node) = @_;
+ my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+ $psql->query_safe(
+ qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+ ok(1, "$io_method: read_stream_resume");
+
+ $psql->quit();
+}
+
+
sub test_inject_foreign
{
my $io_method = shift;
@@ -268,6 +289,7 @@ sub test_io_method
$io_method, "$io_method: io_method set correctly");
test_repeated_blocks($io_method, $node);
+ test_read_stream_resume($io_method, $node);
SKIP:
{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 762ac29512f..48caad25bda 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -128,3 +128,12 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_reopen_detach()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+/*
+ * test_read_stream_resume(rel, blockno): repeatedly read block "blockno" of
+ * "rel" through a read stream that pauses and is resumed; see test_aio.c.
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 35efba1a5e3..9d2d32ad1a2 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -916,6 +916,77 @@ read_stream_for_blocks(PG_FUNCTION_ARGS)
}
+/* Private state passed to test_read_stream_resume_cb(). */
+typedef struct
+{
+ BlockNumber blkno; /* block number returned on every non-pausing call */
+ int count; /* number of times the callback has been invoked */
+} test_read_stream_resume_state;
+
+/*
+ * Read stream block-number callback for test_read_stream_resume().
+ *
+ * Returns state->blkno, except that every third invocation pauses the stream
+ * instead (reporting a temporary lack of blocks rather than a real end).
+ */
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ test_read_stream_resume_state *st =
+ (test_read_stream_resume_state *) callback_private_data;
+
+ st->count++;
+
+ /* Not a third call: hand out the configured block. */
+ if (st->count % 3 != 0)
+ return st->blkno;
+
+ return read_stream_pause(stream);
+}
+
+/*
+ * Fetch the next buffer from "stream", raise an error unless it holds block
+ * "expected", and release it.
+ *
+ * Uses elog(ERROR) rather than Assert() so that the check is also performed
+ * in builds without assertions enabled.
+ */
+static void
+test_read_stream_expect_block(ReadStream *stream, BlockNumber expected)
+{
+ Buffer buf = read_stream_next_buffer(stream, NULL);
+
+ /* Check validity first; BufferGetBlockNumber() needs a valid buffer. */
+ if (!BufferIsValid(buf))
+ elog(ERROR, "read stream returned InvalidBuffer, expected block %u",
+ expected);
+ if (BufferGetBlockNumber(buf) != expected)
+ elog(ERROR, "read stream returned block %u, expected block %u",
+ BufferGetBlockNumber(buf), expected);
+ ReleaseBuffer(buf);
+}
+
+/*
+ * Fetch the next buffer from "stream" and raise an error unless it is
+ * InvalidBuffer, as expected while the stream is paused.
+ */
+static void
+test_read_stream_expect_paused(ReadStream *stream)
+{
+ Buffer buf = read_stream_next_buffer(stream, NULL);
+
+ if (BufferIsValid(buf))
+ elog(ERROR, "read stream returned block %u, expected InvalidBuffer while paused",
+ BufferGetBlockNumber(buf));
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ *
+ * Three times over: read block "blkno" twice, check that the paused stream
+ * returns InvalidBuffer on two consecutive calls, then resume it.  Any
+ * mismatch raises an error.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ BlockNumber blkno = PG_GETARG_UINT32(1);
+ Relation rel;
+ ReadStream *stream;
+ test_read_stream_resume_state state = {.blkno = blkno};
+
+ rel = relation_open(relid, AccessShareLock);
+ stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ test_read_stream_resume_cb,
+ &state,
+ 0);
+
+ for (int i = 0; i < 3; ++i)
+ {
+ /* The callback yields the same block twice... */
+ test_read_stream_expect_block(stream, blkno);
+ test_read_stream_expect_block(stream, blkno);
+
+ /* ...then pauses; repeated calls must keep returning InvalidBuffer. */
+ test_read_stream_expect_paused(stream);
+ test_read_stream_expect_paused(stream);
+
+ /* Resume, so the next call enters the lookahead loop again. */
+ read_stream_resume(stream);
+ }
+
+ read_stream_end(stream);
+ relation_close(rel, NoLock);
+
+ PG_RETURN_VOID();
+}
+
+
PG_FUNCTION_INFO_V1(handle_get);
Datum
handle_get(PG_FUNCTION_ARGS)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ea95e7984bc..4236afefa2d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4329,6 +4329,7 @@ teSection
temp_tablespaces_extra
test128
test_re_flags
+test_read_stream_resume_state
test_regex_ctx
test_shm_mq_header
test_spec
--
2.43.0
[text/x-patch] 0002-alternative-approach-for-test-read_stream_resume.patch (5.0K, 3-0002-alternative-approach-for-test-read_stream_resume.patch)
download | inline diff:
From 8e8b0a3d967c1e9ae1be64257616d3a02f6f64b4 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 8 Apr 2026 13:55:05 -0400
Subject: [PATCH 2/2] alternative approach for test read_stream_resume
---
.../modules/test_aio/t/004_read_stream.pl | 21 +++++----
src/test/modules/test_aio/test_aio--1.0.sql | 4 +-
src/test/modules/test_aio/test_aio.c | 45 +++++++++++++++----
3 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
index 28aedd7d163..68d3be452e2 100644
--- a/src/test/modules/test_aio/t/004_read_stream.pl
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -122,15 +122,18 @@ sub test_read_stream_resume
my $psql = $node->background_psql('postgres', on_error_stop => 0);
- $psql->query_safe(
- qq(
-CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
-INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
-SELECT test_read_stream_resume('tmp_read_stream', 0);
-DROP TABLE tmp_read_stream;
-));
-
- ok(1, "$io_method: read_stream_resume");
+ # The callback returns block 0 twice then pauses. We resume 3 times.
+ # -1 means read_stream_next_buffer() returned InvalidBuffer (paused).
+ my @one_cycle = (0, 0, -1);
+ my $expected = '{' . join(',', @one_cycle, @one_cycle, @one_cycle) . '}';
+
+ my $result = $psql->query_safe(
+ qq(SELECT array_agg(blocknum ORDER BY call_index)
+ FROM test_read_stream_resume('largeish', 0);));
+ chomp($result);
+
+ is($result, $expected,
+ "$io_method: read_stream_resume");
$psql->quit();
}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 48caad25bda..d3c941d5221 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -134,6 +134,6 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
/*
* Read stream related functions
*/
-CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4, OUT call_index int4, OUT blocknum int4)
+RETURNS SETOF record STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 9d2d32ad1a2..abdfa1cb448 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -939,6 +939,11 @@ test_read_stream_resume_cb(ReadStream *stream,
/*
* Test read_stream_resume(), allowing a stream to end temporarily and then
* continue where it left off.
+ *
+ * Returns a result set of (call_index int4, blocknum int4) rows so that the
+ * caller can validate the exact sequence. A blocknum of -1 indicates that
+ * read_stream_next_buffer() returned InvalidBuffer (i.e. the stream was
+ * paused).
*/
PG_FUNCTION_INFO_V1(test_read_stream_resume);
Datum
@@ -946,10 +951,14 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
BlockNumber blkno = PG_GETARG_UINT32(1);
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
Relation rel;
Buffer buf;
ReadStream *stream;
test_read_stream_resume_state state = {.blkno = blkno};
+ int call_index = 0;
+
+ InitMaterializedSRF(fcinfo, 0);
rel = relation_open(relid, AccessShareLock);
stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
@@ -962,19 +971,37 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
for (int i = 0; i < 3; ++i)
{
+ Datum values[2] = {0};
+ bool nulls[2] = {0};
+
/* Same block twice. */
buf = read_stream_next_buffer(stream, NULL);
- Assert(BufferGetBlockNumber(buf) == blkno);
- ReleaseBuffer(buf);
- buf = read_stream_next_buffer(stream, NULL);
- Assert(BufferGetBlockNumber(buf) == blkno);
- ReleaseBuffer(buf);
+ values[0] = Int32GetDatum(call_index++);
+ values[1] = BufferIsValid(buf) ?
+ Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+ Int32GetDatum(-1);
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ if (BufferIsValid(buf))
+ ReleaseBuffer(buf);
- /* End-of-stream. */
buf = read_stream_next_buffer(stream, NULL);
- Assert(buf == InvalidBuffer);
+ values[0] = Int32GetDatum(call_index++);
+ values[1] = BufferIsValid(buf) ?
+ Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+ Int32GetDatum(-1);
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ if (BufferIsValid(buf))
+ ReleaseBuffer(buf);
+
+ /* End-of-stream (paused). */
buf = read_stream_next_buffer(stream, NULL);
- Assert(buf == InvalidBuffer);
+ values[0] = Int32GetDatum(call_index++);
+ values[1] = BufferIsValid(buf) ?
+ Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+ Int32GetDatum(-1);
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ if (BufferIsValid(buf))
+ ReleaseBuffer(buf);
/* Resume. */
read_stream_resume(stream);
@@ -983,7 +1010,7 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
read_stream_end(stream);
relation_close(rel, NoLock);
- PG_RETURN_VOID();
+ return (Datum) 0;
}
--
2.43.0
view thread (18+ messages)
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]
Subject: Re: Trying out read streams in pgvector (an extension)
In-Reply-To: <CAAKRu_aimYfrOJ1G=6sja=5n-9t7j8JHqX07HUJxfMJqo059jQ@mail.gmail.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox