public inbox for [email protected]  
help / color / mirror / Atom feed
From: Thomas Munro <[email protected]>
To: pgsql-hackers <[email protected]>
Subject: Trying out read streams in pgvector (an extension)
Date: Tue, 11 Jun 2024 16:53:41 +1200
Message-ID: <CA+hUKGJ_7NKd46nx1wbyXWriuZSNzsTfm+rhEuvU6nxZi3-KVw@mail.gmail.com> (raw)

Hi,

I was looking around for an exotic index type to try the experience of
streamifying an extension, i.e. out-of-core code.  I am totally new to
pgvector, but since everyone keeps talking about it, I could not avoid
picking up some basic facts in the pgconf.dev hallway track, and
understood that its scans have some degree of known-order access
predictability, and then also some degree of fuzzy-predictable
order-not-yet-determined access too.  It's also quite random in the
I/O sense.

Here's a toy to streamify the known-order part.  I think for the fuzzy
part that links those parts together, maybe there is some way to guess
when it's a reasonable time to speculatively prefetch the lowest order
stuff in the pairing heap, and then deal with it if you're wrong, but
I didn't try that...

Someone involved in that project mentioned that it's probably not a
great topic to research in practice, because real world users of HNSW
use fully cached (i.e. prewarmed) indexes, because the performance is so
bad otherwise.  (Though maybe that argument is a little circular...).
So although this patch clearly speeds up cold HNSW searches to a
degree controlled by effective_io_concurrency, I'll probably look for
something else.  Suggestions for interesting index types to look at
streamifying are very welcome!

Hmm.  If that's really true about HNSW though, then there may still be
an opportunity to do automatic memory prefetching[1].  But then in the
case of index building, "stream" is NULL in this patch anyway.  It
surely must also be possible to find some good places to put
profitable explicit pg_mem_prefetch() calls given the predictability
and the need to get only ~60ns ahead for that usage.  I didn't look
into that because I was trying to prove things about read_stream.c,
not get involved in another project :-D

Here ends my science experiment report, which I'm dropping here just
in case others see useful ideas here.  The main thing I learned about
the read stream API is that it'd be nice to be able to reset the
stream but preserve the distance (something that came up on the
streaming sequential scan thread for a different reason), to deal with
cases where look-ahead opportunities come in bursts but you want a
longer lived stream than I used here.  That is the reason the patch
creates and destroys temporary streams in a loop; doh.  It also
provides an interesting case study for what speculative random
look-ahead support might need to look like.

[1] https://www.postgresql.org/message-id/flat/CA%2BhUKGKNUMnqubrrz8pRBdEM8vHeSCZcNq7iqERmkt6zPtpA3g%40m...

=== setup ====

-- Load pgvector (provides the vector type, <-> operator and hnsw AM).
CREATE EXTENSION vector;

-- Build a vector with `dimensions` components, each one taken from random().
CREATE OR REPLACE FUNCTION random_vector(dimensions int)
RETURNS vector
LANGUAGE sql
RETURN (
  SELECT array_agg(random())::vector
    FROM generate_series(1, dimensions)
);

CREATE TABLE t (
  id        serial,
  embedding vector(6)
);

-- One million rows of random 6-dimensional vectors.
INSERT INTO t (embedding)
SELECT random_vector(6)
  FROM generate_series(1, 1000000);

-- Raise maintenance_work_mem for the index build below.
SET maintenance_work_mem = '2GB';

CREATE INDEX ON t USING hnsw (embedding vector_l2_ops);

=== test of a hot search, assuming repeated ===

-- The 20 rows closest (by <->) to a fixed point, limited to distance < 0.2.
SELECT embedding <-> '[0.5,0.5,0.5,0.5,0.5,0.5]'::vector
  FROM t
 WHERE embedding <-> '[0.5,0.5,0.5,0.5,0.5,0.5]'::vector < 0.2
 ORDER BY 1
 LIMIT 20;

=== test of a cold search, assuming empty caches ===

-- Run one distance-limited nearest-neighbour search around a point that is
-- freshly drawn from random_vector(6) on every call; PERFORM discards the rows.
CREATE OR REPLACE FUNCTION test()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
  my_vec vector(6);
BEGIN
  my_vec := random_vector(6);

  PERFORM embedding <-> my_vec
     FROM t
    WHERE embedding <-> my_vec < 0.2
    ORDER BY 1
    LIMIT 20;
END;
$$;

SELECT test();

(Make sure you remember to set effective_io_concurrency to an
interesting number if you want to generate a lot of overlapping
fadvise calls.)


Attachments:

  [application/octet-stream] 0001-XXX-toy-use-of-read-stream-API.patch (5.4K, 2-0001-XXX-toy-use-of-read-stream-API.patch)
  download | inline diff:
From 344cb7758d52594392e955efeb33c32cedfcb47a Mon Sep 17 00:00:00 2001
From: Thomas Munro <[email protected]>
Date: Tue, 11 Jun 2024 14:32:47 +1200
Subject: [PATCH] XXX toy use of read stream API

---
 src/hnsw.h      |  1 +
 src/hnswutils.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 94 insertions(+)

diff --git a/src/hnsw.h b/src/hnsw.h
index 480ad9f..e738241 100644
--- a/src/hnsw.h
+++ b/src/hnsw.h
@@ -136,6 +136,7 @@ struct HnswElementData
 	uint8		deleted;
 	uint32		hash;
 	HnswNeighborsPtr neighbors;
+	Buffer		buffer;
 	BlockNumber blkno;
 	OffsetNumber offno;
 	OffsetNumber neighborOffno;
diff --git a/src/hnswutils.c b/src/hnswutils.c
index d3ba911..b9addff 100644
--- a/src/hnswutils.c
+++ b/src/hnswutils.c
@@ -14,6 +14,10 @@
 #include "utils/memdebug.h"
 #include "utils/rel.h"
 
+#if PG_VERSION_NUM >= 170000
+#include "storage/read_stream.h"
+#endif
+
 #if PG_VERSION_NUM >= 130000
 #include "common/hashfn.h"
 #else
@@ -278,6 +282,9 @@ HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno)
 	HnswElement element = palloc(sizeof(HnswElementData));
 	char	   *base = NULL;
 
+#if PG_VERSION_NUM >= 170000
+	element->buffer = InvalidBuffer;
+#endif
 	element->blkno = blkno;
 	element->offno = offno;
 	HnswPtrStore(base, element->neighbors, (HnswNeighborArrayPtr *) NULL);
@@ -552,7 +559,20 @@ HnswLoadElement(HnswElement element, float *distance, Datum *q, Relation index,
 	HnswElementTuple etup;
 
 	/* Read vector */
+#if PG_VERSION_NUM >= 170000
+	if (element->buffer != InvalidBuffer)
+	{
+		/* Buffer pinned already. */
+		buf = element->buffer;
+		Assert(BufferGetBlockNumber(buf) == element->blkno);
+	}
+	else
+	{
+		buf = ReadBuffer(index, element->blkno);
+	}
+#else
 	buf = ReadBuffer(index, element->blkno);
+#endif
 	LockBuffer(buf, BUFFER_LOCK_SHARE);
 	page = BufferGetPage(buf);
 
@@ -714,6 +734,28 @@ CountElement(char *base, HnswElement skipElement, HnswCandidate * hc)
 	return e->heaptidsLength != 0;
 }
 
+#if PG_VERSION_NUM >= 170000
+typedef struct HnswSearchLayerNextBlockData {
+	char	   *base;			/* base address passed to HnswPtrAccess() */
+	HnswCandidate **items;		/* unvisited neighbors, in visit order */
+	int			nitems;			/* number of entries in items[] */
+	int			i;				/* index of the next entry to hand out */
+} HnswSearchLayerNextBlockData;
+/* Read stream callback: yields the queued neighbors' blocks in visit order. */
+static BlockNumber
+HnswSearchLayerNextBlock(ReadStream *stream, void *callback_data, void *per_buffer_data)
+{
+	HnswSearchLayerNextBlockData *data = callback_data;
+	HnswElement hce;
+
+	if (data->i == data->nitems)
+		return InvalidBlockNumber;	/* every entry already handed out */
+
+	hce = HnswPtrAccess(data->base, data->items[data->i++]->element);
+	return hce->blkno;
+}
+#endif
+
 /*
  * Algorithm 2 from paper
  */
@@ -729,6 +771,11 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 	HnswNeighborArray *neighborhoodData = NULL;
 	Size		neighborhoodSize;
 
+#if PG_VERSION_NUM >= 170000
+	ReadStream *stream;
+	HnswSearchLayerNextBlockData stream_callback_data;
+#endif
+
 	InitVisited(base, &v, index, ef, m);
 
 	/* Create local memory for neighborhood if needed */
@@ -764,6 +811,8 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 		HnswCandidate *c = ((HnswPairingHeapNode *) pairingheap_remove_first(C))->inner;
 		HnswCandidate *f = ((HnswPairingHeapNode *) pairingheap_first(W))->inner;
 		HnswElement cElement;
+		HnswCandidate *items[HNSW_MAX_SIZE];
+		int nitems;
 
 		if (c->distance > f->distance)
 			break;
@@ -785,6 +834,8 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 			neighborhood = neighborhoodData;
 		}
 
+		/* Build a list of indexes of neighbors to visit. */
+		nitems = 0;
 		for (int i = 0; i < neighborhood->length; i++)
 		{
 			HnswCandidate *e = &neighborhood->items[i];
@@ -793,6 +844,38 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 			AddToVisited(base, &v, e, index, &visited);
 
 			if (!visited)
+				items[nitems++] = e;
+		}
+
+#if PG_VERSION_NUM >= 170000
+		stream_callback_data.base = base;
+		stream_callback_data.items = items;
+		stream_callback_data.nitems = nitems;
+		stream_callback_data.i = 0;
+
+		/*
+		 * XXX For this quick-and-dirty hack, we'll use a temporary stream for
+		 * each set of neighbors we visit...  Should really re-use a stream,
+		 * and reset it after we hit stall points that need more data to look
+		 * further ahead.
+		 */
+		if (index)
+			stream = read_stream_begin_relation(READ_STREAM_FULL,
+												NULL,
+												index,
+												MAIN_FORKNUM,
+												HnswSearchLayerNextBlock,
+												&stream_callback_data,
+												0);
+		else
+			stream = NULL;
+#endif
+
+		/* Visit them. */
+		for (int i = 0; i < nitems; i++)
+		{
+			HnswCandidate *e = items[i];
+
 			{
 				float		eDistance;
 				HnswElement eElement = HnswPtrAccess(base, e->element);
@@ -802,7 +885,13 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 				if (index == NULL)
 					eDistance = GetCandidateDistance(base, e, q, procinfo, collation);
 				else
+				{
+#if PG_VERSION_NUM >= 170000
+					if (stream)
+						eElement->buffer = read_stream_next_buffer(stream, NULL);
+#endif
 					HnswLoadElement(eElement, &eDistance, &q, index, procinfo, collation, inserting, wlen >= ef ? &f->distance : NULL);
+				}
 
 				if (eDistance < f->distance || wlen < ef)
 				{
@@ -838,6 +927,10 @@ HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, F
 				}
 			}
 		}
+#if PG_VERSION_NUM >= 170000
+		if (stream)
+			read_stream_end(stream);
+#endif
 	}
 
 	/* Add each element of W to w */
-- 
2.45.1



view thread (2+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected]
  Subject: Re: Trying out read streams in pgvector (an extension)
  In-Reply-To: <CA+hUKGJ_7NKd46nx1wbyXWriuZSNzsTfm+rhEuvU6nxZi3-KVw@mail.gmail.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox