public inbox for [email protected]
help / color / mirror / Atom feedRe: Trying out read streams in pgvector (an extension)
18+ messages / 6 participants
[nested] [flat]
* Re: Trying out read streams in pgvector (an extension)
@ 2024-06-11 15:37 Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
0 siblings, 1 reply; 18+ messages in thread
From: Jonathan S. Katz @ 2024-06-11 15:37 UTC (permalink / raw)
To: Thomas Munro <[email protected]>; pgsql-hackers
On 6/11/24 12:53 AM, Thomas Munro wrote:
> Hi,
>
> I was looking around for an exotic index type to try the experience of
> streamifying an extension, ie out-of-core code. I am totally new to
> pgvector, but since everyone keeps talking about it, I could not avoid
> picking up some basic facts in the pgconf.dev hallway track, and
> understood that its scans have some degree of known-order access
> predictability, and then also some degree of fuzzy-predictable
> order-not-yet-determined access too. It's also quite random in the
> I/O sense.
Cool! I happened to be chatting w/Andrew about this yesterday to see if
there could be some benefits for folks who are running pgvector on PG17.
> Here's a toy to streamify the known-order part. I think for the fuzzy
> part that links those parts together, maybe there is some way to guess
> when it's a reasonable time to speculatively prefetch the lowest order
> stuff in the pairing heap, and then deal with it if you're wrong, but
> I didn't try that...
I would suggest submitting this at least as a draft PR to the pgvector
project[1]:
https://github.com/pgvector/pgvector
> Someone involved in that project mentioned that it's probably not a
> great topic to research in practice, because real world users of HNSW
> use fully cached ie prewarmed indexes, because the performance is so
> bad otherwise.
I don't think that was me, at least in those words (and I had noted I'd
love to chat w/you about this, but we didn't find time). Stating it
differently, the "ideal" is to keep the indexes in memory, as that leads
to the best performance, but reality is more complicated. These datasets
are quite large (e.g. the 1536-dim vector is a 6KB payload, excluding
what's in the index) and if you're storing the full vector in the index
(there are now some quantization methods available[4]), you can easily
double your dataset size, and quickly exceed available memory. So I
think in the real world, you're more likely to see swapping pages
between disk and memory. Some of this was addressed in the talk @
PGConf.dev[3] (slides here[2]).
> (Though maybe that argument is a little circular...).
> So although this patch clearly speeds up cold HSNW searches to a
> degree controlled by effective_io_concurrency, I'll probably look for
> something else. Suggestions for interesting index types to look at
> streamifying are very welcome!
Yup, so this makes sense for HNSW particularly at the higher-level
pages. But it may make more sense for IVFFlat, given how it clusters
data. With IVFFlat, you first find your lists/centers, and then you
determine how you index each vector around the lists. When those lists
are stored to disk, they're basically sequential. A lot of the struggles
with IVFFlat is both the long load from disk and ultimately some
comptuational issues for a larger set of vector comparisons (though if
you're able to build small, efficient clusters, it can be much faster
than HNSW!). HNSW behaves more like a (bear with me) typically
"tree-based" index, where you'll have hot spots at the top, but because
of the nature of vector search, the lower levels tend to be more random
in access.
Regardless, the part where this is interesting (at least to me) is that
a lot of these vectors tend to take up a full page anyway, so anything
we can do to read them faster from disk will generally get a thumbs up
from me.
> Hmm. If that's really true about HNSW though, then there may still be
> an opportunity to do automatic memory prefetching[1]. But then in the
> case of index building, "stream" is NULL in this patch anyway. It
> surely must also be possible to find some good places to put
> profitable explicit pg_mem_prefetch() calls given the predictability
> and the need to get only ~60ns ahead for that usage. I didn't look
> into that because I was trying to prove things about read_stream.c,
> not get involved in another project :-D
Well, as alluded to in[2], thinking about how another project uses this
will certainly help, and anything we can do to continue to speed up
vector queries helps PostgreSQL ;) Some of the contributions from folks
who have focused on core have significantly helped pgvector.
> Here ends my science experiment report, which I'm dropping here just
> in case others see useful ideas here. The main thing I learned about
> the read stream API is that it'd be nice to be able to reset the
> stream but preserve the distance (something that came up on the
> streaming sequential scan thread for a different reason), to deal with
> cases where look-ahead opportunities come in bursts but you want a
> longer lived stream than I used here. That is the reason the patch
> creates and destroys temporary streams in a loop; doh. It also
> provides an interesting case study for what speculative random
> look-ahead support might need to look like.
If you're curious, I can fire up some of my more serious benchmarks on
this to do a before/after to see if there's anything interesting. I have
a few large datasets (10s of millions) of larger vectors (1536dim => 6KB
payloads) that could see the net effect here.
> (Make sure you remember to set effective_io_concurrency to an
> interesting number if you want to generate a lot of overlapping
> fadvise calls.)
What would you recommend as an "interesting number?" - particularly
using the data parameters above.
Thanks,
Jonathan
[1] https://github.com/pgvector/pgvector
[2]
https://www.pgevents.ca/events/pgconfdev2024/sessions/session/1/slides/42/pgconfdev-2024-vectors.pdf
[3]
https://www.pgevents.ca/events/pgconfdev2024/schedule/session/1-vectors-how-to-better-support-a-nast...
[4] https://jkatz05.com/post/postgres/pgvector-scalar-binary-quantization/
Attachments:
[application/pgp-signature] OpenPGP_signature.asc (840B, 2-OpenPGP_signature.asc)
download
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
@ 2024-09-06 04:28 ` Thomas Munro <[email protected]>
2024-09-06 22:27 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
0 siblings, 2 replies; 18+ messages in thread
From: Thomas Munro @ 2024-09-06 04:28 UTC (permalink / raw)
To: Jonathan S. Katz <[email protected]>; +Cc: pgsql-hackers
On Wed, Jun 12, 2024 at 3:37 AM Jonathan S. Katz <[email protected]> wrote:
> If you're curious, I can fire up some of my more serious benchmarks on
> this to do a before/after to see if there's anything interesting. I have
> a few large datasets (10s of millions) of larger vectors (1536dim => 6KB
> payloads) that could see the net effect here.
>
> > (Make sure you remember to set effective_io_concurrency to an
> > interesting number if you want to generate a lot of overlapping
> > fadvise calls.)
>
> What would you recommend as an "interesting number?" - particularly
> using the data parameters above.
Hi Jonathan,
Sorry for not replying sooner (ETOOMANYPROJECTS). For HNSW, I think
the maximum useful effective_io_concurrency is bound by the number of
connections per HNSW layer ("m"). Here are some times I measured
using m=16 on two laptops:
| linux (xfs) | macos (apfs)
branch | eic | avg | speedup | stdev | avg | speedup | stdev
--------+-----+--------+---------+--------+--------+---------+--------
master | | 73.959 | 1.0 | 24.168 | 72.290 | 1.0 | 11.851
stream | 0 | 70.117 | 1.1 | 36.699 | 76.289 | 1.0 | 12.742
stream | 1 | 57.983 | 1.3 | 5.845 | 79.969 | 1.2 | 8.308
stream | 2 | 35.629 | 2.1 | 4.088 | 49.198 | 2.0 | 7.686
stream | 3 | 28.477 | 2.6 | 2.607 | 37.540 | 2.5 | 5.272
stream | 4 | 26.493 | 2.8 | 3.691 | 33.014 | 2.7 | 4.444
stream | 5 | 23.711 | 3.1 | 2.435 | 32.622 | 3.0 | 2.270
stream | 6 | 22.885 | 3.2 | 1.908 | 31.254 | 3.2 | 4.170
stream | 7 | 21.910 | 3.4 | 2.153 | 33.669 | 3.3 | 4.616
stream | 8 | 20.741 | 3.6 | 1.594 | 34.182 | 3.5 | 3.819
stream | 9 | 22.471 | 3.3 | 3.094 | 30.690 | 3.2 | 2.677
stream | 10 | 19.895 | 3.7 | 1.695 | 32.631 | 3.6 | 4.976
stream | 11 | 19.447 | 3.8 | 1.647 | 31.163 | 3.7 | 3.351
stream | 12 | 18.658 | 4.0 | 1.503 | 30.817 | 3.9 | 3.538
stream | 13 | 18.886 | 3.9 | 0.874 | 29.184 | 3.8 | 4.832
stream | 14 | 18.667 | 4.0 | 1.692 | 28.783 | 3.9 | 3.459
stream | 15 | 19.080 | 3.9 | 1.429 | 28.928 | 3.8 | 3.396
stream | 16 | 18.929 | 3.9 | 3.469 | 29.282 | 3.8 | 2.868
Those are millisecond times to run the test() function shown earlier,
with empty kernel cache and PostgreSQL cache (see below) for maximum
physical I/O. I ran the master test 30 times, and each
effective_io_concurrency level 10 times, to show that the variance
decreases even at the default effective_io_concurency = 1, so we're
not only talking about the avg speed improving.
The all-cached performance also seems to improve, ~8.9ms -> ~6.9ms on
Linux, but I can't fully explain why that is, maybe just some random
stuff about memory layout run-to-run in my quick and dirty test or
something like that, so I'm not claiming that is significant. It
certainly didn't get slower, anyway.
I think you would get very different numbers on a high latency storage
system (say, non-local cloud storage) and potentially much more
speedup with your large test indexes. Also my 6d random number test
may not be very representative and you may be able to come up with
much better tests.
Here's a new version with a TODO tidied up. I also understood that we
need to tweak the read_stream_reset() function, so that it doesn't
forget its current readhead distance when it hops between HNSW nodes
(which is something that comes up in several other potential uses
cases including another one I am working in in core). Without this
patch for PostgreSQL, it reads 1, 2, 4, 7 blocks (= 16 in total)
before it has to take a break to hop to a new page, and then it start
again at 1. Oops. With this patch, it is less forgetful, and reaches
the full possible I/O concurrency of 16 (or whatever the minimum of
HNSW's m parameter and effective_io_concurrency is for you).
PSA two patches, one for PostgreSQL and one for pgvector.
I am not actively working on this right now. If someone wants to try
to develop it further, please feel free! I haven't looked at IVFFlat
at all.
--- function to let you do SELECT uncache('t_embedding_idx'),
--- which is the opposite of SELECT pg_prewarm('t_embedding_idx')
--- see also "echo 1 | sudo tee /proc/sys/vm/drop_caches" (Linux)
--- "sudo purge" (macOS)
create extension pg_buffercache;
create or replace function uncache(name text) returns bool
begin atomic;
select bool_and(pg_buffercache_evict(bufferid))
from pg_buffercache where relfilenode = name::regclass;
end;
Attachments:
[text/x-patch] 0001-Remember-ReadStream-look-ahead-distance-on-reset.patch (2.4K, 2-0001-Remember-ReadStream-look-ahead-distance-on-reset.patch)
download | inline diff:
From 74b45e6a6387f0b7a0f12060a0d8cb401a85552e Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH] Remember ReadStream look-ahead distance on reset.
Some ReadStream clients need to do more work to make a new stream of
block numbers available. The callback indicates that block numbers have
run out by returning InvalidBlockNumber, but some time later the client
code resets the ReadStream to tell it that the callback now has more
block number, so that it can begin consuming buffers again. For
example, pgvector's HNSW index scan reach a new page, and find a new set
of connections to many other pages that will soon be accesed.
When the stream is reset, it would previously reset its look-ahead
distance to 1, and have to build it back up again if I/O is necessary.
With this patch, it will remember what it had before, which seems like a
better bet. If that's wrong, it will soon ramp down via the usual
algorithm.
Discussion: https://postgr.es/m/CA%2BhUKGJ_7NKd46nx1wbyXWriuZSNzsTfm%2BrhEuvU6nxZi3-KVw%40mail.gmail.com
---
src/backend/storage/aio/read_stream.c | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 7f0e07d9586..5d3e070afae 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -115,6 +115,7 @@ struct ReadStream
int16 max_pinned_buffers;
int16 pinned_buffers;
int16 distance;
+ int16 reset_distance;
bool advice_enabled;
/*
@@ -335,6 +336,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
if (blocknum == InvalidBlockNumber)
{
/* End of stream. */
+ stream->reset_distance = stream->distance;
stream->distance = 0;
break;
}
@@ -526,6 +528,7 @@ read_stream_begin_impl(int flags,
stream->distance = Min(max_pinned_buffers, io_combine_limit);
else
stream->distance = 1;
+ stream->reset_distance = stream->distance;
/*
* Since we always access the same relation, we can initialize parts of
@@ -822,8 +825,11 @@ read_stream_reset(ReadStream *stream)
Assert(stream->pinned_buffers == 0);
Assert(stream->ios_in_progress == 0);
- /* Start off assuming data is cached. */
- stream->distance = 1;
+ /*
+ * If the callback ran out of blocks temporarily, restore the distance from
+ * before.
+ */
+ stream->distance = Max(stream->reset_distance, 1);
}
/*
--
2.46.0
[text/x-patch] v2-0001-Use-streaming-I-O-for-HNSW-blocks.patch (6.4K, 3-v2-0001-Use-streaming-I-O-for-HNSW-blocks.patch)
download | inline diff:
From 21b404d8a6f400bb6da1bbfca7509a41f2b9f002 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Tue, 11 Jun 2024 14:32:47 +1200
Subject: [PATCH v2] Use streaming I/O for HNSW blocks.
If data is not already in PostgreSQL's cache, it will be accessed using
the new ReadStream API in PostgreSQL 17. We know the next 'm' HNSW
blocks we will access, so the ReadStream can read them into the
kernel's page cache asynchronously, up to the limit of the
effective_io_concurrency setting. While that currently defaults to
only 1, even 1 provides some speedup for cold caches, and higher number
help more.
XXX This is a proof-of-concept
Author: Thomas Munro <[email protected]>
Discussion: https://www.postgresql.org/message-id/flat/CA%2BhUKGJ_7NKd46nx1wbyXWriuZSNzsTfm%2BrhEuvU6nxZi3-KVw%40mail.gmail.com
---
src/hnsw.h | 1 +
src/hnswutils.c | 113 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 114 insertions(+)
diff --git a/src/hnsw.h b/src/hnsw.h
index 480ad9f..e738241 100644
--- a/src/hnsw.h
+++ b/src/hnsw.h
@@ -136,6 +136,7 @@ struct HnswElementData
uint8 deleted;
uint32 hash;
HnswNeighborsPtr neighbors;
+ Buffer buffer;
BlockNumber blkno;
OffsetNumber offno;
OffsetNumber neighborOffno;
diff --git a/src/hnswutils.c b/src/hnswutils.c
index 96c5026..46e76be 100644
--- a/src/hnswutils.c
+++ b/src/hnswutils.c
@@ -14,6 +14,10 @@
#include "utils/memdebug.h"
#include "utils/rel.h"
+#if PG_VERSION_NUM >= 170000
+#include "storage/read_stream.h"
+#endif
+
#if PG_VERSION_NUM >= 130000
#include "common/hashfn.h"
#else
@@ -278,6 +282,9 @@ HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno)
HnswElement element = palloc(sizeof(HnswElementData));
char *base = NULL;
+#if PG_VERSION_NUM >= 170000
+ element->buffer = InvalidBuffer;
+#endif
element->blkno = blkno;
element->offno = offno;
HnswPtrStore(base, element->neighbors, (HnswNeighborArrayPtr *) NULL);
@@ -555,7 +562,20 @@ HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index,
HnswElementTuple etup;
/* Read vector */
+#if PG_VERSION_NUM >= 170000
+ if (element->buffer != InvalidBuffer)
+ {
+ /* Buffer pinned already. */
+ buf = element->buffer;
+ Assert(BufferGetBlockNumber(buf) == element->blkno);
+ }
+ else
+ {
+ buf = ReadBuffer(index, element->blkno);
+ }
+#else
buf = ReadBuffer(index, element->blkno);
+#endif
LockBuffer(buf, BUFFER_LOCK_SHARE);
page = BufferGetPage(buf);
@@ -717,6 +737,34 @@ CountElement(char *base, HnswElement skipElement, HnswCandidate * hc)
return e->heaptidsLength != 0;
}
+#if PG_VERSION_NUM >= 170000
+typedef struct HnswSearchLayerNextBlockData {
+ char *base;
+ HnswCandidate **items;
+ int nitems;
+ int i;
+} HnswSearchLayerNextBlockData;
+
+/*
+ * Callback used to feed block numbers to the ReadStream.
+ */
+static BlockNumber
+HnswSearchLayerNextBlock(ReadStream *stream,
+ void *callback_data,
+ void *per_buffer_data)
+{
+ HnswSearchLayerNextBlockData *data = callback_data;
+ HnswElement hce;
+
+ if (data->i == data->nitems)
+ return InvalidBlockNumber;
+
+ hce = HnswPtrAccess(data->base, data->items[data->i++]->element);
+
+ return hce->blkno;
+}
+#endif
+
/*
* Algorithm 2 from paper
*/
@@ -732,6 +780,27 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
HnswNeighborArray *neighborhoodData = NULL;
Size neighborhoodSize = 0;
+#if PG_VERSION_NUM >= 170000
+ HnswSearchLayerNextBlockData stream_callback_data = {0};
+ ReadStream *stream;
+
+ /*
+ * If we're searching an index, create a stream so that we can generate
+ * some I/O asynchronicity when the index is cold, if
+ * effective_io_concurrency is configured.
+ */
+ if (index)
+ stream = read_stream_begin_relation(READ_STREAM_FULL,
+ NULL,
+ index,
+ MAIN_FORKNUM,
+ HnswSearchLayerNextBlock,
+ &stream_callback_data,
+ 0);
+ else
+ stream = NULL;
+#endif
+
InitVisited(base, &v, index, ef, m);
/* Create local memory for neighborhood if needed */
@@ -767,6 +836,8 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
HnswCandidate *c = ((HnswPairingHeapNode *) pairingheap_remove_first(C))->inner;
HnswCandidate *f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner;
HnswElement cElement;
+ HnswCandidate *items[HNSW_MAX_SIZE];
+ int nitems;
if (c->distance > f->distance)
break;
@@ -788,6 +859,8 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
neighborhood = neighborhoodData;
}
+ /* Build a list of indexes of neighbors to visit. */
+ nitems = 0;
for (int i = 0; i < neighborhood->length; i++)
{
HnswCandidate *e = &neighborhood->items[i];
@@ -796,6 +869,35 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
AddToVisited(base, &v, e, index, &visited);
if (!visited)
+ items[nitems++] = e;
+ }
+
+#if PG_VERSION_NUM >= 170000
+ if (stream)
+ {
+ /*
+ * Give the callback the information it needs to find future block
+ * numbers.
+ */
+ stream_callback_data.base = base;
+ stream_callback_data.items = items;
+ stream_callback_data.nitems = nitems;
+ stream_callback_data.i = 0;
+
+ /*
+ * Reset the stream. This is necessary because each time the
+ * callback runs out of data, the stream needs to be resetarted
+ * before it tries to look ahead again.
+ */
+ read_stream_reset(stream);
+ }
+#endif
+
+ /* Visit them. */
+ for (int i = 0; i < nitems; i++)
+ {
+ HnswCandidate *e = items[i];
+
{
float eDistance;
HnswElement eElement = HnswPtrAccess(base, e->element);
@@ -806,7 +908,13 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
if (index == NULL)
eDistance = GetCandidateDistance(base, e, q, procinfo, collation);
else
+ {
+#if PG_VERSION_NUM >= 170000
+ if (stream)
+ eElement->buffer = read_stream_next_buffer(stream, NULL);
+#endif
HnswLoadElement(eElement, &eDistance, &q, index, procinfo, collation, inserting, alwaysAdd ? NULL : &f->distance);
+ }
if (eDistance < f->distance || alwaysAdd)
{
@@ -844,6 +952,11 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
}
}
+#if PG_VERSION_NUM >= 170000
+ if (stream)
+ read_stream_end(stream);
+#endif
+
/* Add each element of W to w */
while (!pairingheap_is_empty(W))
{
--
2.46.0
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2024-09-06 22:27 ` Thomas Munro <[email protected]>
1 sibling, 0 replies; 18+ messages in thread
From: Thomas Munro @ 2024-09-06 22:27 UTC (permalink / raw)
To: Jonathan S. Katz <[email protected]>; +Cc: pgsql-hackers
There was a mistake in my query, so the macOS speedup column was wrong
(was accidentally comparing Linux number with macOS master, sorry for
the noise). I also forgot to mention that you don't actually get the
speedup on PostgreSQL 17 on a Mac, because Peter only recently
implemented the needed read-ahead support for macOS in master/18, but
it doesn't get slower. Here's the corrected table:
| linux (xfs) | macos (apfs)
branch | eic | avg | speedup | stdev | avg | speedup | stdev
--------+-----+--------+---------+--------+--------+---------+--------
master | | 73.959 | 1.0 | 24.168 | 72.290 | 1.0 | 11.851
stream | 0 | 70.117 | 1.1 | 36.699 | 76.289 | 0.9 | 12.742
stream | 1 | 57.983 | 1.3 | 5.845 | 79.969 | 0.9 | 8.308
stream | 2 | 35.629 | 2.1 | 4.088 | 49.198 | 1.5 | 7.686
stream | 3 | 28.477 | 2.6 | 2.607 | 37.540 | 1.9 | 5.272
stream | 4 | 26.493 | 2.8 | 3.691 | 33.014 | 2.2 | 4.444
stream | 5 | 23.711 | 3.1 | 2.435 | 32.622 | 2.2 | 2.270
stream | 6 | 22.885 | 3.2 | 1.908 | 31.254 | 2.3 | 4.170
stream | 7 | 21.910 | 3.4 | 2.153 | 33.669 | 2.1 | 4.616
stream | 8 | 20.741 | 3.6 | 1.594 | 34.182 | 2.1 | 3.819
stream | 9 | 22.471 | 3.3 | 3.094 | 30.690 | 2.4 | 2.677
stream | 10 | 19.895 | 3.7 | 1.695 | 32.631 | 2.2 | 4.976
stream | 11 | 19.447 | 3.8 | 1.647 | 31.163 | 2.3 | 3.351
stream | 12 | 18.658 | 4.0 | 1.503 | 30.817 | 2.3 | 3.538
stream | 13 | 18.886 | 3.9 | 0.874 | 29.184 | 2.5 | 4.832
stream | 14 | 18.667 | 4.0 | 1.692 | 28.783 | 2.5 | 3.459
stream | 15 | 19.080 | 3.9 | 1.429 | 28.928 | 2.5 | 3.396
stream | 16 | 18.929 | 3.9 | 3.469 | 29.282 | 2.5 | 2.868
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2025-11-11 21:21 ` Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
1 sibling, 1 reply; 18+ messages in thread
From: Thomas Munro @ 2025-11-11 21:21 UTC (permalink / raw)
To: Jonathan S. Katz <[email protected]>; +Cc: pgsql-hackers
On Fri, Sep 6, 2024 at 4:28 PM Thomas Munro <[email protected]> wrote:
> Here's a new version with a TODO tidied up. I also understood that we
> need to tweak the read_stream_reset() function, so that it doesn't
> forget its current readhead distance when it hops between HNSW nodes
> (which is something that comes up in several other potential uses
> cases including another one I am working in in core). Without this
> patch for PostgreSQL, it reads 1, 2, 4, 7 blocks (= 16 in total)
> before it has to take a break to hop to a new page, and then it start
> again at 1. Oops. With this patch, it is less forgetful, and reaches
> the full possible I/O concurrency of 16 (or whatever the minimum of
> HNSW's m parameter and effective_io_concurrency is for you).
I heard that the pgvector project is now trying to do this for real,
and (surprise!) running into this problem. It causes streamified HNSW
search to regress in performance on some queries when the overheads of
streaming are not outweighed by the (bogusly constrained) gains in
concurrency. We just don't generate enough concurrency to win. I
probably should have been more opinionated and just committed a
version of that distance-reset policy change, but I guess at the time
I wrote the above, streaming and AIO were a little too abstract to
attract reviews relating to hypothetical external projects.
We definitely want to fix that for v19, because it also affects the
streamified index scan project and doubtless many other things. I
wrote about that with patches[1] and will start a new thread soon with
a new collection of rebased heuristics improvements.
But for now, to fix pgvector's woes, I wonder if it might make sense
to call this a bug in v18, and back-patch the tiniest possible change.
Something like what I posted[2] in this thread almost two years ago.
I don't think it really affects any core code: we use
read_stream_reset() only in very minimal ways there (I could
elaborate), and it's quite arguable that the existing policy is wrong
for them too, but we'd need to confirm that and perhaps think about
other extensions that might be using it.
Better ideas?
[1] https://www.postgresql.org/message-id/flat/CA%2BhUKGL6hCd40Dh1AcFcoiw5zJXK2T1dRKO3oe8RkPExqA5zoQ%40m...
[2] https://www.postgresql.org/message-id/flat/CA%2BhUKG%2Bx2BcqWzBC77cN0ewhzMF0kYhC6c4G_T2gJLPbqYQ6Ow%4...
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2025-11-11 22:52 ` Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
0 siblings, 1 reply; 18+ messages in thread
From: Melanie Plageman @ 2025-11-11 22:52 UTC (permalink / raw)
To: Thomas Munro <[email protected]>; +Cc: Jonathan S. Katz <[email protected]>; pgsql-hackers
On Tue, Nov 11, 2025 at 4:22 PM Thomas Munro <[email protected]> wrote:
>
> But for now, to fix pgvector's woes, I wonder if it might make sense
> to call this a bug in v18, and back-patch the tiniest possible change.
> Something like what I posted[2] in this thread almost two years ago.
> I don't think it really affects any core code: we use
> read_stream_reset() only in very minimal ways there (I could
> elaborate), and it's quite arguable that the existing policy is wrong
> for them too, but we'd need to confirm that and perhaps think about
> other extensions that might be using it.
If we are worried about regressing other extensions using
read_stream_reset(), we could make the read stream reset which
preserves the distance a different function in backbranches.
- Melanie
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
@ 2025-11-11 23:19 ` Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
0 siblings, 1 reply; 18+ messages in thread
From: Thomas Munro @ 2025-11-11 23:19 UTC (permalink / raw)
To: Melanie Plageman <[email protected]>; +Cc: Jonathan S. Katz <[email protected]>; pgsql-hackers
On Wed, Nov 12, 2025 at 11:52 AM Melanie Plageman
<[email protected]> wrote:
> On Tue, Nov 11, 2025 at 4:22 PM Thomas Munro <[email protected]> wrote:
> > But for now, to fix pgvector's woes, I wonder if it might make sense
> > to call this a bug in v18, and back-patch the tiniest possible change.
> > Something like what I posted[2] in this thread almost two years ago.
> > I don't think it really affects any core code: we use
> > read_stream_reset() only in very minimal ways there (I could
> > elaborate), and it's quite arguable that the existing policy is wrong
> > for them too, but we'd need to confirm that and perhaps think about
> > other extensions that might be using it.
>
> If we are worried about regressing other extensions using
> read_stream_reset(), we could make the read stream reset which
> preserves the distance a different function in backbranches.
Hmm, yeah, interesting idea. Candidate names might include
read_stream_restart() and read_stream_continue(). The point being
that the block number callback reported end-of-stream, but that was
only temporary, and now it has more information and would like to
continue. Those are some of the names I bounced around for a new
read_stream_reset() flag argument for v19 (I rather liked "continue"),
but I also like this separate function idea. Back-patching a new
function would certainly remove all doubt about unintended
consequences for existing callers of read_stream_reset(), so yeah,
that wins on pure conservative safety grounds. As for the future,
hmm, it might even be better to have an explicit separate API for this
operation in master too, as it is turning out to be quite a common
requirement and the naming is much clearer like that. We don't
usually design new APIs while back-patching though, that's probably
why I didn't think of that, but if we view this as a design bug that
folded too many jobs into read_stream_reset() that we now want to fix
by splitting one off, maybe that's OK? Seems pretty risk-free,
anyway.
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2025-11-12 04:11 ` Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 20:44 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
0 siblings, 2 replies; 18+ messages in thread
From: Thomas Munro @ 2025-11-12 04:11 UTC (permalink / raw)
To: Melanie Plageman <[email protected]>; +Cc: Jonathan S. Katz <[email protected]>; pgsql-hackers
On Wed, Nov 12, 2025 at 12:19 PM Thomas Munro <[email protected]> wrote:
> On Wed, Nov 12, 2025 at 11:52 AM Melanie Plageman
> <[email protected]> wrote:
> > If we are worried about regressing other extensions using
> > read_stream_reset(), we could make the read stream reset which
> > preserves the distance a different function in backbranches.
Here is a draft patch like that, that tries to be as small as
possible. Trying out the name read_stream_resume().
Attachments:
[text/x-patch] 0001-Fix-overloaded-remit-of-read_stream_reset.patch (4.5K, 2-0001-Fix-overloaded-remit-of-read_stream_reset.patch)
download | inline diff:
From ab494c6e5ccf93563dcf7059f7eec7d4252294f6 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH 1/2] Fix overloaded remit of read_stream_reset().
Some extensions need to examine page contents to find more block numbers
to stream. That currently means that they have to signal end-of-stream
when data runs out, but later "reset" the stream to resume with new
information.
As an unfortunate by-product of that API economy, the look-ahead
distance would start out at one again, and then need to ramp back up to
find a useful concurrency level. This created a significant loss of
performance for self-referential block streams with a high degree of
fan-out, as discovered by the pgvector project.
Split out a separate function read_stream_resume() that is explicitly
intended for that usage pattern. Since distance == 0 is the internal
representation of end-of-stream, all it has to do is restore the
distance recorded at the time end-of-stream was signaled to resume
looking ahead with the same I/O concurrency level after a page jump.
We don't have any self-referential streaming in PostgreSQL itself yet,
but we decided to call this a bug all the same and back-patch a fix.
The intention was to support that exact usage pattern, the API just
missed the mark by trying to handle too many use cases. Such extensions
should be able to benefit from streaming, and the behavior change is
isolated to code that calls the new variant, so a separation of duties
seems warranted and safe to back-patch.
The problem was reported as extremely poor look-ahead performance in v18
using AIO for prefetching, but it also affects the fadvise-based
prefetching in v17, where read_stream.c arrived.
Reviewed-by:
Tested-by:
Backpatch-through: 17
Discussion: https://postgr.es/m/CA%2BhUKGL-3mBtkA9RTbLFHuSS5cviuv0ko7nBhCg9KM7Q-GSEkw%40mail.gmail.com
---
src/backend/storage/aio/read_stream.c | 15 +++++++++++++++
src/include/storage/read_stream.h | 1 +
2 files changed, 16 insertions(+)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..beae8ef325a 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,6 +100,7 @@ struct ReadStream
int16 pinned_buffers;
int16 distance;
int16 initialized_buffers;
+ int16 resume_distance;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
@@ -464,6 +465,7 @@ read_stream_look_ahead(ReadStream *stream)
if (blocknum == InvalidBlockNumber)
{
/* End of stream. */
+ stream->resume_distance = stream->distance;
stream->distance = 0;
break;
}
@@ -711,6 +713,7 @@ read_stream_begin_impl(int flags,
stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
else
stream->distance = 1;
+ stream->resume_distance = stream->distance;
/*
* Since we always access the same relation, we can initialize parts of
@@ -862,6 +865,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
else
{
/* No more blocks, end of stream. */
+ stream->resume_distance = stream->distance;
stream->distance = 0;
stream->oldest_buffer_index = stream->next_buffer_index;
stream->pinned_buffers = 0;
@@ -1034,6 +1038,17 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
return read_stream_get_block(stream, NULL);
}
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+ stream->distance = stream->resume_distance;
+}
+
/*
* Reset a read stream by releasing any queued up buffers, allowing the stream
* to be used again for different blocks. This can be used to clear an
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..e29ac50fc9e 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,7 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
ReadStreamBlockNumberCB callback,
void *callback_private_data,
size_t per_buffer_data_size);
+extern void read_stream_resume(ReadStream *stream);
extern void read_stream_reset(ReadStream *stream);
extern void read_stream_end(ReadStream *stream);
--
2.51.1
[text/x-patch] 0002-Add-smoke-test-for-read_stream_resume.patch (5.9K, 3-0002-Add-smoke-test-for-read_stream_resume.patch)
download | inline diff:
From a3c0520aa382874c7028c42d7d720415d4b3b26d Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Wed, 12 Nov 2025 16:49:57 +1300
Subject: [PATCH 2/2] Add smoke test for read_stream_resume().
XXX Not sure if it's worth a permanent test for this, but since the new
function is not exercised in the tree it seemed worth writing a simple
demo...
---
src/test/modules/test_aio/Makefile | 3 +-
src/test/modules/test_aio/meson.build | 1 +
src/test/modules/test_aio/t/001_aio.pl | 21 +++++
src/test/modules/test_aio/test_aio--1.0.sql | 9 ++
src/test/modules/test_aio/test_read_stream.c | 89 ++++++++++++++++++++
src/tools/pgindent/typedefs.list | 1 +
6 files changed, 123 insertions(+), 1 deletion(-)
create mode 100644 src/test/modules/test_aio/test_read_stream.c
diff --git a/src/test/modules/test_aio/Makefile b/src/test/modules/test_aio/Makefile
index f53cc64671a..465eb09ee4f 100644
--- a/src/test/modules/test_aio/Makefile
+++ b/src/test/modules/test_aio/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_aio - test code for AIO"
MODULE_big = test_aio
OBJS = \
$(WIN32RES) \
- test_aio.o
+ test_aio.o \
+ test_read_stream.o
EXTENSION = test_aio
DATA = test_aio--1.0.sql
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 73d2fd68eaa..6e6fcbfdad9 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -2,6 +2,7 @@
test_aio_sources = files(
'test_aio.c',
+ 'test_read_stream.c',
)
if host_system == 'windows'
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 3f0453619e8..cd1fd633bd8 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -1489,6 +1489,26 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
$psql->quit();
}
+# Read stream tests
+sub test_read_stream
+{
+ my $io_method = shift;
+ my $node = shift;
+ my ($ret, $output);
+
+ my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+ $psql->query_safe(
+ qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+ $psql->quit();
+}
+
+
# Run all tests that are supported for all io_methods
sub test_generic
@@ -1525,6 +1545,7 @@ CHECKPOINT;
test_checksum($io_method, $node);
test_ignore_checksum($io_method, $node);
test_checksum_createdb($io_method, $node);
+ test_read_stream($io_method, $node);
SKIP:
{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..dde8de35bc7 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -106,3 +106,12 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_reopen_detach()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_read_stream.c b/src/test/modules/test_aio/test_read_stream.c
new file mode 100644
index 00000000000..bf608db8a72
--- /dev/null
+++ b/src/test/modules/test_aio/test_read_stream.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * test_read_stream.c
+ * Helpers to write tests for read_stream.c
+ *
+ * Copyright (c) 2020-2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_aio/test_read_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "fmgr.h"
+#include "storage/bufmgr.h"
+#include "storage/read_stream.h"
+
+typedef struct
+{
+ BlockNumber blkno;
+ int count;
+} test_read_stream_resume_state;
+
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ test_read_stream_resume_state *state = callback_private_data;
+
+ /* Periodic end-of-stream. */
+ if (++state->count % 3 == 0)
+ return InvalidBlockNumber;
+
+ return state->blkno;
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ BlockNumber blkno = PG_GETARG_UINT32(1);
+ Relation rel;
+ Buffer buf;
+ ReadStream *stream;
+ test_read_stream_resume_state state = {.blkno = blkno};
+
+ rel = relation_open(relid, AccessShareLock);
+ stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ test_read_stream_resume_cb,
+ &state,
+ 0);
+
+ for (int i = 0; i < 3; ++i)
+ {
+ /* Same block twice. */
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+
+ /* End-of-stream. */
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(buf == InvalidBuffer);
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(buf == InvalidBuffer);
+
+ /* Resume. */
+ read_stream_resume(stream);
+ }
+
+ read_stream_end(stream);
+ relation_close(rel, NoLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 432509277c9..ca4c071ded2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4144,6 +4144,7 @@ td_entry
teSection
temp_tablespaces_extra
test_re_flags
+test_read_stream_resume_state
test_regex_ctx
test_shm_mq_header
test_spec
--
2.51.1
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2025-11-12 08:04 ` Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
1 sibling, 1 reply; 18+ messages in thread
From: Nazir Bilal Yavuz @ 2025-11-12 08:04 UTC (permalink / raw)
To: Thomas Munro <[email protected]>; +Cc: Melanie Plageman <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
Hi,
Thank you for working on this!
On Wed, 12 Nov 2025 at 07:12, Thomas Munro <[email protected]> wrote:
>
> On Wed, Nov 12, 2025 at 12:19 PM Thomas Munro <[email protected]> wrote:
> > On Wed, Nov 12, 2025 at 11:52 AM Melanie Plageman
> > <[email protected]> wrote:
> > > If we are worried about regressing other extensions using
> > > read_stream_reset(), we could make the read stream reset which
> > > preserves the distance a different function in backbranches.
>
> Here is a draft patch like that, that tries to be as small as
> possible. Trying out the name read_stream_resume().
I liked the idea of having a different function named
read_stream_resume for this purpose.
0001 looks good to me.
0002:
+ /* End-of-stream. */
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(buf == InvalidBuffer);
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(buf == InvalidBuffer);
I noticed there are two 'read_stream_next_buffer()' and
'InvalidBuffer' checks. Does having both provide any additional
validation? I tried removing one of them, and the test still passed.
Also, there is one thing I wanted to clarify about the
'read_stream_resume()'. If 'read_stream_next_buffer()' returns an
'InvalidBuffer', then we can use 'read_stream_resume()' alone because
we know that we already consumed all buffers in the stream. For the
rest, we need to use 'read_stream_resume()' together with the
'read_stream_reset()', right?
--
Regards,
Nazir Bilal Yavuz
Microsoft
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
@ 2025-11-12 10:47 ` Thomas Munro <[email protected]>
2025-11-18 21:17 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
0 siblings, 1 reply; 18+ messages in thread
From: Thomas Munro @ 2025-11-12 10:47 UTC (permalink / raw)
To: Nazir Bilal Yavuz <[email protected]>; +Cc: Melanie Plageman <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
On Wed, Nov 12, 2025 at 9:04 PM Nazir Bilal Yavuz <[email protected]> wrote:
> On Wed, 12 Nov 2025 at 07:12, Thomas Munro <[email protected]> wrote:
> 0002:
>
> + /* End-of-stream. */
> + buf = read_stream_next_buffer(stream, NULL);
> + Assert(buf == InvalidBuffer);
> + buf = read_stream_next_buffer(stream, NULL);
> + Assert(buf == InvalidBuffer);
>
> I noticed there are two 'read_stream_next_buffer()' and
> 'InvalidBuffer' checks. Does having both provide any additional
> validation? I tried removing one of them, and the test still passed.
I wanted to demonstrate that this is a state that the stream is stuck
in until you call _resume().
I suppose an alternative design would be that _next_buffer() returns
InvalidBuffer only once (= the block number callback returns
InvalidBlock once) and then automatically resumes (= it restores the
distance) and then you can call read_stream_next_buffer() again (= the
block number callback will be called again to fill the stream up with
new buffers before waiting for the first one to be ready to give to
you if it isn't already). That would have the advantage of not
requiring a new function at all and make the patch even shorter, but I
don't know, I guess I thought that would be a bit more fragile in some
way, less explicit. Hmm, would it actually be better if it worked
like that?
> Also, there is one thing I wanted to clarify about the
> 'read_stream_resume()'. If 'read_stream_next_buffer()' returns an
> 'InvalidBuffer', then we can use 'read_stream_resume()' alone because
> we know that we already consumed all buffers in the stream. For the
> rest, we need to use 'read_stream_resume()' together with the
> 'read_stream_reset()', right?
For the rest, there would be no need to call read_stream_resume().
To recap the uses of read_stream_reset(): the original purpose was to
release any buffers (pins) that the stream is holding internally
because of look-ahead, and put it back to its original state, ready to
be reused. It is called (1) by read_stream_end() as an implementation
detail (eg if a LIMIT or anything else except ERROR/FATAL ends your
query early, we need to unpin buffers queued in the stream before we
pfree it), (2) explicitly by rescans, (3) in hypothetical code I
thought about that would want to stream blocks speculatively and then
change its mind after predicting incorrectly (I had a few patches like
that, abandoned for now), and then (4) in this case, by places that
temporarily ran out of block numbers, but will have some more again
soon and want to resume the stream.
It was already debatable whether heuristic state like lookahead
distance should survive acoss rescans, or in other words, whether the
expected I/O requirements of the previous scan are a useful prediction
of the requirements of the next scan, but the answer is clearer in
case (4), hence the desire to find a way to separate that use case
from the others.
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2025-11-18 21:17 ` Melanie Plageman <[email protected]>
2025-11-19 07:27 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
0 siblings, 1 reply; 18+ messages in thread
From: Melanie Plageman @ 2025-11-18 21:17 UTC (permalink / raw)
To: Thomas Munro <[email protected]>; +Cc: Nazir Bilal Yavuz <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
On Wed, Nov 12, 2025 at 5:47 AM Thomas Munro <[email protected]> wrote:
>
> I suppose an alternative design would be that _next_buffer() returns
> InvalidBuffer only once (= the block number callback returns
> InvalidBlock once) and then automatically resumes (= it restores the
> distance) and then you can call read_stream_next_buffer() again (= the
> block number callback will be called again to fill the stream up with
> new buffers before waiting for the first one to be ready to give to
> you if it isn't already). That would have the advantage of not
> requiring a new function at all and make the patch even shorter, but I
> don't know, I guess I thought that would be a bit more fragile in some
> way, less explicit. Hmm, would it actually be better if it worked
> like that?
We discussed off-list and decided that changing existing functionality
in an unexpected way is undesirable. So, it is better we stick with
adding read_stream_resume. However, in talking about
read_stream_resume() further, Thomas and I also thought of potential
issues with it:
If read_stream_resume() is called before the read stream user callback
has ever returned InvalidBlockNumber,
1) The value of resume_distance will be the original value of distance
from read_stream_begin_relation(). You don't want to reset the
distance to that value.
2) There may be inflight or completed buffers that have yet to be
yielded which will be returned the next time read_stream_next_buffer()
is invoked. If the user resets the state the callback is using to
return blocks and expects the next invocation of
read_stream_next_buffer() to return buffers with those blocks, they
will be disappointed.
If we try to address this by requiring that stream->distance is 0 when
read_stream_resume() is called, that won't work because while it is
set to 0 when the callback returns InvalidBlockNumber, there may still
be unreturned buffers in the stream.
If the user wants to use read_stream_reset() to exhaust the stream
before calling read_stream_resume(), read_stream_reset() sets
stream->distance to 1 at the end, so read_stream_resume() couldn't
detect if reset() was correctly called first or if the distance is > 0
because the stream is still in progress.
To make sure 1) distance isn't reset to a resume_distance from
read_stream_begin_relation() and 2) unexpected buffers aren't returned
from the read stream, we could error out in read_stream_resume() if
pinned_buffers > 0. And in read_stream_reset(), we would save distance
in resume_distance before clearing distance. That would allow calling
read_stream_resume() either if you called read_stream_reset() or if
you exhausted the stream yourself. See rough attached patch for a
sketch of this.
It would be nicer if we could error out if read_stream_next_buffer()
didn't return InvalidBuffer, but we can't do that if we want to allow
calling read_stream_reset() followed by read_stream_resume() because
distance won't be 0.
I tried this with a modified pgvector with an hnsw read stream user
and it seemed to work correctly.
- Melanie
Attachments:
[text/x-patch] 0001-resume.patch (3.2K, 2-0001-resume.patch)
download | inline diff:
From cf41f56ba0d8c7f3bf92242020f742f201cf08d6 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 18 Nov 2025 14:24:12 -0500
Subject: [PATCH] resume
---
src/backend/storage/aio/read_stream.c | 19 +++++++++++++++++++
src/include/storage/read_stream.h | 1 +
2 files changed, 20 insertions(+)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..75bf92dc683 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,6 +100,7 @@ struct ReadStream
int16 pinned_buffers;
int16 distance;
int16 initialized_buffers;
+ int16 resume_distance;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
@@ -464,6 +465,7 @@ read_stream_look_ahead(ReadStream *stream)
if (blocknum == InvalidBlockNumber)
{
/* End of stream. */
+ stream->resume_distance = stream->distance;
stream->distance = 0;
break;
}
@@ -711,6 +713,7 @@ read_stream_begin_impl(int flags,
stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
else
stream->distance = 1;
+ stream->resume_distance = stream->distance;
/*
* Since we always access the same relation, we can initialize parts of
@@ -862,6 +865,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
else
{
/* No more blocks, end of stream. */
+ stream->resume_distance = stream->distance;
stream->distance = 0;
stream->oldest_buffer_index = stream->next_buffer_index;
stream->pinned_buffers = 0;
@@ -1034,6 +1038,19 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
return read_stream_get_block(stream, NULL);
}
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+ if (stream->pinned_buffers > 0)
+ elog(ERROR, "read stream must be exhausted before resuming");
+ stream->distance = stream->resume_distance;
+}
+
/*
* Reset a read stream by releasing any queued up buffers, allowing the stream
* to be used again for different blocks. This can be used to clear an
@@ -1047,6 +1064,8 @@ read_stream_reset(ReadStream *stream)
Buffer buffer;
/* Stop looking ahead. */
+ if (stream->distance > 0)
+ stream->resume_distance = stream->distance;
stream->distance = 0;
/* Forget buffered block number and fast path state. */
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..e29ac50fc9e 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,7 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
ReadStreamBlockNumberCB callback,
void *callback_private_data,
size_t per_buffer_data_size);
+extern void read_stream_resume(ReadStream *stream);
extern void read_stream_reset(ReadStream *stream);
extern void read_stream_end(ReadStream *stream);
--
2.47.3
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-18 21:17 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
@ 2025-11-19 07:27 ` Nazir Bilal Yavuz <[email protected]>
2025-11-20 15:27 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
0 siblings, 1 reply; 18+ messages in thread
From: Nazir Bilal Yavuz @ 2025-11-19 07:27 UTC (permalink / raw)
To: Melanie Plageman <[email protected]>; +Cc: Thomas Munro <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
Hi,
On Wed, 19 Nov 2025 at 00:17, Melanie Plageman
<[email protected]> wrote:
>
> To make sure 1) distance isn't reset to a resume_distance from
> read_stream_begin_relation() and 2) unexpected buffers aren't returned
> from the read stream, we could error out in read_stream_resume() if
> pinned_buffers > 0. And in read_stream_reset(), we would save distance
> in resume_distance before clearing distance. That would allow calling
> read_stream_resume() either if you called read_stream_reset() or if
> you exhausted the stream yourself. See rough attached patch for a
> sketch of this.
This looks correct to me. What do you think about using an assert
instead of erroring out?
--
Regards,
Nazir Bilal Yavuz
Microsoft
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-18 21:17 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-19 07:27 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
@ 2025-11-20 15:27 ` Melanie Plageman <[email protected]>
2025-12-09 03:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
0 siblings, 1 reply; 18+ messages in thread
From: Melanie Plageman @ 2025-11-20 15:27 UTC (permalink / raw)
To: Nazir Bilal Yavuz <[email protected]>; +Cc: Thomas Munro <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
On Wed, Nov 19, 2025 at 2:28 AM Nazir Bilal Yavuz <[email protected]> wrote:
>
> > To make sure 1) distance isn't reset to a resume_distance from
> > read_stream_begin_relation() and 2) unexpected buffers aren't returned
> > from the read stream, we could error out in read_stream_resume() if
> > pinned_buffers > 0. And in read_stream_reset(), we would save distance
> > in resume_distance before clearing distance. That would allow calling
> > read_stream_resume() either if you called read_stream_reset() or if
> > you exhausted the stream yourself. See rough attached patch for a
> > sketch of this.
>
> This looks correct to me. What do you think about using an assert
> instead of erroring out?
I'm not totally opposed to this. My rationale for making it an error
is that the developer could have test cases where all the buffers are
consumed but the code is written such that that won't always happen.
Then if a real production query doesn't consume all the buffers, it
could return wrong results (I think). That will mean the user can't
complete their query until the extension author releases a new version
of their code. But I'm not sure what the right answer is here.
- Melanie
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-18 21:17 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-19 07:27 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-20 15:27 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
@ 2025-12-09 03:47 ` Thomas Munro <[email protected]>
2025-12-09 21:42 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-12-09 22:38 ` Re: Trying out read streams in pgvector (an extension) Peter Geoghegan <[email protected]>
0 siblings, 2 replies; 18+ messages in thread
From: Thomas Munro @ 2025-12-09 03:47 UTC (permalink / raw)
To: Melanie Plageman <[email protected]>; Peter Geoghegan <[email protected]>; +Cc: Nazir Bilal Yavuz <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
On Fri, Nov 21, 2025 at 4:28 AM Melanie Plageman
<[email protected]> wrote:
> I'm not totally opposed to this. My rationale for making it an error
> is that the developer could have test cases where all the buffers are
> consumed but the code is written such that that won't always happen.
> Then if a real production query doesn't consume all the buffers, it
> could return wrong results (I think). That will mean the user can't
> complete their query until the extension author releases a new version
> of their code. But I'm not sure what the right answer is here.
Focusing on making sure v19 has a good interface for this, and
abandoning thoughts of back-patching a bandaid, and the constraints
that leads to, for now...
I think it'd be better if that were the consumer's choice. I don't
want the consumer to be required to drain the stream before resuming,
as that'd be an unprincipled stall. For example, if new WAL arrives
over the network then I think it should be possible for recovery's
WAL-powered stream of heap pages to resume looking ahead even if
recovery hasn't drained the existing stream completely.
Peter G (CC'd) and I discussed some problems he had in the index
prefetching work, and I tried to extend this a bit to give the
semantics he wanted, in point 2 below. It's simple itself, but might
lead to some tricky questions higher up. Posted for experimentation.
It'll be interesting to see if this goes somewhere.
1. read_stream_resume() as before, but with a new explicit
read_stream_pause(): if a block number callback would like to report a
temporary lack of information, it should return
read_stream_pause(stream), not InvalidBlockNumber. Then after
read_stream_resume(stream) is called, the next
read_stream_next_buffer() enters the lookahead loop again. While
paused, if the consumer drains all the existing buffers in the stream
and then one more, it will receive InvalidBuffer, but if the _resume()
call is made sooner, the consumer won't ever know about the temporary
lack of buffers in the stream.
2. read_stream_yield(): while streaming heap pages that come from
TIDs on index pages, Peter didn't like that the executor lost control
of how much work was done by the lookahead loop underneath
read_stream_next_buffer(). The consumer might have a heap page with
some tuples that could be emitted right now, but the block number
callback might be evaluating arbitrarily expensive filter qual
expressions far ahead, and they might prefer to emit more tuples now
before doing an unbounded amount of work finding more. This interface
allows some limited coroutine-like multitasking, where the block
number callback can return read_stream_yield(stream) to return control
back to the consumer periodically if it knows the consumer could
already do something else. It works by pausing the stream and
resuming it in the next read_stream_next_buffer() call, but that's an
internal detail.
Some half-baked thoughts about the resulting flow control:
Yielding control periodically just when it happens to be possible
within the constraints of the volcano executor is an interesting thing
to think about. You can only yield if you already have a tuple to
emit. There is no saying when control will return to you, and the
node you yield to might immediately block on I/O and yet you could
have been doing useful CPU work. You probably need an event-driven
node-hopping executor to fix that in general, but on the flip side, I
can think of one bet that I'd take: if you already have a tuple to
emit AND if index scans themselves (not only referenced heap pages)
were also streamed AND if a hypothetical
read_stream_next_buffer_no_wait(btree_stream) said the next index page
you need is not ready yet, then you should yield. You're gambling
that other plan nodes will have better luck running without an I/O
stall, but you have ~0% chance.
Yielding just because you've scanned N index pages/tuples/whatever is
harder to think about. The stream shouldn't get far ahead unless it's
recently been useful for I/O concurrency (though optimal distance
heuristics are an open problem), but in this case a single invocation
of the block number callback can call ReadBuffer() an arbitrary number
of times, filtering out all the index tuples as it rampages through
the whole index IIUC. I see why you might want to yield periodically
if you can, but I also wonder how much that can really help if you
still have to pick up where you left off next time. I guess it
depends on the distribution of matches. It's also clear that any
cold-cache testing done with direct I/O enabled will stall abominably
as long as that level calls ReadBuffer(), possibly confusing matters.
Attachments:
[text/x-patch] 0001-Introduce-read_stream_-pause-resume-yield.patch (3.6K, 2-0001-Introduce-read_stream_-pause-resume-yield.patch)
download | inline diff:
From ff6da44df5418d73ad9ec1911c87b66897f3b086 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Sat, 15 Jun 2024 14:37:26 +1200
Subject: [PATCH 1/2] Introduce read_stream_{pause,resume,yield}().
---
src/backend/storage/aio/read_stream.c | 50 ++++++++++++++++++++++++++-
src/include/storage/read_stream.h | 3 ++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 031fde9f4cb..964e1aa281c 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -100,11 +100,13 @@ struct ReadStream
int16 pinned_buffers;
int16 distance;
int16 initialized_buffers;
+ int16 resume_distance;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
bool advice_enabled;
bool temporary;
+ bool yielded;
/*
* One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -879,7 +881,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
/* End of stream reached? */
if (stream->distance == 0)
- return InvalidBuffer;
+ {
+ if (!stream->yielded)
+ return InvalidBuffer;
+
+ /* The callback yielded. Resume. */
+ stream->yielded = false;
+ read_stream_resume(stream);
+ Assert(stream->distance != 0);
+ }
/*
* The usual order of operations is that we look ahead at the bottom
@@ -1034,6 +1044,44 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
return read_stream_get_block(stream, NULL);
}
+/*
+ * Temporarily stop consuming block numbers from the block number callback. If
+ * called inside the block number callback, its return value should be
+ * returned by the callback.
+ */
+BlockNumber
+read_stream_pause(ReadStream *stream)
+{
+ stream->resume_distance = stream->distance;
+ stream->distance = 0;
+ return InvalidBlockNumber;
+}
+
+/*
+ * Resume looking ahead after the block number callback reported end-of-stream.
+ * This is useful for streams of self-referential blocks, after a buffer needed
+ * to be consumed and examined to find more block numbers.
+ */
+void
+read_stream_resume(ReadStream *stream)
+{
+ stream->distance = stream->resume_distance;
+}
+
+/*
+ * Called from inside a block number callback, to return control to the caller
+ * of read_stream_next_buffer() without looking further ahead. Its return
+ * value should be returned by the callback. This is equivalent to pausing and
+ * resuming automatically at the next call to read_stream_next_buffer().
+ */
+BlockNumber
+read_stream_yield(ReadStream *stream)
+{
+ read_stream_pause(stream);
+ stream->yielded = true;
+ return InvalidBlockNumber;
+}
+
/*
* Reset a read stream by releasing any queued up buffers, allowing the stream
* to be used again for different blocks. This can be used to clear an
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
index 9b0d65161d0..8ac53d2902d 100644
--- a/src/include/storage/read_stream.h
+++ b/src/include/storage/read_stream.h
@@ -99,6 +99,9 @@ extern ReadStream *read_stream_begin_smgr_relation(int flags,
ReadStreamBlockNumberCB callback,
void *callback_private_data,
size_t per_buffer_data_size);
+extern BlockNumber read_stream_pause(ReadStream *stream);
+extern void read_stream_resume(ReadStream *stream);
+extern BlockNumber read_stream_yield(ReadStream *stream);
extern void read_stream_reset(ReadStream *stream);
extern void read_stream_end(ReadStream *stream);
--
2.51.2
[text/x-patch] 0002-Add-tests-for-read_stream_-pause-resume-yield.patch (8.4K, 3-0002-Add-tests-for-read_stream_-pause-resume-yield.patch)
download | inline diff:
From de8a800b48829ef619200a71dba9028be3ea09e9 Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Wed, 12 Nov 2025 16:49:57 +1300
Subject: [PATCH 2/2] Add tests for read_stream_{pause,resume,yield}().
---
src/test/modules/test_aio/Makefile | 3 +-
src/test/modules/test_aio/meson.build | 1 +
src/test/modules/test_aio/t/001_aio.pl | 30 +++
src/test/modules/test_aio/test_aio--1.0.sql | 13 ++
src/test/modules/test_aio/test_read_stream.c | 181 +++++++++++++++++++
src/tools/pgindent/typedefs.list | 1 +
6 files changed, 228 insertions(+), 1 deletion(-)
create mode 100644 src/test/modules/test_aio/test_read_stream.c
diff --git a/src/test/modules/test_aio/Makefile b/src/test/modules/test_aio/Makefile
index f53cc64671a..465eb09ee4f 100644
--- a/src/test/modules/test_aio/Makefile
+++ b/src/test/modules/test_aio/Makefile
@@ -5,7 +5,8 @@ PGFILEDESC = "test_aio - test code for AIO"
MODULE_big = test_aio
OBJS = \
$(WIN32RES) \
- test_aio.o
+ test_aio.o \
+ test_read_stream.o
EXTENSION = test_aio
DATA = test_aio--1.0.sql
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 73d2fd68eaa..6e6fcbfdad9 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -2,6 +2,7 @@
test_aio_sources = files(
'test_aio.c',
+ 'test_read_stream.c',
)
if host_system == 'windows'
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 3f0453619e8..2a2c6523a6b 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -1489,6 +1489,35 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
$psql->quit();
}
+# Read stream tests
+sub test_read_stream
+{
+ my $io_method = shift;
+ my $node = shift;
+ my ($ret, $output);
+
+ my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+ $psql->query_safe(
+ qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+
+ $psql->query_safe(
+ qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_yield('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+
+ $psql->quit();
+}
+
+
# Run all tests that are supported for all io_methods
sub test_generic
@@ -1525,6 +1554,7 @@ CHECKPOINT;
test_checksum($io_method, $node);
test_ignore_checksum($io_method, $node);
test_checksum_createdb($io_method, $node);
+ test_read_stream($io_method, $node);
SKIP:
{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..e37810b7273 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -106,3 +106,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_reopen_detach()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_read_stream_yield(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_read_stream.c b/src/test/modules/test_aio/test_read_stream.c
new file mode 100644
index 00000000000..d1d436a90b7
--- /dev/null
+++ b/src/test/modules/test_aio/test_read_stream.c
@@ -0,0 +1,181 @@
+/*-------------------------------------------------------------------------
+ *
+ * test_read_stream.c
+ * Helpers to write tests for read_stream.c
+ *
+ * Copyright (c) 2020-2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/test/modules/test_aio/test_read_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "fmgr.h"
+#include "storage/bufmgr.h"
+#include "storage/read_stream.h"
+
+typedef struct
+{
+ BlockNumber blkno;
+ int count;
+} test_read_stream_resume_state;
+
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ test_read_stream_resume_state *state = callback_private_data;
+
+ /* Periodic end-of-stream. */
+ if (++state->count % 3 == 0)
+ return read_stream_pause(stream);
+
+ return state->blkno;
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ BlockNumber blkno = PG_GETARG_UINT32(1);
+ Relation rel;
+ Buffer buf;
+ ReadStream *stream;
+ test_read_stream_resume_state state = {.blkno = blkno};
+
+ rel = relation_open(relid, AccessShareLock);
+ stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ test_read_stream_resume_cb,
+ &state,
+ 0);
+
+ for (int i = 0; i < 3; ++i)
+ {
+ /* Same block twice. */
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+
+ /* End-of-stream. */
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(buf == InvalidBuffer);
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(buf == InvalidBuffer);
+
+ /* Resume. */
+ read_stream_resume(stream);
+ }
+
+ read_stream_end(stream);
+ relation_close(rel, NoLock);
+
+ PG_RETURN_VOID();
+}
+
+typedef struct
+{
+ BlockNumber blkno;
+ int count;
+ int yields;
+ int blocks;
+} test_read_stream_yield_state;
+
+static BlockNumber
+test_read_stream_yield_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ test_read_stream_yield_state *state = callback_private_data;
+
+ /* Yield every third call. */
+ if (++state->count % 3 == 2)
+ {
+ state->yields++;
+ return read_stream_yield(stream);
+ }
+
+ state->blocks++;
+ return state->blkno;
+}
+
+/*
+ * Test read_stream_yield(), allowing control to be yielded temporarily from
+ * the lookahead loop and returned to the caller of read_stream_next_buffer().
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_yield);
+Datum
+test_read_stream_yield(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ BlockNumber blkno = PG_GETARG_UINT32(1);
+ Relation rel;
+ Buffer buf;
+ ReadStream *stream;
+ test_read_stream_yield_state state = {.blkno = blkno};
+
+ rel = relation_open(relid, AccessShareLock);
+ stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ test_read_stream_yield_cb,
+ &state,
+ 0);
+
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ Assert(state.blocks == 1);
+ Assert(state.yields == 1);
+
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ Assert(state.blocks == 3);
+ Assert(state.yields == 1);
+
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ Assert(state.blocks == 3);
+ Assert(state.yields == 2);
+
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ Assert(state.blocks == 5);
+ Assert(state.yields == 2);
+
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ Assert(state.blocks == 5);
+ Assert(state.yields == 3);
+
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ Assert(state.blocks == 7);
+ Assert(state.yields == 3);
+
+ read_stream_end(stream);
+ relation_close(rel, NoLock);
+
+ PG_RETURN_VOID();
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cf3f6a7dafd..7396e9ce14b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4158,6 +4158,7 @@ td_entry
teSection
temp_tablespaces_extra
test_re_flags
+test_read_stream_resume_state
test_regex_ctx
test_shm_mq_header
test_spec
--
2.51.2
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-18 21:17 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-19 07:27 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-20 15:27 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-12-09 03:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2025-12-09 21:42 ` Melanie Plageman <[email protected]>
2026-04-08 18:06 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
1 sibling, 1 reply; 18+ messages in thread
From: Melanie Plageman @ 2025-12-09 21:42 UTC (permalink / raw)
To: Thomas Munro <[email protected]>; +Cc: Peter Geoghegan <[email protected]>; Nazir Bilal Yavuz <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
On Mon, Dec 8, 2025 at 10:47 PM Thomas Munro <[email protected]> wrote:
>
> I think it'd be better if that were the consumer's choice. I don't
> want the consumer to be required to drain the stream before resuming,
> as that'd be an unprincipled stall. For example, if new WAL arrives
> over the network then I think it should be possible for recovery's
> WAL-powered stream of heap pages to resume looking ahead even if
> recovery hasn't drained the existing stream completely.
>
> 1. read_stream_resume() as before, but with a new explicit
> read_stream_pause(): if a block number callback would like to report a
> temporary lack of information, it should return
> read_stream_pause(stream), not InvalidBlockNumber. Then after
> read_stream_resume(stream) is called, the next
> read_stream_next_buffer() enters the lookahead loop again. While
> paused, if the consumer drains all the existing buffers in the stream
> and then one more, it will receive InvalidBuffer, but if the _resume()
> call is made sooner, the consumer won't ever know about the temporary
> lack of buffers in the stream.
I like this new interface. If the user does want to exhaust the stream
(as was the case with earlier pgvector read stream user code), I
assume you would want to do:
read_stream_pause()
read_stream_reset()
read_stream_resume()
- Melanie
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-18 21:17 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-19 07:27 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-20 15:27 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-12-09 03:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-12-09 21:42 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
@ 2026-04-08 18:06 ` Melanie Plageman <[email protected]>
0 siblings, 0 replies; 18+ messages in thread
From: Melanie Plageman @ 2026-04-08 18:06 UTC (permalink / raw)
To: Thomas Munro <[email protected]>; +Cc: Peter Geoghegan <[email protected]>; Nazir Bilal Yavuz <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers; Andres Freund <[email protected]>
On Tue, Dec 9, 2025 at 4:42 PM Melanie Plageman
<[email protected]> wrote:
>
> > 1. read_stream_resume() as before, but with a new explicit
> > read_stream_pause(): if a block number callback would like to report a
> > temporary lack of information, it should return
> > read_stream_pause(stream), not InvalidBlockNumber. Then after
> > read_stream_resume(stream) is called, the next
> > read_stream_next_buffer() enters the lookahead loop again. While
> > paused, if the consumer drains all the existing buffers in the stream
> > and then one more, it will receive InvalidBuffer, but if the _resume()
> > call is made sooner, the consumer won't ever know about the temporary
> > lack of buffers in the stream.
I ended up committing read_stream_resume() in 38229cb905165fe but
without the tests because 1f6f200cab67e6, which added other read
stream tests, was imminent. I'd like to add the read_stream_resume()
test back now -- especially because we didn't end up adding another
user of read_stream_resume() in this release.
Attached 0001 is the test Thomas wrote ported over to be in the new
0004_read_stream.pl. It uses asserts instead of comparing output of
the SQL function to expected output, so I included a potential
alternative version of it in 0002 that uses that pattern. Note that
0002 is a diff from 0001, not an independent alternative patch. I
think the test needs more work either way, but I wanted to get the
ball rolling.
- Melanie
Attachments:
[text/x-patch] 0001-Add-test-for-read_stream_resume.patch (4.4K, 2-0001-Add-test-for-read_stream_resume.patch)
download | inline diff:
From 36668cc39dd8a6e249ece3ed53201cb3ea17073e Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 8 Apr 2026 13:40:16 -0400
Subject: [PATCH 1/2] Add test for read_stream_resume()
---
.../modules/test_aio/t/004_read_stream.pl | 22 ++++++
src/test/modules/test_aio/test_aio--1.0.sql | 9 +++
src/test/modules/test_aio/test_aio.c | 71 +++++++++++++++++++
src/tools/pgindent/typedefs.list | 1 +
4 files changed, 103 insertions(+)
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
index 32311c07ac0..28aedd7d163 100644
--- a/src/test/modules/test_aio/t/004_read_stream.pl
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -115,6 +115,27 @@ sub test_repeated_blocks
}
+sub test_read_stream_resume
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+ $psql->query_safe(
+ qq(
+CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
+INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
+SELECT test_read_stream_resume('tmp_read_stream', 0);
+DROP TABLE tmp_read_stream;
+));
+
+ ok(1, "$io_method: read_stream_resume");
+
+ $psql->quit();
+}
+
+
sub test_inject_foreign
{
my $io_method = shift;
@@ -268,6 +289,7 @@ sub test_io_method
$io_method, "$io_method: io_method set correctly");
test_repeated_blocks($io_method, $node);
+ test_read_stream_resume($io_method, $node);
SKIP:
{
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 762ac29512f..48caad25bda 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -128,3 +128,12 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_reopen_detach()
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
+
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 35efba1a5e3..9d2d32ad1a2 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -916,6 +916,77 @@ read_stream_for_blocks(PG_FUNCTION_ARGS)
}
+typedef struct
+{
+ BlockNumber blkno;
+ int count;
+} test_read_stream_resume_state;
+
+static BlockNumber
+test_read_stream_resume_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ test_read_stream_resume_state *state = callback_private_data;
+
+ /* Periodic end-of-stream. */
+ if (++state->count % 3 == 0)
+ return read_stream_pause(stream);
+
+ return state->blkno;
+}
+
+/*
+ * Test read_stream_resume(), allowing a stream to end temporarily and then
+ * continue where it left off.
+ */
+PG_FUNCTION_INFO_V1(test_read_stream_resume);
+Datum
+test_read_stream_resume(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ BlockNumber blkno = PG_GETARG_UINT32(1);
+ Relation rel;
+ Buffer buf;
+ ReadStream *stream;
+ test_read_stream_resume_state state = {.blkno = blkno};
+
+ rel = relation_open(relid, AccessShareLock);
+ stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ test_read_stream_resume_cb,
+ &state,
+ 0);
+
+ for (int i = 0; i < 3; ++i)
+ {
+ /* Same block twice. */
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferGetBlockNumber(buf) == blkno);
+ ReleaseBuffer(buf);
+
+ /* End-of-stream. */
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(buf == InvalidBuffer);
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(buf == InvalidBuffer);
+
+ /* Resume. */
+ read_stream_resume(stream);
+ }
+
+ read_stream_end(stream);
+ relation_close(rel, NoLock);
+
+ PG_RETURN_VOID();
+}
+
+
PG_FUNCTION_INFO_V1(handle_get);
Datum
handle_get(PG_FUNCTION_ARGS)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ea95e7984bc..4236afefa2d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4329,6 +4329,7 @@ teSection
temp_tablespaces_extra
test128
test_re_flags
+test_read_stream_resume_state
test_regex_ctx
test_shm_mq_header
test_spec
--
2.43.0
[text/x-patch] 0002-alternative-approach-for-test-read_stream_resume.patch (5.0K, 3-0002-alternative-approach-for-test-read_stream_resume.patch)
download | inline diff:
From 8e8b0a3d967c1e9ae1be64257616d3a02f6f64b4 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 8 Apr 2026 13:55:05 -0400
Subject: [PATCH 2/2] alternative approach for test read_stream_resume
---
.../modules/test_aio/t/004_read_stream.pl | 21 +++++----
src/test/modules/test_aio/test_aio--1.0.sql | 4 +-
src/test/modules/test_aio/test_aio.c | 45 +++++++++++++++----
3 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
index 28aedd7d163..68d3be452e2 100644
--- a/src/test/modules/test_aio/t/004_read_stream.pl
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -122,15 +122,18 @@ sub test_read_stream_resume
my $psql = $node->background_psql('postgres', on_error_stop => 0);
- $psql->query_safe(
- qq(
-CREATE TEMPORARY TABLE tmp_read_stream(data int not null);
-INSERT INTO tmp_read_stream SELECT generate_series(1, 10000);
-SELECT test_read_stream_resume('tmp_read_stream', 0);
-DROP TABLE tmp_read_stream;
-));
-
- ok(1, "$io_method: read_stream_resume");
+ # The callback returns block 0 twice then pauses. We resume 3 times.
+ # -1 means read_stream_next_buffer() returned InvalidBuffer (paused).
+ my @one_cycle = (0, 0, -1);
+ my $expected = '{' . join(',', @one_cycle, @one_cycle, @one_cycle) . '}';
+
+ my $result = $psql->query_safe(
+ qq(SELECT array_agg(blocknum ORDER BY call_index)
+ FROM test_read_stream_resume('largeish', 0);));
+ chomp($result);
+
+ is($result, $expected,
+ "$io_method: read_stream_resume");
$psql->quit();
}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 48caad25bda..d3c941d5221 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -134,6 +134,6 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
/*
* Read stream related functions
*/
-CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION test_read_stream_resume(rel regclass, blockno int4, OUT call_index int4, OUT blocknum int4)
+RETURNS SETOF record STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 9d2d32ad1a2..abdfa1cb448 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -939,6 +939,11 @@ test_read_stream_resume_cb(ReadStream *stream,
/*
* Test read_stream_resume(), allowing a stream to end temporarily and then
* continue where it left off.
+ *
+ * Returns a result set of (call_index int4, blocknum int4) rows so that the
+ * caller can validate the exact sequence. A blocknum of -1 indicates that
+ * read_stream_next_buffer() returned InvalidBuffer (i.e. the stream was
+ * paused).
*/
PG_FUNCTION_INFO_V1(test_read_stream_resume);
Datum
@@ -946,10 +951,14 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
{
Oid relid = PG_GETARG_OID(0);
BlockNumber blkno = PG_GETARG_UINT32(1);
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
Relation rel;
Buffer buf;
ReadStream *stream;
test_read_stream_resume_state state = {.blkno = blkno};
+ int call_index = 0;
+
+ InitMaterializedSRF(fcinfo, 0);
rel = relation_open(relid, AccessShareLock);
stream = read_stream_begin_relation(READ_STREAM_DEFAULT,
@@ -962,19 +971,37 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
for (int i = 0; i < 3; ++i)
{
+ Datum values[2] = {0};
+ bool nulls[2] = {0};
+
/* Same block twice. */
buf = read_stream_next_buffer(stream, NULL);
- Assert(BufferGetBlockNumber(buf) == blkno);
- ReleaseBuffer(buf);
- buf = read_stream_next_buffer(stream, NULL);
- Assert(BufferGetBlockNumber(buf) == blkno);
- ReleaseBuffer(buf);
+ values[0] = Int32GetDatum(call_index++);
+ values[1] = BufferIsValid(buf) ?
+ Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+ Int32GetDatum(-1);
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ if (BufferIsValid(buf))
+ ReleaseBuffer(buf);
- /* End-of-stream. */
buf = read_stream_next_buffer(stream, NULL);
- Assert(buf == InvalidBuffer);
+ values[0] = Int32GetDatum(call_index++);
+ values[1] = BufferIsValid(buf) ?
+ Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+ Int32GetDatum(-1);
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ if (BufferIsValid(buf))
+ ReleaseBuffer(buf);
+
+ /* End-of-stream (paused). */
buf = read_stream_next_buffer(stream, NULL);
- Assert(buf == InvalidBuffer);
+ values[0] = Int32GetDatum(call_index++);
+ values[1] = BufferIsValid(buf) ?
+ Int32GetDatum((int32) BufferGetBlockNumber(buf)) :
+ Int32GetDatum(-1);
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ if (BufferIsValid(buf))
+ ReleaseBuffer(buf);
/* Resume. */
read_stream_resume(stream);
@@ -983,7 +1010,7 @@ test_read_stream_resume(PG_FUNCTION_ARGS)
read_stream_end(stream);
relation_close(rel, NoLock);
- PG_RETURN_VOID();
+ return (Datum) 0;
}
--
2.43.0
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-18 21:17 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-19 07:27 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-20 15:27 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-12-09 03:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2025-12-09 22:38 ` Peter Geoghegan <[email protected]>
2025-12-14 23:02 ` Re: Trying out read streams in pgvector (an extension) Tomas Vondra <[email protected]>
1 sibling, 1 reply; 18+ messages in thread
From: Peter Geoghegan @ 2025-12-09 22:38 UTC (permalink / raw)
To: Thomas Munro <[email protected]>; +Cc: Melanie Plageman <[email protected]>; Nazir Bilal Yavuz <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
On Mon, Dec 8, 2025 at 10:47 PM Thomas Munro <[email protected]> wrote:
> Yielding just because you've scanned N index pages/tuples/whatever is
> harder to think about. The stream shouldn't get far ahead unless it's
> recently been useful for I/O concurrency (though optimal distance
> heuristics are an open problem), but in this case a single invocation
> of the block number callback can call ReadBuffer() an arbitrary number
> of times, filtering out all the index tuples as it rampages through
> the whole index IIUC. I see why you might want to yield periodically
> if you can, but I also wonder how much that can really help if you
> still have to pick up where you left off next time.
I think of it as a necessary precaution against pathological behavior
where the amount of memory used to cache matching tuples/TIDs gets out
of hand. There's no specific reason to expect that to happen (or no
good reason). But I'm pretty sure that it'll prove necessary to pay
non-zero attention to how much work has been done since the last time
we returned a tuple (when there's a tuple available to return).
> I guess it
> depends on the distribution of matches.
To be clear, I haven't done any kind of modelling of the problems in
this area. Once I do that (in 2026), I'll be able to say more about
the requirements. Maybe Tomas could take a look sooner?
Right now my focus is on getting the basic interfaces/API revisions in
better shape. And avoiding regressions while doing so.
--
Peter Geoghegan
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 08:04 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-18 21:17 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-19 07:27 ` Re: Trying out read streams in pgvector (an extension) Nazir Bilal Yavuz <[email protected]>
2025-11-20 15:27 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-12-09 03:47 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-12-09 22:38 ` Re: Trying out read streams in pgvector (an extension) Peter Geoghegan <[email protected]>
@ 2025-12-14 23:02 ` Tomas Vondra <[email protected]>
0 siblings, 0 replies; 18+ messages in thread
From: Tomas Vondra @ 2025-12-14 23:02 UTC (permalink / raw)
To: Peter Geoghegan <[email protected]>; Thomas Munro <[email protected]>; +Cc: Melanie Plageman <[email protected]>; Nazir Bilal Yavuz <[email protected]>; Jonathan S. Katz <[email protected]>; pgsql-hackers
On 12/9/25 23:38, Peter Geoghegan wrote:
> On Mon, Dec 8, 2025 at 10:47 PM Thomas Munro <[email protected]> wrote:
>> Yielding just because you've scanned N index pages/tuples/whatever is
>> harder to think about. The stream shouldn't get far ahead unless it's
>> recently been useful for I/O concurrency (though optimal distance
>> heuristics are an open problem), but in this case a single invocation
>> of the block number callback can call ReadBuffer() an arbitrary number
>> of times, filtering out all the index tuples as it rampages through
>> the whole index IIUC. I see why you might want to yield periodically
>> if you can, but I also wonder how much that can really help if you
>> still have to pick up where you left off next time.
>
> I think of it as a necessary precaution against pathological behavior
> where the amount of memory used to cache matching tuples/TIDs gets out
> of hand. There's no specific reason to expect that to happen (or no
> good reason). But I'm pretty sure that it'll prove necessary to pay
> non-zero attention to how much work has been done since the last time
> we returned a tuple (when there's a tuple available to return).
>
I think it's not all that difficult to hit such runaway cases, loading
arbitrary number of batches ahead. That's exactly why I had to come up
with the read_stream_reset approach in the first place, actually.
The simplest example is an index scan on a correlated index. We skip
duplicate blocks, so to find the next block to pass to the stream, we
may have to load multiple leaf pages. And we may need multiple such
blocks, to satisfy the prefetch distance (= number of IOs).
An even more extreme example is IOS, with just a couple not-allvisible
pages (that need prefetching). You hit the first one, then the distance
ramps up, and at some point there are no more not-allvisible pages. But
we try to maintain the distance, and we end up reading all remaining
leaf pages.
I'm sure we need to protect against these issues, which is why we have
INDEX_SCAN_MAX_BATCHES.
IIRC you also suggested we will need some internal limit to keep the
number of buffer pins under control, right?
I agree there may be other important reasons to "pause", e.g. based on
how much work was done since the last time the index scan returned a
tuple. But I'm not sure what exactly to look at, because most of the
"work" is happening outside the index scan, no?
>> I guess it
>> depends on the distribution of matches.
>
> To be clear, I haven't done any kind of modelling of the problems in
> this area. Once I do that (in 2026), I'll be able to say more about
> the requirements. Maybe Tomas could take a look sooner?
>
I don't have any explicit formal model of the problems, but it should
not be difficult to come up with "adversary cases" for the prefetching
patches, for example. That is, ways to construct datasets / indexes that
end up looking very far (many leafs) ahead.
Is that what you meant by modelling, or did you mean some sort of a
formal model of how far to prefetch for a given dataset?
AFAICS the "ideal" prefetch distance would be such that a page gets
loaded into shared buffers right before the scan actually needs it
(after processing through the earlier index entries).
Let's say we know
* T1 - cost of "processing" one index tuple (average time between calls
to table_index_fetch_tuple?)
* T2 - cost of I/O, i.e. how long does it take to read a block
We want keep the look-ahead distance at least K, such that
K * T1 > T2
but not much more than that. But I'm not sure where to get T1 and T2, as
T1 depends on what happens outside the index scan, and T2 is hardware
dependent (and not actually linear / additive).
Or am I confused and you asked about something else?
regards
--
Tomas Vondra
^ permalink raw reply [nested|flat] 18+ messages in thread
* Re: Trying out read streams in pgvector (an extension)
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 21:21 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-11 22:52 ` Re: Trying out read streams in pgvector (an extension) Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
2025-11-12 04:11 ` Re: Trying out read streams in pgvector (an extension) Thomas Munro <[email protected]>
@ 2025-11-12 20:44 ` Melanie Plageman <[email protected]>
1 sibling, 0 replies; 18+ messages in thread
From: Melanie Plageman @ 2025-11-12 20:44 UTC (permalink / raw)
To: Thomas Munro <[email protected]>; +Cc: Jonathan S. Katz <[email protected]>; pgsql-hackers
On Tue, Nov 11, 2025 at 11:12 PM Thomas Munro <[email protected]> wrote:
>
> Here is a draft patch like that, that tries to be as small as
> possible. Trying out the name read_stream_resume().
I like read_stream_resume(). Tested out 0001 with pgvector and can
confirm it works.
In the test, I would initialize test_read_stream_resume_state.count to 0
+ test_read_stream_resume_state state = {.blkno = blkno};
- Melanie
^ permalink raw reply [nested|flat] 18+ messages in thread
end of thread, other threads:[~2026-04-08 18:06 UTC | newest]
Thread overview: 18+ messages (download: mbox mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2024-06-11 15:37 Re: Trying out read streams in pgvector (an extension) Jonathan S. Katz <[email protected]>
2024-09-06 04:28 ` Thomas Munro <[email protected]>
2024-09-06 22:27 ` Thomas Munro <[email protected]>
2025-11-11 21:21 ` Thomas Munro <[email protected]>
2025-11-11 22:52 ` Melanie Plageman <[email protected]>
2025-11-11 23:19 ` Thomas Munro <[email protected]>
2025-11-12 04:11 ` Thomas Munro <[email protected]>
2025-11-12 08:04 ` Nazir Bilal Yavuz <[email protected]>
2025-11-12 10:47 ` Thomas Munro <[email protected]>
2025-11-18 21:17 ` Melanie Plageman <[email protected]>
2025-11-19 07:27 ` Nazir Bilal Yavuz <[email protected]>
2025-11-20 15:27 ` Melanie Plageman <[email protected]>
2025-12-09 03:47 ` Thomas Munro <[email protected]>
2025-12-09 21:42 ` Melanie Plageman <[email protected]>
2026-04-08 18:06 ` Melanie Plageman <[email protected]>
2025-12-09 22:38 ` Peter Geoghegan <[email protected]>
2025-12-14 23:02 ` Tomas Vondra <[email protected]>
2025-11-12 20:44 ` Melanie Plageman <[email protected]>
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox