From: Xuneng Zhou <[email protected]>
To: Nazir Bilal Yavuz <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Streamify more code paths
Date: Tue, 30 Dec 2025 10:43:17 +0800
Message-ID: <CABPTF7W-f_zPN442FCp4Xaopi721oDmGYimq=VhAk=F7jwYZDQ@mail.gmail.com>
In-Reply-To: <CABPTF7Vd4JWSHi9N7pGTzn6xmOdtAToCe1NGbZAH8U9_mXOqpw@mail.gmail.com>
References: <CABPTF7VrqfbcDXqGrdLQ2xaQ=K0RzExNuw6U_GGqzSJu32wfdQ@mail.gmail.com>
	<CABPTF7X6qHqd3820KVBZ+n5eoaXZ6a99RB89bZfe33d+K+Vybw@mail.gmail.com>
	<CAN55FZ02eR083kPV_8_boWEJphXZW=-hRxJKp7nwR-WomyKb6g@mail.gmail.com>
	<CABPTF7VSa5L=k6ONVUZHfRrO2Y2_iYz6npWj0Na69RoCvSevpQ@mail.gmail.com>
	<CABPTF7V3+QGC+0W9ERCcAY14jq_w_XvmwrRs9vXbi_oqv4FnTQ@mail.gmail.com>
	<CABPTF7VyePb8O-WDgs2hCCXYhZzGzdjg0N3NkxojZ=ke4SB3pA@mail.gmail.com>
	<CAN55FZ39HSsXKTSi66ASq+i4Ed5FuGXD11hmJ+8c0F0O0+ozew@mail.gmail.com>
	<CABPTF7Vd4JWSHi9N7pGTzn6xmOdtAToCe1NGbZAH8U9_mXOqpw@mail.gmail.com>

Hi,

On Tue, Dec 30, 2025 at 9:51 AM Xuneng Zhou <[email protected]> wrote:
>
> Hi,
>
> Thanks for looking into this.
>
> On Mon, Dec 29, 2025 at 6:58 PM Nazir Bilal Yavuz <[email protected]> wrote:
> >
> > Hi,
> >
> > On Sun, 28 Dec 2025 at 14:46, Xuneng Zhou <[email protected]> wrote:
> > >
> > > Hi,
> > > >
> > > > Two more to go:
> > > > patch 5: Streamify log_newpage_range() WAL logging path
> > > > patch 6: Streamify hash index VACUUM primary bucket page reads
> > > >
> > > > Benchmarks will be conducted soon.
> > > >
> > >
> > > The v6 in my last message had a problem and was not updated. Attaching
> > > the right one again. Sorry for the noise.
> >
> > 0003 and 0006:
> >
> > You need to add 'StatApproxReadStreamPrivate' and
> > 'HashBulkDeleteStreamPrivate' to the typedefs.list.
>
> Done.
>
> > 0005:
> >
> > @@ -1321,8 +1341,10 @@ log_newpage_range(Relation rel, ForkNumber forknum,
> >          nbufs = 0;
> >          while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
> >          {
> > -            Buffer        buf = ReadBufferExtended(rel, forknum, blkno,
> > -                                                 RBM_NORMAL, NULL);
> > +            Buffer        buf = read_stream_next_buffer(stream, NULL);
> > +
> > +            if (!BufferIsValid(buf))
> > +                break;
> >
> > We are loosening a check here; there should not be an invalid buffer
> > in the stream before endblk. I think you can remove this
> > BufferIsValid() check, so that we learn if something goes wrong.
>
> My earlier reason for not adding an assert at the end of streaming was
> the potential early break here:
>
> /* Nothing more to do if all remaining blocks were empty. */
> if (nbufs == 0)
>     break;
>
> After looking more closely, it turns out to be a misunderstanding of the logic.
>
> > 0006:
> >
> > You can use read_stream_reset() instead of read_stream_end(); then you
> > can reuse the same stream with different variables. I believe this is
> > the preferred way.
> >
> > Rest LGTM!
> >
>
> Yeah, reset seems like the more appropriate way here.
>

I ran pgindent using the updated typedefs.list.

-- 
Best,
Xuneng


Attachments:

  [application/octet-stream] v4-0001-Switch-Bloom-scan-paths-to-streaming-read.-n-nRep.patch (2.4K, 2-v4-0001-Switch-Bloom-scan-paths-to-streaming-read.-n-nRep.patch)
From 314f9cdf6d8a62cc8523b377b8cf19df646b9913 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sat, 27 Dec 2025 00:28:10 +0800
Subject: [PATCH v4 1/6] Switch Bloom scan paths to streaming read.\n\nReplace
 per-page ReadBuffer loops in blgetbitmap() with read_stream_begin_relation()
 and sequential buffer iteration, reducing buffer churn and improving scan
 efficiency on large Bloom indexes.

---
 contrib/bloom/blscan.c | 29 +++++++++++++++++++++++++----
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/contrib/bloom/blscan.c b/contrib/bloom/blscan.c
index 0d71edbe91c..b1fdabaab74 100644
--- a/contrib/bloom/blscan.c
+++ b/contrib/bloom/blscan.c
@@ -17,6 +17,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
+#include "storage/read_stream.h"
 
 /*
  * Begin scan of bloom index.
@@ -75,11 +76,13 @@ int64
 blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 {
 	int64		ntids = 0;
-	BlockNumber blkno = BLOOM_HEAD_BLKNO,
+	BlockNumber blkno,
 				npages;
 	int			i;
 	BufferAccessStrategy bas;
 	BloomScanOpaque so = (BloomScanOpaque) scan->opaque;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	if (so->sign == NULL)
 	{
@@ -119,14 +122,29 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 	if (scan->instrument)
 		scan->instrument->nsearches++;
 
+	/* Scan all blocks except the metapage using streaming reads */
+	p.current_blocknum = BLOOM_HEAD_BLKNO;
+	p.last_exclusive = npages;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_FULL |
+										READ_STREAM_USE_BATCHING,
+										bas,
+										scan->indexRelation,
+										MAIN_FORKNUM,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
 	{
 		Buffer		buffer;
 		Page		page;
 
-		buffer = ReadBufferExtended(scan->indexRelation, MAIN_FORKNUM,
-									blkno, RBM_NORMAL, bas);
-
+		buffer = read_stream_next_buffer(stream, NULL);
 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 		page = BufferGetPage(buffer);
 
@@ -162,6 +180,9 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 		UnlockReleaseBuffer(buffer);
 		CHECK_FOR_INTERRUPTS();
 	}
+
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
 	FreeAccessStrategy(bas);
 
 	return ntids;
-- 
2.51.0
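
The patch above keeps the original for loop as the thing that decides how
many buffers get pulled, and then asserts that the stream has nothing left.
Below is a minimal standalone sketch of that contract in plain C. It is a toy
model, not PostgreSQL code: RangeState, range_next(), scan_range() and
INVALID_BLOCK are made-up names, and the callback body is only assumed to
behave like block_range_read_stream_cb (hand out each block in
[current_blocknum, last_exclusive) once, then a sentinel).

```c
#include <assert.h>
#include <stdint.h>

/* Stand-ins for PostgreSQL's BlockNumber and InvalidBlockNumber. */
typedef uint32_t BlockNumber;
#define INVALID_BLOCK ((BlockNumber) 0xFFFFFFFF)

/* Mirrors the two fields the patch sets on BlockRangeReadStreamPrivate. */
typedef struct
{
    BlockNumber current_blocknum;   /* next block to hand out */
    BlockNumber last_exclusive;     /* one past the final block */
} RangeState;

/* Hypothetical callback: each block in the range once, then the sentinel. */
static BlockNumber
range_next(RangeState *p)
{
    if (p->current_blocknum < p->last_exclusive)
        return p->current_blocknum++;
    return INVALID_BLOCK;
}

/*
 * Consume blocks [first, nblocks) the way the patched loop does: the for
 * loop decides how many pulls happen, and the stream has to agree.
 * Returns the number of blocks visited.
 */
static unsigned
scan_range(BlockNumber first, BlockNumber nblocks)
{
    RangeState  p = {first, nblocks};
    unsigned    visited = 0;

    for (BlockNumber blkno = first; blkno < nblocks; blkno++)
    {
        BlockNumber got = range_next(&p);

        assert(got == blkno);   /* stream and loop counter stay in step */
        visited++;
    }

    /* Same idea as the Assert placed before read_stream_end(). */
    assert(range_next(&p) == INVALID_BLOCK);
    return visited;
}
```

Calling scan_range(1, 10) visits nine blocks, which corresponds to skipping
the metapage at block 0 and starting from BLOOM_HEAD_BLKNO. If the loop
bounds and the callback state ever disagreed, one of the two asserts would
fire instead of the mismatch passing silently.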



  [application/octet-stream] v4-0005-Streamify-log_newpage_range-WAL-logging-path.patch (2.3K, 3-v4-0005-Streamify-log_newpage_range-WAL-logging-path.patch)
From 5d286e1a8ecce90921e44a655751f8cc5875458a Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sun, 28 Dec 2025 18:29:07 +0800
Subject: [PATCH v4 5/6] Streamify log_newpage_range() WAL logging path

Refactor log_newpage_range() to use the Read Stream. This allows
prefetching of upcoming relation blocks during bulk WAL logging
operations, overlapping I/O with CPU-intensive XLogInsert and
WAL-writing work.
---
 src/backend/access/transam/xloginsert.c | 26 +++++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index a56d5a55282..2075aea7037 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -39,6 +39,7 @@
 #include "replication/origin.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
+#include "storage/read_stream.h"
 #include "utils/memutils.h"
 #include "utils/pgstat_internal.h"
 
@@ -1295,6 +1296,8 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 {
 	int			flags;
 	BlockNumber blkno;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	flags = REGBUF_FORCE_IMAGE;
 	if (page_std)
@@ -1307,6 +1310,23 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 	 */
 	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
 
+	/* Set up a streaming read for the range of blocks */
+	p.current_blocknum = startblk;
+	p.last_exclusive = endblk;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_USE_BATCHING,
+										NULL,
+										rel,
+										forknum,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	blkno = startblk;
 	while (blkno < endblk)
 	{
@@ -1321,8 +1341,7 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 		nbufs = 0;
 		while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
 		{
-			Buffer		buf = ReadBufferExtended(rel, forknum, blkno,
-												 RBM_NORMAL, NULL);
+			Buffer		buf = read_stream_next_buffer(stream, NULL);
 
 			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 
@@ -1361,6 +1380,9 @@ log_newpage_range(Relation rel, ForkNumber forknum,
 		}
 		END_CRIT_SECTION();
 	}
+
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
 }
 
 /*
-- 
2.51.0
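
On the early "if (nbufs == 0) break;" discussed in the review: the sketch
below is one way to convince oneself that this break cannot leave unread
blocks in the stream. It is a standalone toy model in plain C, written from
my reading of the loop structure rather than copied from xloginsert.c.
blocks_consumed(), MAX_PER_RECORD and the is_new[] array are hypothetical
stand-ins for the real function, XLR_MAX_BLOCK_ID and PageIsNew().

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_PER_RECORD 4    /* stand-in for XLR_MAX_BLOCK_ID */

/*
 * Model of the nested loops in log_newpage_range(). is_new[i] marks pages
 * that would be skipped rather than logged. Returns how many blocks were
 * pulled from the (imaginary) stream before the function finished.
 */
static int
blocks_consumed(const bool *is_new, int startblk, int endblk)
{
    int     blkno = startblk;
    int     pulled = 0;

    while (blkno < endblk)
    {
        int     nbufs = 0;

        while (nbufs < MAX_PER_RECORD && blkno < endblk)
        {
            pulled++;           /* one read_stream_next_buffer() call */
            if (!is_new[blkno])
                nbufs++;        /* page goes into this WAL record */
            blkno++;
        }

        /*
         * nbufs == 0 means the inner loop did not stop on the per-record
         * limit, so it can only have stopped because blkno reached endblk.
         */
        if (nbufs == 0)
        {
            assert(blkno == endblk);
            break;
        }
    }
    return pulled;
}
```

In this model every block in [startblk, endblk) is pulled exactly once
whether or not the early break is taken, which is why a final
"stream is exhausted" assert is compatible with it.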



  [application/octet-stream] v4-0004-Replace-synchronous-ReadBufferExtended-loop-with-.patch (2.5K, 4-v4-0004-Replace-synchronous-ReadBufferExtended-loop-with-.patch)
From f39c8ffa3ba727f0c1656c7106130981bcfe59d0 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sat, 27 Dec 2025 00:29:09 +0800
Subject: [PATCH v4 4/6] Replace synchronous ReadBufferExtended() loop with the
 streaming read in ginvacuumcleanup() to improve I/O efficiency during GIN
 index vacuum cleanup operations

---
 src/backend/access/gin/ginvacuum.c | 28 ++++++++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/gin/ginvacuum.c b/src/backend/access/gin/ginvacuum.c
index d7baf7c847c..58e05c71256 100644
--- a/src/backend/access/gin/ginvacuum.c
+++ b/src/backend/access/gin/ginvacuum.c
@@ -22,6 +22,7 @@
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "storage/read_stream.h"
 #include "utils/memutils.h"
 
 struct GinVacuumState
@@ -693,6 +694,8 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	BlockNumber totFreePages;
 	GinState	ginstate;
 	GinStatsData idxStat;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	/*
 	 * In an autovacuum analyze, we want to clean up pending insertions.
@@ -743,6 +746,24 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 
 	totFreePages = 0;
 
+	/* Scan all blocks starting from the root using streaming reads */
+	p.current_blocknum = GIN_ROOT_BLKNO;
+	p.last_exclusive = npages;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_FULL |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										index,
+										MAIN_FORKNUM,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	for (blkno = GIN_ROOT_BLKNO; blkno < npages; blkno++)
 	{
 		Buffer		buffer;
@@ -750,8 +771,8 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 
 		vacuum_delay_point(false);
 
-		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
-									RBM_NORMAL, info->strategy);
+		buffer = read_stream_next_buffer(stream, NULL);
+
 		LockBuffer(buffer, GIN_SHARE);
 		page = BufferGetPage(buffer);
 
@@ -776,6 +797,9 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 		UnlockReleaseBuffer(buffer);
 	}
 
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	/* Update the metapage with accurate page and entry counts */
 	idxStat.nTotalPages = npages;
 	ginUpdateStats(info->index, &idxStat, false);
-- 
2.51.0



  [application/octet-stream] v4-0002-Streamify-Bloom-VACUUM-paths.-n-nUse-streaming-re.patch (4.2K, 5-v4-0002-Streamify-Bloom-VACUUM-paths.-n-nUse-streaming-re.patch)
From aafdfa0851ab61777e8877cfb50e3b34bf54b9b1 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sat, 27 Dec 2025 00:28:49 +0800
Subject: [PATCH v4 2/6] Streamify Bloom VACUUM paths.\n\nUse streaming reads
 in blbulkdelete() and blvacuumcleanup() to iterate index pages without
 repeated ReadBuffer calls, improving VACUUM performance and reducing buffer
 manager overhead during maintenance operations.

---
 contrib/bloom/blvacuum.c | 55 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 51 insertions(+), 4 deletions(-)

diff --git a/contrib/bloom/blvacuum.c b/contrib/bloom/blvacuum.c
index e68a9008f56..7452302f022 100644
--- a/contrib/bloom/blvacuum.c
+++ b/contrib/bloom/blvacuum.c
@@ -17,6 +17,7 @@
 #include "commands/vacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/indexfsm.h"
+#include "storage/read_stream.h"
 
 
 /*
@@ -40,6 +41,8 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	Page		page;
 	BloomMetaPageData *metaData;
 	GenericXLogState *gxlogState;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	if (stats == NULL)
 		stats = palloc0_object(IndexBulkDeleteResult);
@@ -51,6 +54,25 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	 * they can't contain tuples to delete.
 	 */
 	npages = RelationGetNumberOfBlocks(index);
+
+	/* Scan all blocks except the metapage using streaming reads */
+	p.current_blocknum = BLOOM_HEAD_BLKNO;
+	p.last_exclusive = npages;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_FULL |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										index,
+										MAIN_FORKNUM,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
 	{
 		BloomTuple *itup,
@@ -59,8 +81,7 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 
 		vacuum_delay_point(false);
 
-		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
-									RBM_NORMAL, info->strategy);
+		buffer = read_stream_next_buffer(stream, NULL);
 
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		gxlogState = GenericXLogStart(index);
@@ -133,6 +154,9 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 		UnlockReleaseBuffer(buffer);
 	}
 
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	/*
 	 * Update the metapage's notFullPage list with whatever we found.  Our
 	 * info could already be out of date at this point, but blinsert() will
@@ -166,6 +190,8 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	Relation	index = info->index;
 	BlockNumber npages,
 				blkno;
+	BlockRangeReadStreamPrivate p;
+	ReadStream *stream;
 
 	if (info->analyze_only)
 		return stats;
@@ -181,6 +207,25 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 	stats->num_pages = npages;
 	stats->pages_free = 0;
 	stats->num_index_tuples = 0;
+
+	/* Scan all blocks except the metapage using streaming reads */
+	p.current_blocknum = BLOOM_HEAD_BLKNO;
+	p.last_exclusive = npages;
+
+	/*
+	 * It is safe to use batchmode as block_range_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_FULL |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										index,
+										MAIN_FORKNUM,
+										block_range_read_stream_cb,
+										&p,
+										0);
+
 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
 	{
 		Buffer		buffer;
@@ -188,8 +233,7 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 
 		vacuum_delay_point(false);
 
-		buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
-									RBM_NORMAL, info->strategy);
+		buffer = read_stream_next_buffer(stream, NULL);
 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
 		page = BufferGetPage(buffer);
 
@@ -206,6 +250,9 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
 		UnlockReleaseBuffer(buffer);
 	}
 
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	IndexFreeSpaceMapVacuum(info->index);
 
 	return stats;
-- 
2.51.0



  [application/octet-stream] v4-0006-Streamify-hash-index-VACUUM-primary-bucket-page-r.patch (5.5K, 6-v4-0006-Streamify-hash-index-VACUUM-primary-bucket-page-r.patch)
From 586b8007cff40671f7039d67b77f8e0154e8e782 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sun, 28 Dec 2025 18:29:28 +0800
Subject: [PATCH v4 6/6] Streamify hash index VACUUM primary bucket page reads

Refactor hashbulkdelete() to use the Read Stream for primary bucket
pages. This enables prefetching of upcoming buckets while the current
one is being processed, improving I/O efficiency during hash index
vacuum operations.
---
 src/backend/access/hash/hash.c   | 80 ++++++++++++++++++++++++++++++--
 src/tools/pgindent/typedefs.list |  1 +
 2 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index e388252afdc..01219c0015e 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -30,6 +30,7 @@
 #include "nodes/execnodes.h"
 #include "optimizer/plancat.h"
 #include "pgstat.h"
+#include "storage/read_stream.h"
 #include "utils/fmgrprotos.h"
 #include "utils/index_selfuncs.h"
 #include "utils/rel.h"
@@ -42,12 +43,23 @@ typedef struct
 	Relation	heapRel;		/* heap relation descriptor */
 } HashBuildState;
 
+/* Working state for streaming reads in hashbulkdelete */
+typedef struct
+{
+	HashMetaPage metap;			/* cached metapage for BUCKET_TO_BLKNO */
+	Bucket		next_bucket;	/* next bucket to prefetch */
+	Bucket		max_bucket;		/* stop when next_bucket > max_bucket */
+} HashBulkDeleteStreamPrivate;
+
 static void hashbuildCallback(Relation index,
 							  ItemPointer tid,
 							  Datum *values,
 							  bool *isnull,
 							  bool tupleIsAlive,
 							  void *state);
+static BlockNumber hash_bulkdelete_read_stream_cb(ReadStream *stream,
+												  void *callback_private_data,
+												  void *per_buffer_data);
 
 
 /*
@@ -450,6 +462,27 @@ hashendscan(IndexScanDesc scan)
 	scan->opaque = NULL;
 }
 
+/*
+ * Read stream callback for hashbulkdelete.
+ *
+ * Returns the block number of the primary page for the next bucket to
+ * vacuum, using the BUCKET_TO_BLKNO mapping from the cached metapage.
+ */
+static BlockNumber
+hash_bulkdelete_read_stream_cb(ReadStream *stream,
+							   void *callback_private_data,
+							   void *per_buffer_data)
+{
+	HashBulkDeleteStreamPrivate *p = callback_private_data;
+	Bucket		bucket;
+
+	if (p->next_bucket > p->max_bucket)
+		return InvalidBlockNumber;
+
+	bucket = p->next_bucket++;
+	return BUCKET_TO_BLKNO(p->metap, bucket);
+}
+
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
  * The set of target tuples is specified via a callback routine that tells
@@ -474,6 +507,8 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	Buffer		metabuf = InvalidBuffer;
 	HashMetaPage metap;
 	HashMetaPage cachedmetap;
+	HashBulkDeleteStreamPrivate stream_private;
+	ReadStream *stream = NULL;
 
 	tuples_removed = 0;
 	num_index_tuples = 0;
@@ -494,7 +529,25 @@ hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 	cur_bucket = 0;
 	cur_maxbucket = orig_maxbucket;
 
-loop_top:
+	/* Set up streaming read for primary bucket pages */
+	stream_private.metap = cachedmetap;
+	stream_private.next_bucket = cur_bucket;
+	stream_private.max_bucket = cur_maxbucket;
+
+	/*
+	 * It is safe to use batchmode as hash_bulkdelete_read_stream_cb takes no
+	 * locks.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+										READ_STREAM_USE_BATCHING,
+										info->strategy,
+										rel,
+										MAIN_FORKNUM,
+										hash_bulkdelete_read_stream_cb,
+										&stream_private,
+										0);
+
+bucket_loop:
 	while (cur_bucket <= cur_maxbucket)
 	{
 		BlockNumber bucket_blkno;
@@ -514,7 +567,8 @@ loop_top:
 		 * We need to acquire a cleanup lock on the primary bucket page to out
 		 * wait concurrent scans before deleting the dead tuples.
 		 */
-		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
+		buf = read_stream_next_buffer(stream, NULL);
+		Assert(BufferIsValid(buf));
 		LockBufferForCleanup(buf);
 		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
 
@@ -545,6 +599,16 @@ loop_top:
 			{
 				cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
 				Assert(cachedmetap != NULL);
+
+				/*
+				 * Reset stream with updated metadata for remaining buckets.
+				 * The BUCKET_TO_BLKNO mapping depends on hashm_spares[],
+				 * which may have changed.
+				 */
+				stream_private.metap = cachedmetap;
+				stream_private.next_bucket = cur_bucket + 1;
+				stream_private.max_bucket = cur_maxbucket;
+				read_stream_reset(stream);
 			}
 		}
 
@@ -577,9 +641,19 @@ loop_top:
 		cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
 		Assert(cachedmetap != NULL);
 		cur_maxbucket = cachedmetap->hashm_maxbucket;
-		goto loop_top;
+
+		/* Reset stream to process additional buckets from split */
+		stream_private.metap = cachedmetap;
+		stream_private.next_bucket = cur_bucket;
+		stream_private.max_bucket = cur_maxbucket;
+		read_stream_reset(stream);
+		goto bucket_loop;
 	}
 
+	/* Stream should be exhausted since we processed all buckets */
+	Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+	read_stream_end(stream);
+
 	/* Okay, we're really done.  Update tuple count in metapage. */
 	START_CRIT_SECTION();
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index dc6fc28fcab..572be5598f2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1176,6 +1176,7 @@ HashAggBatch
 HashAggSpill
 HashAllocFunc
 HashBuildState
+HashBulkDeleteStreamPrivate
 HashCompareFunc
 HashCopyFunc
 HashIndexStat
-- 
2.51.0
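
The reset logic in this patch depends on one detail: the stream looks ahead,
so by the time the consumer is at bucket N the callback may already have
advanced past N. Before resetting, the callback state therefore has to be
rewound to the consumer's position. The following is a toy model in plain C
that illustrates this, under the assumption that a reset discards queued
lookahead and lets the callback be consulted again. ToyStream, toy_begin(),
toy_reset(), toy_next() and visit_with_split() are invented for the example
and are not the read stream API.

```c
#include <assert.h>
#include <stdbool.h>

#define LOOKAHEAD 4
#define END_OF_STREAM (-1)

/* Callback state, loosely mirroring HashBulkDeleteStreamPrivate. */
typedef struct
{
    int     next_bucket;
    int     max_bucket;
} BucketState;

/* Toy stream: a FIFO of prefetched bucket numbers plus an exhausted flag. */
typedef struct
{
    BucketState *state;
    int     queue[LOOKAHEAD];
    int     head;
    int     count;
    bool    exhausted;
} ToyStream;

static void
toy_begin(ToyStream *s, BucketState *state)
{
    s->state = state;
    s->head = 0;
    s->count = 0;
    s->exhausted = false;
}

/* Drop queued lookahead and allow the callback to be consulted again. */
static void
toy_reset(ToyStream *s)
{
    s->head = 0;
    s->count = 0;
    s->exhausted = false;
}

static int
toy_next(ToyStream *s)
{
    int     bucket;

    /* Look ahead: keep asking the callback until the queue is full. */
    while (!s->exhausted && s->count < LOOKAHEAD)
    {
        if (s->state->next_bucket > s->state->max_bucket)
            s->exhausted = true;
        else
        {
            s->queue[(s->head + s->count) % LOOKAHEAD] = s->state->next_bucket++;
            s->count++;
        }
    }
    if (s->count == 0)
        return END_OF_STREAM;
    bucket = s->queue[s->head];
    s->head = (s->head + 1) % LOOKAHEAD;
    s->count--;
    return bucket;
}

/*
 * Visit buckets 0..max_bucket, then pretend a concurrent split raised the
 * maximum to new_max and continue on the same stream. Returns the number
 * of buckets visited.
 */
static int
visit_with_split(int max_bucket, int new_max)
{
    BucketState st = {0, max_bucket};
    ToyStream   s;
    int         cur = 0;

    toy_begin(&s, &st);
    for (;;)
    {
        while (cur <= st.max_bucket)
        {
            int     got = toy_next(&s);

            assert(got == cur); /* stream stays in step with the consumer */
            cur++;
        }
        if (st.max_bucket >= new_max)
            break;
        st.next_bucket = cur;   /* rewind callback to consumer position */
        st.max_bucket = new_max;
        toy_reset(&s);
    }
    assert(toy_next(&s) == END_OF_STREAM);
    return cur;
}
```

For example, visit_with_split(5, 9) walks buckets 0 through 9 on a single
stream object. The mid-loop reset in the patch follows the same rule, except
that it resumes at cur_bucket + 1 because the current bucket's buffer has
already been taken from the stream.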



  [application/octet-stream] v4-0003-Streamify-heap-bloat-estimation-scan.-Introduce-a.patch (6.4K, 7-v4-0003-Streamify-heap-bloat-estimation-scan.-Introduce-a.patch)
From 815eddd3f1d65f5485bd68f30e9ee52349ecb4f3 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Sat, 27 Dec 2025 00:29:02 +0800
Subject: [PATCH v4 3/6] Streamify heap bloat estimation scan. Introduce a 
 read-stream callback to skip all-visible pages via VM/FSM lookup and 
 stream-read the rest, reducing page reads and improving pgstattuple_approx 
 execution time on large relations.

---
 contrib/pgstattuple/pgstatapprox.c | 126 ++++++++++++++++++++++-------
 src/tools/pgindent/typedefs.list   |   1 +
 2 files changed, 96 insertions(+), 31 deletions(-)

diff --git a/contrib/pgstattuple/pgstatapprox.c b/contrib/pgstattuple/pgstatapprox.c
index a59ff4e9d4f..9904094d767 100644
--- a/contrib/pgstattuple/pgstatapprox.c
+++ b/contrib/pgstattuple/pgstatapprox.c
@@ -23,6 +23,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/procarray.h"
+#include "storage/read_stream.h"
 
 PG_FUNCTION_INFO_V1(pgstattuple_approx);
 PG_FUNCTION_INFO_V1(pgstattuple_approx_v1_5);
@@ -45,6 +46,61 @@ typedef struct output_type
 
 #define NUM_OUTPUT_COLUMNS 10
 
+/*
+ * Struct for statapprox_heap read stream callback.
+ */
+typedef struct StatApproxReadStreamPrivate
+{
+	Relation	rel;
+	output_type *stat;
+	BlockNumber current_blocknum;
+	BlockNumber nblocks;
+	BlockNumber scanned;		/* count of pages actually read */
+	Buffer		vmbuffer;		/* for VM lookups */
+} StatApproxReadStreamPrivate;
+
+/*
+ * Read stream callback for statapprox_heap.
+ *
+ * This callback checks the visibility map for each block. If the block is
+ * all-visible, we can get the free space from the FSM without reading the
+ * actual page, and skip to the next block. Only blocks that are not
+ * all-visible are returned for actual reading.
+ */
+static BlockNumber
+statapprox_heap_read_stream_next(ReadStream *stream,
+								 void *callback_private_data,
+								 void *per_buffer_data)
+{
+	StatApproxReadStreamPrivate *p = callback_private_data;
+
+	while (p->current_blocknum < p->nblocks)
+	{
+		BlockNumber blkno = p->current_blocknum++;
+		Size		freespace;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * If the page has only visible tuples, then we can find out the free
+		 * space from the FSM and move on without reading the page.
+		 */
+		if (VM_ALL_VISIBLE(p->rel, blkno, &p->vmbuffer))
+		{
+			freespace = GetRecordedFreeSpace(p->rel, blkno);
+			p->stat->tuple_len += BLCKSZ - freespace;
+			p->stat->free_space += freespace;
+			continue;
+		}
+
+		/* This block needs to be read */
+		p->scanned++;
+		return blkno;
+	}
+
+	return InvalidBlockNumber;
+}
+
 /*
  * This function takes an already open relation and scans its pages,
  * skipping those that have the corresponding visibility map bit set.
@@ -58,53 +114,58 @@ typedef struct output_type
 static void
 statapprox_heap(Relation rel, output_type *stat)
 {
-	BlockNumber scanned,
-				nblocks,
-				blkno;
-	Buffer		vmbuffer = InvalidBuffer;
+	BlockNumber nblocks;
 	BufferAccessStrategy bstrategy;
 	TransactionId OldestXmin;
+	StatApproxReadStreamPrivate p;
+	ReadStream *stream;
 
 	OldestXmin = GetOldestNonRemovableTransactionId(rel);
 	bstrategy = GetAccessStrategy(BAS_BULKREAD);
 
 	nblocks = RelationGetNumberOfBlocks(rel);
-	scanned = 0;
 
-	for (blkno = 0; blkno < nblocks; blkno++)
+	/* Initialize read stream private data */
+	p.rel = rel;
+	p.stat = stat;
+	p.current_blocknum = 0;
+	p.nblocks = nblocks;
+	p.scanned = 0;
+	p.vmbuffer = InvalidBuffer;
+
+	/*
+	 * Create the read stream. We don't use READ_STREAM_USE_BATCHING because
+	 * the callback accesses the visibility map which may need to read VM
+	 * pages. While this shouldn't cause deadlocks, we err on the side of
+	 * caution.
+	 */
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										bstrategy,
+										rel,
+										MAIN_FORKNUM,
+										statapprox_heap_read_stream_next,
+										&p,
+										0);
+
+	for (;;)
 	{
 		Buffer		buf;
 		Page		page;
 		OffsetNumber offnum,
 					maxoff;
-		Size		freespace;
-
-		CHECK_FOR_INTERRUPTS();
-
-		/*
-		 * If the page has only visible tuples, then we can find out the free
-		 * space from the FSM and move on.
-		 */
-		if (VM_ALL_VISIBLE(rel, blkno, &vmbuffer))
-		{
-			freespace = GetRecordedFreeSpace(rel, blkno);
-			stat->tuple_len += BLCKSZ - freespace;
-			stat->free_space += freespace;
-			continue;
-		}
+		BlockNumber blkno;
 
-		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno,
-								 RBM_NORMAL, bstrategy);
+		buf = read_stream_next_buffer(stream, NULL);
+		if (buf == InvalidBuffer)
+			break;
 
 		LockBuffer(buf, BUFFER_LOCK_SHARE);
 
 		page = BufferGetPage(buf);
+		blkno = BufferGetBlockNumber(buf);
 
 		stat->free_space += PageGetExactFreeSpace(page);
 
-		/* We may count the page as scanned even if it's new/empty */
-		scanned++;
-
 		if (PageIsNew(page) || PageIsEmpty(page))
 		{
 			UnlockReleaseBuffer(buf);
@@ -169,6 +230,9 @@ statapprox_heap(Relation rel, output_type *stat)
 		UnlockReleaseBuffer(buf);
 	}
 
+	Assert(p.current_blocknum == nblocks);
+	read_stream_end(stream);
+
 	stat->table_len = (uint64) nblocks * BLCKSZ;
 
 	/*
@@ -179,7 +243,7 @@ statapprox_heap(Relation rel, output_type *stat)
 	 * tuples in all-visible pages, so no correction is needed for that, and
 	 * we already accounted for the space in those pages, too.
 	 */
-	stat->tuple_count = vac_estimate_reltuples(rel, nblocks, scanned,
+	stat->tuple_count = vac_estimate_reltuples(rel, nblocks, p.scanned,
 											   stat->tuple_count);
 
 	/* It's not clear if we could get -1 here, but be safe. */
@@ -190,16 +254,16 @@ statapprox_heap(Relation rel, output_type *stat)
 	 */
 	if (nblocks != 0)
 	{
-		stat->scanned_percent = 100.0 * scanned / nblocks;
+		stat->scanned_percent = 100.0 * p.scanned / nblocks;
 		stat->tuple_percent = 100.0 * stat->tuple_len / stat->table_len;
 		stat->dead_tuple_percent = 100.0 * stat->dead_tuple_len / stat->table_len;
 		stat->free_percent = 100.0 * stat->free_space / stat->table_len;
 	}
 
-	if (BufferIsValid(vmbuffer))
+	if (BufferIsValid(p.vmbuffer))
 	{
-		ReleaseBuffer(vmbuffer);
-		vmbuffer = InvalidBuffer;
+		ReleaseBuffer(p.vmbuffer);
+		p.vmbuffer = InvalidBuffer;
 	}
 }
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5c88fa92f4e..dc6fc28fcab 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2898,6 +2898,7 @@ StartReplicationCmd
 StartupStatusEnum
 StatEntry
 StatExtEntry
+StatApproxReadStreamPrivate
 StateFileChunk
 StatisticExtInfo
 StatsBuildData
-- 
2.51.0
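
The callback in this patch does two jobs: it accounts for all-visible pages
without returning them, and it returns only the blocks that need a real read.
Here is a small standalone sketch of that shape in plain C. It is a toy
model under stated assumptions: the all_visible[] array stands in for the
visibility map, and SkipState, skip_next(), count_reads() and NO_MORE_BLOCKS
are hypothetical names, not anything from pgstatapprox.c.

```c
#include <assert.h>
#include <stdbool.h>

#define NO_MORE_BLOCKS (-1)

typedef struct
{
    const bool *all_visible;    /* stand-in for the visibility map */
    int     current_blocknum;   /* next block the callback will examine */
    int     nblocks;
    int     scanned;            /* blocks returned for reading */
    int     skipped;            /* blocks accounted for without a read */
} SkipState;

/* Hypothetical callback: skip all-visible blocks, return the rest. */
static int
skip_next(SkipState *p)
{
    while (p->current_blocknum < p->nblocks)
    {
        int     blkno = p->current_blocknum++;

        if (p->all_visible[blkno])
        {
            p->skipped++;       /* the real code consults the FSM here */
            continue;
        }
        p->scanned++;
        return blkno;
    }
    return NO_MORE_BLOCKS;
}

/* Drain the callback and report how many blocks needed a read. */
static int
count_reads(const bool *all_visible, int nblocks, SkipState *out)
{
    SkipState   p = {all_visible, 0, nblocks, 0, 0};
    int         reads = 0;

    while (skip_next(&p) != NO_MORE_BLOCKS)
        reads++;

    /* Mirrors Assert(p.current_blocknum == nblocks) in the patch. */
    assert(p.current_blocknum == nblocks);
    *out = p;
    return reads;
}
```

One consequence of counting inside the callback is that, with lookahead, the
scanned counter can run ahead of the consumer while the scan is in progress.
In the patch it is only read after the loop has drained the stream, at which
point both views agree.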


