From fd557f9d68b57070f7ab01cee8e60960d309a82f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Wed, 30 Oct 2024 22:41:14 +0100
Subject: [PATCH v20241106 7/7] WIP: stream read API

---
 src/backend/access/gist/gistget.c        |   7 -
 src/backend/access/hash/hashsearch.c     |  10 -
 src/backend/access/heap/heapam_handler.c |  32 +-
 src/backend/access/index/indexam.c       | 714 +++++++++++++++--------
 src/backend/access/nbtree/nbtree.c       |  18 +-
 src/backend/access/nbtree/nbtsearch.c    |  10 -
 src/backend/access/spgist/spgscan.c      |   7 -
 src/backend/access/table/tableam.c       |   2 +-
 src/backend/commands/constraint.c        |   3 +-
 src/backend/executor/nodeIndexonlyscan.c |   8 +-
 src/include/access/genam.h               |   1 +
 src/include/access/relscan.h             |  32 +-
 src/include/access/tableam.h             |   7 +-
 13 files changed, 542 insertions(+), 309 deletions(-)

diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 70e32f19366..015e0954af5 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -1084,13 +1084,6 @@ _gist_copy_batch(IndexScanDesc scan, GISTScanOpaque so,
 		start++;
 	}
 
-	/*
-	 * set the starting point
-	 *
-	 * XXX might be better done in indexam.c
-	 */
-	scan->xs_batch->currIndex = -1;
-
 	/* shouldn't be possible to end here with an empty batch */
 	Assert(scan->xs_batch->nheaptids > 0);
 }
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index c11ae847a3b..5182457475d 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -983,16 +983,6 @@ _hash_copy_batch(IndexScanDesc scan, ScanDirection dir, HashScanOpaque so,
 		start++;
 	}
 
-	/*
-	 * set the starting point
-	 *
-	 * XXX might be better done in indexam.c
-	 */
-	if (ScanDirectionIsForward(dir))
-		scan->xs_batch->currIndex = -1;
-	else
-		scan->xs_batch->currIndex = scan->xs_batch->nheaptids;
-
 	/* shouldn't be possible to end here with an empty batch */
 	Assert(scan->xs_batch->nheaptids > 0);
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 3fceae759d2..b90c799e457 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -75,11 +75,12 @@ heapam_slot_callbacks(Relation relation)
  */
 
 static IndexFetchTableData *
-heapam_index_fetch_begin(Relation rel)
+heapam_index_fetch_begin(Relation rel, ReadStream *rs)
 {
 	IndexFetchHeapData *hscan = palloc0(sizeof(IndexFetchHeapData));
 
 	hscan->xs_base.rel = rel;
+	hscan->xs_base.rs = rs;
 	hscan->xs_cbuf = InvalidBuffer;
 
 	return &hscan->xs_base;
@@ -126,15 +127,38 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 		/* Switch to correct buffer if we don't have it already */
 		Buffer		prev_buf = hscan->xs_cbuf;
 
-		hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
-											  hscan->xs_base.rel,
-											  ItemPointerGetBlockNumber(tid));
+		/*
+		 * XXX it's a bit weird to just read buffers, expecting them to match
+		 * the TID we've put into the queue earlier from the callback.
+		 */
+		if (scan->rs)
+			hscan->xs_cbuf = read_stream_next_buffer(scan->rs, NULL);
+		else
+			hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
+												  hscan->xs_base.rel,
+												  ItemPointerGetBlockNumber(tid));
+
+		//elog(WARNING, "BufferGetBlockNumber(hscan->xs_cbuf) = %u", BufferGetBlockNumber(hscan->xs_cbuf));
+		//elog(WARNING, "ItemPointerGetBlockNumber(tid) = %u", ItemPointerGetBlockNumber(tid));
+
+		/* crosscheck: Did we get the expected block number? */
+		Assert(BufferIsValid(hscan->xs_cbuf));
+		Assert(BufferGetBlockNumber(hscan->xs_cbuf) == ItemPointerGetBlockNumber(tid));
 
 		/*
 		 * Prune page, but only if we weren't already on this page
 		 */
 		if (prev_buf != hscan->xs_cbuf)
+		{
 			heap_page_prune_opt(hscan->xs_base.rel, hscan->xs_cbuf);
+		}
+
+		/* FIXME not sure this is really needed, or maybe this is not the
+		 * right place to do this */
+		if (scan->rs && (prev_buf != InvalidBuffer))
+		{
+			ReleaseBuffer(prev_buf);
+		}
 	}
 
 	/* Obtain share-lock on the buffer so we can examine visibility */
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index b117853f8d0..0a5a8a41abf 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -55,6 +55,7 @@
 #include "pgstat.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "storage/read_stream.h"
 #include "utils/memutils.h"
 #include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
@@ -121,8 +122,48 @@ static bool index_batch_getnext(IndexScanDesc scan,
 								ScanDirection direction);
 static ItemPointer index_batch_getnext_tid(IndexScanDesc scan,
 										   ScanDirection direction);
-static void index_batch_prefetch(IndexScanDesc scan,
-								 ScanDirection direction);
+
+static BlockNumber index_scan_stream_read_next(ReadStream *stream,
+											   void *callback_private_data,
+											   void *per_buffer_data);
+
+
+/* Is the batch full (TIDs up to capacity)? */
+#define	INDEX_BATCH_IS_FULL(scan)	\
+	((scan)->xs_batch->nheaptids == (scan)->xs_batch->currSize)
+
+/* Is the batch empty (no TIDs)? */
+#define	INDEX_BATCH_IS_EMPTY(scan)	\
+	((scan)->xs_batch->nheaptids == 0)
+
+/*
+ * Did we process all items? Or can we move to the next item in the requested
+ * direction? For forward scan this means the index points to the last item,
+ * for backward scans it has to point to the first one.
+ *
+ * This does not cover empty batches properly, because of backward scans.
+ */
+#define	INDEX_BATCH_IS_PROCESSED(scan, pos, direction)	\
+	(ScanDirectionIsForward(direction) ? \
+		((scan)->xs_batch->nheaptids == (pos)->index) : \
+		((pos)->index == -1))
+
+/*
+ * Does the batch have items in the requested direction? The batch must be
+ * non-empty, and we should not have reached the end of the batch (in the
+ * direction).
+ *
+ * However, if we just restored the position after mark/restore, there should
+ * be at least one item to process (as we won't advance on the next call).
+ *
+ * XXX This is a bit confusing / ugly, probably should rethink how we track
+ * empty batches, and how we handle not advancing after a restore.
+ */
+#define INDEX_BATCH_HAS_ITEMS(scan, pos, direction) \
+	(!INDEX_BATCH_IS_EMPTY(scan) && \
+		(!INDEX_BATCH_IS_PROCESSED((scan), (pos), (direction)) || \
+		(pos)->restored))
+
 
 /* ----------------------------------------------------------------
  *				   index_ interface functions
@@ -273,6 +314,7 @@ index_beginscan(Relation heapRelation,
 				int nkeys, int norderbys,
 				bool enable_batching)
 {
+	ReadStream   *rs = NULL;
 	IndexScanDesc scan;
 
 	Assert(snapshot != InvalidSnapshot);
@@ -286,9 +328,6 @@ index_beginscan(Relation heapRelation,
 	scan->heapRelation = heapRelation;
 	scan->xs_snapshot = snapshot;
 
-	/* prepare to fetch index matches from table */
-	scan->xs_heapfetch = table_index_fetch_begin(heapRelation);
-
 	/*
 	 * If explicitly requested and supported by both the index AM and the
 	 * plan, initialize batching info.
@@ -299,14 +338,31 @@ index_beginscan(Relation heapRelation,
 	 *
 	 * XXX Maybe we should have a separate "amcanbatch" call, to let the AM
 	 * decide if batching is supported depending on the scan details.
+	 *
+	 * XXX Do this before initializing xs_heapfetch. We only use stream read
+	 * API with batching enabled (so not with systable scans). But maybe we
+	 * should change that, and just use different callbacks (or something
+	 * like that)?
 	 */
 	if ((indexRelation->rd_indam->amgetbatch != NULL) &&
 		enable_batching &&
 		enable_indexscan_batching)
 	{
 		index_batch_init(scan);
+
+		/* initialize stream */
+		rs = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										heapRelation,
+										MAIN_FORKNUM,
+										index_scan_stream_read_next,
+										scan,
+										0);
 	}
 
+	/* prepare to fetch index matches from table */
+	scan->xs_heapfetch = table_index_fetch_begin(heapRelation, rs);
+
 	return scan;
 }
 
@@ -400,7 +456,12 @@ index_rescan(IndexScanDesc scan,
 
 	/* Release resources (like buffer pins) from table accesses */
 	if (scan->xs_heapfetch)
+	{
+		if (scan->xs_heapfetch->rs)
+			read_stream_reset(scan->xs_heapfetch->rs);
+
 		table_index_fetch_reset(scan->xs_heapfetch);
+	}
 
 	scan->kill_prior_tuple = false; /* for safety */
 	scan->xs_heap_continue = false;
@@ -424,6 +485,8 @@ index_rescan(IndexScanDesc scan,
 /* ----------------
  *		index_endscan - end a scan
  * ----------------
+ *
+ * FIXME should also release the index batch?
  */
 void
 index_endscan(IndexScanDesc scan)
@@ -434,6 +497,11 @@ index_endscan(IndexScanDesc scan)
 	/* Release resources (like buffer pins) from table accesses */
 	if (scan->xs_heapfetch)
 	{
+		if (scan->xs_heapfetch->rs)
+		{
+			read_stream_end(scan->xs_heapfetch->rs);
+		}
+
 		table_index_fetch_end(scan->xs_heapfetch);
 		scan->xs_heapfetch = NULL;
 	}
@@ -489,16 +557,35 @@ index_restrpos(IndexScanDesc scan)
 
 	/* release resources (like buffer pins) from table accesses */
 	if (scan->xs_heapfetch)
+	{
+		if (scan->xs_heapfetch->rs)
+			read_stream_reset(scan->xs_heapfetch->rs);
+
 		table_index_fetch_reset(scan->xs_heapfetch);
+	}
 
 	scan->kill_prior_tuple = false; /* for safety */
 	scan->xs_heap_continue = false;
 
+	/*
+	 * Reset the current/prefetch positions in the batch.
+	 *
+	 * XXX Done before calling amgetbatch(), so that it sees the indexes as
+	 * invalid, batch as empty, and can add items.
+	 *
+	 * XXX This is a bit weird/fragile.
+	 */
+	scan->xs_batch->readPos.index = -1;
+	scan->xs_batch->streamPos.index = -1;
+
+	/* XXX don't reset nheaptids here, it confused amrestrpos (which seems
+	 * a bit weird, it shouldn't be the case I think) */
+
 	scan->indexRelation->rd_indam->amrestrpos(scan);
 
 	/*
 	 * Don't reset the batch here - amrestrpos should have has already loaded
-	 * the new batch, so don't throw that away.
+	 * the new batch and set the curr/prefetch indexes, so don't throw that away.
 	 */
 }
 
@@ -581,7 +668,12 @@ index_parallelrescan(IndexScanDesc scan)
 	SCAN_CHECKS;
 
 	if (scan->xs_heapfetch)
+	{
+		if (scan->xs_heapfetch->rs)
+			read_stream_reset(scan->xs_heapfetch->rs);
+
 		table_index_fetch_reset(scan->xs_heapfetch);
+	}
 
 	/* amparallelrescan is optional; assume no-op if not provided by AM */
 	if (scan->indexRelation->rd_indam->amparallelrescan != NULL)
@@ -600,6 +692,7 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
 {
 	Snapshot	snapshot;
 	IndexScanDesc scan;
+	ReadStream   *rs = NULL;
 
 	Assert(RelFileLocatorEquals(heaprel->rd_locator, pscan->ps_locator));
 	Assert(RelFileLocatorEquals(indexrel->rd_locator, pscan->ps_indexlocator));
@@ -616,9 +709,6 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
 	scan->heapRelation = heaprel;
 	scan->xs_snapshot = snapshot;
 
-	/* prepare to fetch index matches from table */
-	scan->xs_heapfetch = table_index_fetch_begin(heaprel);
-
 	/*
 	 * If explicitly requested and supported by both the index AM and the
 	 * plan, initialize batching info.
@@ -635,8 +725,20 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
 		enable_indexscan_batching)
 	{
 		index_batch_init(scan);
+
+		/* initialize stream */
+		rs = read_stream_begin_relation(READ_STREAM_DEFAULT,
+										NULL,
+										heaprel,
+										MAIN_FORKNUM,
+										index_scan_stream_read_next,
+										scan,
+										0);
 	}
 
+	/* prepare to fetch index matches from table */
+	scan->xs_heapfetch = table_index_fetch_begin(heaprel, rs);
+
 	return scan;
 }
 
@@ -676,31 +778,16 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 batch_loaded:
 		/* Try getting a TID from the current batch (if we have one). */
 		while (index_batch_getnext_tid(scan, direction) != NULL)
-		{
-			/*
-			 * We've successfully loaded a TID from the batch, so issue
-			 * prefetches for future TIDs if needed.
-			 */
-			index_batch_prefetch(scan, direction);
-
 			return &scan->xs_heaptid;
-		}
 
 		/*
 		 * We either don't have any batch yet, or we've already processed
 		 * all items from the current batch. Try loading the next one.
 		 *
-		 * If we succeed, issue prefetches (using the current prefetch
-		 * distance without ramp up), and then go back to returning the
-		 * TIDs from the batch.
-		 *
 		 * XXX Maybe do this as a simple while/for loop without the goto.
 		 */
 		if (index_batch_getnext(scan, direction))
-		{
-			index_batch_prefetch(scan, direction);
 			goto batch_loaded;
-		}
 
 		return NULL;
 	}
@@ -722,7 +809,12 @@ batch_loaded:
 	{
 		/* release resources (like buffer pins) from table accesses */
 		if (scan->xs_heapfetch)
+		{
+			if (scan->xs_heapfetch->rs)
+				read_stream_reset(scan->xs_heapfetch->rs);
+
 			table_index_fetch_reset(scan->xs_heapfetch);
+		}
 
 		return NULL;
 	}
@@ -783,7 +875,7 @@ index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 			/* batch case - record the killed tuple in the batch */
 			if (scan->xs_batch->nKilledItems < scan->xs_batch->maxSize)
 				scan->xs_batch->killedItems[scan->xs_batch->nKilledItems++]
-					= scan->xs_batch->currIndex;
+					= scan->xs_batch->readPos.index;
 		}
 	}
 
@@ -1328,98 +1420,252 @@ AssertCheckBatchInfo(IndexScanDesc scan)
 	 * The current item must be between -1 and nheaptids. Those two extreme
 	 * values are starting points for forward/backward scans.
 	 */
-	Assert((scan->xs_batch->currIndex >= -1) &&
-		   (scan->xs_batch->currIndex <= scan->xs_batch->nheaptids));
-
-	/* check prefetch data */
-	Assert((scan->xs_batch->prefetchTarget >= 0) &&
-		   (scan->xs_batch->prefetchTarget <= scan->xs_batch->prefetchMaximum));
+	Assert((scan->xs_batch->readPos.index >= -1) &&
+		   (scan->xs_batch->readPos.index <= scan->xs_batch->nheaptids));
 
-	Assert((scan->xs_batch->prefetchIndex >= -1) &&
-		   (scan->xs_batch->prefetchIndex <= scan->xs_batch->nheaptids));
+	Assert((scan->xs_batch->streamPos.index >= -1) &&
+		   (scan->xs_batch->streamPos.index <= scan->xs_batch->nheaptids));
 
 	for (int i = 0; i < scan->xs_batch->nheaptids; i++)
 		Assert(ItemPointerIsValid(&scan->xs_batch->heaptids[i]));
 #endif
 }
 
-/* Is the batch full (TIDs up to capacity)? */
-#define	INDEX_BATCH_IS_FULL(scan)	\
-	((scan)->xs_batch->nheaptids == (scan)->xs_batch->currSize)
+/*
+ * index_batch_pos_advance
+ *		Advance the position to the next item, depending on scan direction.
+ *
+ * XXX We expect to only do this when the advance is possible/valid.
+ *
+ * XXX Would be nice to have an assert that the final position is valid,
+ * but that requires knowing nheaptids.
+ */
+static int
+index_batch_pos_advance(IndexScanBatchPos *pos, ScanDirection dir)
+{
+	if (pos->restored)
+	{
+		pos->restored = false;
+		return pos->index;
+	}
 
-/* Is the batch empty (no TIDs)? */
-#define	INDEX_BATCH_IS_EMPTY(scan)	\
-	((scan)->xs_batch->nheaptids == 0)
+	/*
+	 * We update the index "before" to move to the next item, because that
+	 * makes scan direction changes simpler to handle (we don't need to undo
+	 * the last update).
+	 */
+	if (ScanDirectionIsForward(dir))
+		return ++pos->index;
+	else
+		return --pos->index;
+}
 
 /*
- * Did we process all items? For forward scan it means the index points to the
- * last item, for backward scans it has to point to the first one.
+ * index_batch_pos_reset
+ *		Reset the position to the index, maybe disable update on first advance.
  *
- * This does not cover empty batches properly, because of backward scans.
+ * Index specifies the item index. The 'restored' flag may be used to disable
+ * updating the index on the first advance. This is needed after mark/restore,
+ * because we restore to the item we're expected to return next, so we must
+ * not skip it.
+ *
+ * XXX Would be nice to have an assert that the final position is valid,
+ * but that requires knowing nheaptids.
  */
-#define	INDEX_BATCH_IS_PROCESSED(scan, direction)	\
-	(ScanDirectionIsForward(direction) ? \
-		((scan)->xs_batch->nheaptids == ((scan)->xs_batch->currIndex + 1)) : \
-		((scan)->xs_batch->currIndex == 0))
+static void
+index_batch_pos_reset(IndexScanBatchPos *pos, int index, bool restored)
+{
+	pos->index = index;
+	pos->restored = restored;
+}
 
 /*
- * Does the batch items in the requested direction? The batch must be non-empty
- * and we should not have reached the end of the batch (in the direction).
- * Also, if we just restored the position after mark/restore, there should be
- * at least one item to process (we won't advance  on the next call).
+ * index_batch_reset_positions
+ *		Reset both batch positions - both the read and the stream position
  *
- * XXX This is a bit confusing / ugly, probably should rethink how we track
- * empty batches, and how we handle not advancing after a restore.
+ * XXX Would be nice to check the stream position is always "ahead" of the
+ * read position. That requires knowing the direction.
+ */
+void
+index_batch_reset_positions(IndexScanDesc scan, int index, bool restored)
+{
+	index_batch_pos_reset(&scan->xs_batch->readPos, index, restored);
+	index_batch_pos_reset(&scan->xs_batch->streamPos, index, restored);
+}
+
+/*
+ * index_batch_reset_indexes
+ *		reset batch indexes (read/stream) for a given scan direction
+ *
+ * After loading a new batch, we need to set both indexes, so that we
+ * advance to the first item on the next call. This depends on the scan
+ * direction - for forward scans we want to proceed to the first item
+ * (with index 0), so we set -1. For backward scans we want to proceed
+ * to the last item (with index (nheaptids-1)), so we set nheaptids.
+ *
+ * XXX It's legal to call this with resetIndexes=false, but in that case
+ * the function does nothing.
  */
-#define INDEX_BATCH_HAS_ITEMS(scan, direction) \
-	(!INDEX_BATCH_IS_EMPTY(scan) && (!INDEX_BATCH_IS_PROCESSED(scan, direction) || scan->xs_batch->restored))
+static void
+index_batch_reset_indexes(IndexScanDesc scan)
+{
+	/* bail out if no index reset was requested */
+	if (!scan->xs_batch->resetIndexes)
+		return;
 
+	/*
+	 * Reset the read stream too, if we have one.
+	 *
+	 * XXX Not sure if this is the right place to do this. It does not hurt,
+	 * we really want to start with a new stream. But maybe it'd be more
+	 * logical to reset the stream from the caller, not from here.
+	 */
+	if (scan->xs_heapfetch)
+	{
+		if (scan->xs_heapfetch->rs)
+			read_stream_reset(scan->xs_heapfetch->rs);
+	}
 
-/* ----------------
- *		index_batch_getnext - get the next batch of TIDs from a scan
+	/* set indexes into "starting" position depending on scan direction */
+	if (ScanDirectionIsForward(scan->xs_batch->dir))
+	{
+		index_batch_pos_reset(&scan->xs_batch->readPos, -1, false);
+		index_batch_pos_reset(&scan->xs_batch->streamPos, -1, false);
+	}
+	else
+	{
+		index_batch_pos_reset(&scan->xs_batch->readPos,
+							  scan->xs_batch->nheaptids, false);
+		index_batch_pos_reset(&scan->xs_batch->streamPos,
+							  scan->xs_batch->nheaptids, false);
+	}
+
+	scan->xs_batch->resetIndexes = false;
+}
+
+/*
+ * index_scan_stream_read_next
+ *		return the next block to pass to the read stream
  *
- * Returns true if we managed to read at least some TIDs into the batch,
- * or false if there are no more TIDs in the scan. The xs_heaptids and
- * xs_nheaptids fields contain the TIDS and the number of elements.
+ * This assumes the "current" scan direction, requested by the caller. If
+ * that changes before consuming all buffers, we'll reset the stream and
+ * start from scratch. Which may seem inefficient, but it's no worse than
+ * what we do now, and it's not a very common case.
  *
- * XXX This only loads the TIDs and resets the various batch fields to
- * fresh state. It does not set xs_heaptid/xs_itup/xs_hitup, that's the
- * responsibility of the following index_batch_getnext_tid() calls.
- * ----------------
+ * The scan direction change is checked / handled elsewhere.
  */
-static bool
-index_batch_getnext(IndexScanDesc scan, ScanDirection direction)
+static BlockNumber
+index_scan_stream_read_next(ReadStream *stream,
+							void *callback_private_data,
+							void *per_buffer_data)
 {
-	bool		found;
+	int				index;
+	IndexScanDesc	scan = (IndexScanDesc) callback_private_data;
 
-	SCAN_CHECKS;
-	CHECK_SCAN_PROCEDURE(amgetbatch);
+	/*
+	 * XXX What should we do without batching? Currently we should not get
+	 * here without batching, because we fall back to the regular buffer
+	 * reads (without stream read API) in that case. But is that what we
+	 * want to do in the future?
+	 *
+	 * Maybe the right solution would be to just use read stream API, but
+	 * only read the one block ahead, because we can't know more than that.
+	 */
+	Assert(scan->xs_batch);
 
-	/* XXX: we should assert that a snapshot is pushed or registered */
-	Assert(TransactionIdIsValid(RecentXmin));
+	if (!scan->xs_batch)	/* can't happen right now */
+		elog(ERROR, "index_scan_stream_read_next calleld without batching");
 
-	/* comprehensive checks of batching info */
-	AssertCheckBatchInfo(scan);
+	/*
+	 * We shouldn't ever get here with an empty batch. In that case we bail
+	 * out after index_batch_getnext(), if it finds nothing.
+	 */
+	Assert(!INDEX_BATCH_IS_EMPTY(scan));
 
 	/*
-	 * We never read a new batch before we run out of items in the current
-	 * one. The current batch has to be either empty or we ran out of items
-	 * (in the given direction).
+	 * It shouldn't be possible to have both the resetIndexes and restored
+	 * flags set at the same time.
+	 *
+	 * XXX Add this check to AssertCheckBatchInfo().
 	 */
-	Assert(!INDEX_BATCH_HAS_ITEMS(scan, direction));
+	Assert(!(scan->xs_batch->resetIndexes && scan->xs_batch->readPos.restored));
 
+	/*
+	 * Maybe reset indexes, if requested.
+	 *
+	 * XXX Shouldn't this happen right after loading the next batch, or after
+	 * checking the scan direction? Then we should not need to care about
+	 * resetIndexes here.
+	 */
+	index_batch_reset_indexes(scan);
+
+	/*
+	 * Find the next block to pass to the read stream. With the callback we
+	 * may end up skipping some of the items, so do a loop until we find one
+	 * or until we run out of items.
+	 *
+	 * XXX It's a bit unfortunate that we may need to visit many items (if
+	 * the callback returns false for many before), as that may be expensive.
+	 * For example for IOS we need to check the visibility map, which is not
+	 * free (although not too expensive either). But that's one of the reasons
+	 * why we grow the batch size gradually.
+	 *
+	 * XXX Do the while loop based on "has items", so that we can do
+	 * AssertCheckValidItem() on the result.
+	 */
+	while (true)
+	{
+		/* Advance the stream index as needed. */
+		index = index_batch_pos_advance(&scan->xs_batch->streamPos,
+										scan->xs_batch->dir);
+
+		/* Did we run out of items in the current batch? */
+		if ((index < 0) || (index >= scan->xs_batch->nheaptids))
+			return InvalidBlockNumber;
+
+		/*
+		 * If defined, invoke the prefetch callback to determine if we should
+		 * actually pass this block to the read stream (e.g. to prefetch).
+		 * For example index-only scans use this to skip all-visible pages.
+		 */
+		if (scan->xs_batch->prefetchCallback &&
+			!scan->xs_batch->prefetchCallback(scan,
+											  scan->xs_batch->prefetchArgument,
+											  index))
+		{
+			/*
+			 * We don't need to prefetch this item, but we need to return
+			 * something valid (otherwise the stream will think we're at
+			 * the end). So we keep looking for the next item in the batch.
+			 */
+			continue;
+		}
+
+		break;
+	}
+
+	/*
+	 * FIXME Maybe we should store the TID into the per_buffer_data, so that
+	 * we can cross check we got the right buffer later? It seems quite
+	 * fragile and easy to break the exact sequence in some way. We could
+	 * return a different block which happens to have the right item IDs.
+	 */
+
+	return ItemPointerGetBlockNumber(&scan->xs_batch->heaptids[index]);
+}
+
+/* XXX isn't this pretty much the same as index_batch_reset? */
+static void
+index_batch_empty(IndexScanDesc scan)
+{
 	/*
 	 * Reset the current/prefetch positions in the batch.
 	 *
 	 * XXX Done before calling amgetbatch(), so that it sees the index as
 	 * invalid, batch as empty, and can add items.
-	 *
-	 * XXX Intentionally does not reset the nheaptids, because the AM does
-	 * rely on that when processing killed tuples. Maybe store the killed
-	 * tuples differently?
 	 */
-	scan->xs_batch->currIndex = -1;
-	scan->xs_batch->prefetchIndex = 0;
+	scan->xs_batch->readPos.index = -1;
+	scan->xs_batch->streamPos.index = -1;
 	scan->xs_batch->nheaptids = 0;
 
 	/*
@@ -1438,6 +1684,49 @@ index_batch_getnext(IndexScanDesc scan, ScanDirection direction)
 	scan->xs_hitup = NULL;
 
 	MemoryContextReset(scan->xs_batch->ctx);
+}
+
+/* ----------------
+ *		index_batch_getnext - get the next batch of TIDs from a scan
+ *
+ * Returns true if we managed to read at least some TIDs into the batch,
+ * or false if there are no more TIDs in the scan. The xs_heaptids and
+ * xs_nheaptids fields contain the TIDS and the number of elements.
+ *
+ * XXX This only loads the TIDs and resets the various batch fields to
+ * fresh state. It does not set xs_heaptid/xs_itup/xs_hitup, that's the
+ * responsibility of the following index_batch_getnext_tid() calls.
+ * ----------------
+ */
+static bool
+index_batch_getnext(IndexScanDesc scan, ScanDirection direction)
+{
+	bool		found;
+
+	SCAN_CHECKS;
+	CHECK_SCAN_PROCEDURE(amgetbatch);
+
+	/* XXX: we should assert that a snapshot is pushed or registered */
+	Assert(TransactionIdIsValid(RecentXmin));
+
+	/* comprehensive checks of batching info */
+	AssertCheckBatchInfo(scan);
+
+	/*
+	 * We never read a new batch before we run out of items in the current
+	 * one. The current batch has to be either empty or we ran out of items
+	 * (in the given direction).
+	 *
+	 * XXX We may abandon a batch because of a rescan, but that counts as
+	 * a new scan (and we reset the batch anyway).
+	 */
+	Assert(!INDEX_BATCH_HAS_ITEMS(scan, &scan->xs_batch->readPos, direction));
+
+	/*
+	 * Reset the batch info to the empty state before calling amgetbatch(), so that
+	 * the index AM sees it as ready for new data.
+	 */
+	index_batch_empty(scan);
 
 	/*
 	 * The AM's amgetbatch proc loads a chunk of TIDs matching the scan keys,
@@ -1454,33 +1743,44 @@ index_batch_getnext(IndexScanDesc scan, ScanDirection direction)
 	scan->kill_prior_tuple = false;
 	scan->xs_heap_continue = false;
 
-	/* If we're out of index entries, we're done */
+	/*
+	 * If we're out of index entries, we're done
+	 *
+	 * XXX Is this the right place to release resources unrelated to the
+	 * batching? Maybe that should happen sometime higher / in the caller?
+	 */
 	if (!found)
 	{
 		/* release resources (like buffer pins) from table accesses */
 		if (scan->xs_heapfetch)
+		{
+			if (scan->xs_heapfetch->rs)
+				read_stream_reset(scan->xs_heapfetch->rs);
+
 			table_index_fetch_reset(scan->xs_heapfetch);
+		}
 
 		return false;
 	}
 
+	/* reset positions */
+	if (ScanDirectionIsForward(direction))
+		index_batch_reset_positions(scan, -1, false);
+	else
+		index_batch_reset_positions(scan, scan->xs_batch->nheaptids, false);
+
 	/* We should have a non-empty batch with items. */
-	Assert(INDEX_BATCH_HAS_ITEMS(scan, direction));
+	Assert(INDEX_BATCH_HAS_ITEMS(scan, &scan->xs_batch->readPos, direction));
 
 	pgstat_count_index_tuples(scan->indexRelation, scan->xs_batch->nheaptids);
 
 	/*
-	 * Set the prefetch index to the first item in the loaded batch (we expect
-	 * the index AM to set that).
-	 *
-	 * FIXME Maybe set the currIndex here, not in the index AM. It seems much
-	 * more like indexam.c responsibility rather than something every index AM
-	 * should be doing (in _bt_first_batch etc.).
+	 * Remember the direction.
 	 *
-	 * FIXME It's a bit unclear who (indexam.c or the index AM) is responsible
-	 * for setting which fields. This needs clarification.
+	 * FIXME also check it later, it may happen to change for each call
 	 */
-	scan->xs_batch->prefetchIndex = scan->xs_batch->currIndex;
+	scan->xs_batch->dir = direction;
+	scan->xs_batch->resetIndexes = true;
 
 	/*
 	 * Try to increase the size of the batch. Intentionally done after the AM
@@ -1493,6 +1793,23 @@ index_batch_getnext(IndexScanDesc scan, ScanDirection direction)
 	/* comprehensive checks of batching info */
 	AssertCheckBatchInfo(scan);
 
+	/* reset the read stream so that we read the next batch */
+	read_stream_reset(scan->xs_heapfetch->rs);
+
+	/*
+	 * Release resources (like buffer pins) from table accesses
+	 *
+	 * XXX Not sure this is needed, I might have added it while chasing
+	 * some resource leaks.
+	 */
+	if (scan->xs_heapfetch)
+	{
+		if (scan->xs_heapfetch->rs)
+			read_stream_reset(scan->xs_heapfetch->rs);
+
+		table_index_fetch_reset(scan->xs_heapfetch);
+	}
+
 	/* Return the batch of TIDs we found. */
 	return true;
 }
@@ -1512,33 +1829,50 @@ index_batch_getnext(IndexScanDesc scan, ScanDirection direction)
 static ItemPointer
 index_batch_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 {
+	int		index;
+
+	/* shouldn't get here without batching */
+	Assert(scan->xs_batch);
+
 	/* comprehensive checks of batching info */
 	AssertCheckBatchInfo(scan);
 
-	/*
-	 * Bail out if he batch does not have more items in the requested directio
-	 * (either empty or everthing processed).
-	 */
-	if (!INDEX_BATCH_HAS_ITEMS(scan, direction))
-		return NULL;
+	/* We should have handled change of scan direction sometime earlier. */
+	if (scan->xs_batch->dir != direction)
+	{
+		scan->xs_batch->dir = direction;
+		scan->xs_batch->streamPos.index = scan->xs_batch->readPos.index;
+
+		if (scan->xs_heapfetch)
+		{
+			if (scan->xs_heapfetch->rs)
+				read_stream_reset(scan->xs_heapfetch->rs);
+		}
+	}
+
+	/* reset indexes if needed */
+	index_batch_reset_indexes(scan);
+
+	Assert(scan->xs_batch->dir == direction);
 
 	/*
-	 * Advance to the next batch item - we know it's not empty and there are
-	 * items to process, so this is valid.
+	 * Don't continue/advance indexes, if the batch is empty, otherwise
+	 * we'd advance to bogus index values e.g. after changing the scan
+	 * direction.
 	 *
-	 * However, don't advance if this is the first getnext_tid() call after
-	 * amrestrpos(). That sets the position on the correct item, and advancing
-	 * here would skip it.
-	 *
-	 * XXX The "restored" flag is a bit weird. Can we do this without it? May
-	 * need to rethink when/how we advance the batch index. Not sure.
+	 * FIXME Not sure this is the right place to do this decision. It seems
+	 * weird we reset the direction first, etc. and only then realize the
+	 * batch is actually empty.
 	 */
-	if (scan->xs_batch->restored)
-		scan->xs_batch->restored = false;
-	else if (ScanDirectionIsForward(direction))
-		scan->xs_batch->currIndex++;
-	else
-		scan->xs_batch->currIndex--;
+	if (scan->xs_batch->nheaptids == 0)
+		return NULL;
+
+	/* Advance the read index to the item we need to return next. */
+	index = index_batch_pos_advance(&scan->xs_batch->readPos, direction);
+
+	/* Did we run out of items in the current batch? */
+	if ((index < 0) || (index >= scan->xs_batch->nheaptids))
+		return NULL;
 
 	/*
 	 * Next TID from the batch, optionally also the IndexTuple/HeapTuple.
@@ -1549,14 +1883,14 @@ index_batch_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 	 * XXX Do we need to reset the itups/htups array between batches? Doesn't
 	 * seem necessary, but maybe we could get bogus data?
 	 */
-	scan->xs_heaptid = scan->xs_batch->heaptids[scan->xs_batch->currIndex];
+	scan->xs_heaptid = scan->xs_batch->heaptids[index];
 	if (scan->xs_want_itup)
 	{
-		scan->xs_itup = scan->xs_batch->itups[scan->xs_batch->currIndex];
-		scan->xs_hitup = scan->xs_batch->htups[scan->xs_batch->currIndex];
+		scan->xs_itup = scan->xs_batch->itups[index];
+		scan->xs_hitup = scan->xs_batch->htups[index];
 	}
 
-	scan->xs_recheck = scan->xs_batch->recheck[scan->xs_batch->currIndex];
+	scan->xs_recheck = scan->xs_batch->recheck[index];
 
 	/*
 	 * If there are order-by clauses, point to the appropriate chunk in the
@@ -1564,7 +1898,7 @@ index_batch_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 	 */
 	if (scan->numberOfOrderBys > 0)
 	{
-		int			idx = scan->numberOfOrderBys * scan->xs_batch->currIndex;
+		int			idx = scan->numberOfOrderBys * index;
 
 		scan->xs_orderbyvals = &scan->xs_batch->orderbyvals[idx];
 		scan->xs_orderbynulls = &scan->xs_batch->orderbynulls[idx];
@@ -1576,120 +1910,6 @@ index_batch_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 	return &scan->xs_heaptid;
 }
 
-/* ----------------
- *		index_batch_prefetch - prefetch pages for TIDs in current batch
- *
- * The prefetch distance is increased gradually, similar to what we do for
- * bitmap heap scans. We start from distance 0 (no prefetch), and then in each
- * iteration increment the distance up to prefetchMaximum.
- *
- * The prefetch distance is reset (to 0) only on rescans, not between batches.
- *
- * It's possible to provide an index_prefetch_callback callback, to affect
- * which items need to be prefetched. With prefetch_callback=NULL, all
- * items are prefetched. With the callback provided, the item is prefetched
- * iff the callback and returns true.
- *
- * The "arg" argument is used to pass a state for the plan node invoking the
- * function, and is then passed to the callback. This means the callback is
- * specific to the plan state.
- *
- * XXX the prefetchMaximum depends on effective_io_concurrency, and also on
- * tablespace options.
- *
- * XXX For accesses that change scan direction, we may do a lot of unnecessary
- * prefetching (because we will re-issue prefetches for what we recently read).
- * I'm not sure if there's a simple way to track what was already prefetched.
- * Maybe we could count how far we got (in the forward direction), keep that
- * as a watermark, and never prefetch again below it.
- *
- * XXX Maybe wrap this in ifdef USE_PREFETCH?
- * ----------------
- */
-static void
-index_batch_prefetch(IndexScanDesc scan, ScanDirection direction)
-{
-	int			prefetchStart,
-				prefetchEnd;
-
-	IndexPrefetchCallback	prefetch_callback = scan->xs_batch->prefetchCallback;
-	void *arg = scan->xs_batch->prefetchArgument;
-
-	if (ScanDirectionIsForward(direction))
-	{
-		/* Where should we start to prefetch? */
-		prefetchStart = Max(scan->xs_batch->currIndex,
-							scan->xs_batch->prefetchIndex);
-
-		/*
-		 * Where should we stop prefetching? this is the first item that we do
-		 * NOT prefetch, i.e. it can be the first item after the batch.
-		 */
-		prefetchEnd = Min((scan->xs_batch->currIndex + 1) + scan->xs_batch->prefetchTarget,
-						  scan->xs_batch->nheaptids);
-
-		/* FIXME should calculate in a way to make this unnecessary */
-		prefetchStart = Max(Min(prefetchStart, scan->xs_batch->nheaptids - 1), 0);
-		prefetchEnd = Max(Min(prefetchEnd, scan->xs_batch->nheaptids - 1), 0);
-
-		/* remember how far we prefetched / where to start the next prefetch */
-		scan->xs_batch->prefetchIndex = prefetchEnd;
-	}
-	else
-	{
-		/* Where should we start to prefetch? */
-		prefetchEnd = Min(scan->xs_batch->currIndex,
-						  scan->xs_batch->prefetchIndex);
-
-		/*
-		 * Where should we stop prefetching? this is the first item that we do
-		 * NOT prefetch, i.e. it can be the first item after the batch.
-		 */
-		prefetchStart = Max((scan->xs_batch->currIndex - 1) - scan->xs_batch->prefetchTarget,
-							-1);
-
-		/* FIXME should calculate in a way to make this unnecessary */
-		prefetchStart = Max(Min(prefetchStart, scan->xs_batch->nheaptids - 1), 0);
-		prefetchEnd = Max(Min(prefetchEnd, scan->xs_batch->nheaptids - 1), 0);
-
-		/* remember how far we prefetched / where to start the next prefetch */
-		scan->xs_batch->prefetchIndex = prefetchStart;
-	}
-
-	/*
-	 * It's possible we get inverted prefetch range after a restrpos() call,
-	 * because we intentionally don't reset the prefetchIndex - we don't want
-	 * to prefetch pages over and over in this case. We'll do nothing in that
-	 * case, except for the AssertCheckBatchInfo().
-	 *
-	 * FIXME I suspect this actually does not work correctly if we change the
-	 * direction, because the prefetchIndex will flip between two extremes
-	 * thanks to the Min/Max.
-	 */
-
-	/*
-	 * Increase the prefetch distance, but not beyond prefetchMaximum. We
-	 * intentionally do this after calculating start/end, so that we start
-	 * actually prefetching only after the first item.
-	 */
-	scan->xs_batch->prefetchTarget = Min(scan->xs_batch->prefetchTarget + 1,
-										 scan->xs_batch->prefetchMaximum);
-
-	/* comprehensive checks of batching info */
-	AssertCheckBatchInfo(scan);
-
-	/* finally, do the actual prefetching */
-	for (int i = prefetchStart; i < prefetchEnd; i++)
-	{
-		/* skip block if the provided callback says so */
-		if (prefetch_callback && !prefetch_callback(scan, arg, i))
-			continue;
-
-		PrefetchBuffer(scan->heapRelation, MAIN_FORKNUM,
-					   ItemPointerGetBlockNumber(&scan->xs_batch->heaptids[i]));
-	}
-}
-
 /*
  * index_batch_init
  *		Initialize various fields / arrays needed by batching.
@@ -1715,14 +1935,21 @@ index_batch_init(IndexScanDesc scan)
 	scan->xs_batch->initSize = 8;
 	scan->xs_batch->currSize = scan->xs_batch->initSize;
 
-	/* initialize prefetching info */
-	scan->xs_batch->prefetchMaximum =
-		get_tablespace_io_concurrency(scan->heapRelation->rd_rel->reltablespace);
-	scan->xs_batch->prefetchTarget = 0;
-	scan->xs_batch->prefetchIndex = 0;
+	/* initialize prefetching info to some bogus values */
+	index_batch_pos_reset(&scan->xs_batch->readPos, -1, false);
+	index_batch_pos_reset(&scan->xs_batch->streamPos, -1, false);
 
-	/* */
-	scan->xs_batch->currIndex = -1;
+	/*
+	 * Make sure to reset prefetch/current indexes before using them later.
+	 * We can't do that now because we don't know the direction.
+	 */
+	scan->xs_batch->resetIndexes = true;
+
+	/*
+	 * FIXME set later, when we actually know it. Or shall we assume most
+	 * scans are (or at least start) forward?
+	 */
+	scan->xs_batch->dir = ForwardScanDirection;
 
 	/* Preallocate the largest allowed array of TIDs. */
 	scan->xs_batch->nheaptids = 0;
@@ -1780,6 +2007,9 @@ index_batch_init(IndexScanDesc scan)
  *
  * FIXME Another bit in need of cleanup. The currIndex default (-1) is not quite
  * correct, because for backwards scans is wrong.
+ *
+ * XXX Isn't this the same as index_batch_empty?
+ * XXX Should this reset the batch memory context, xs_itup, xs_hitup, etc?
  */
 static void
 index_batch_reset(IndexScanDesc scan)
@@ -1789,9 +2019,9 @@ index_batch_reset(IndexScanDesc scan)
 		return;
 
 	scan->xs_batch->nheaptids = 0;
-	scan->xs_batch->prefetchIndex = 0;
-	scan->xs_batch->currIndex = -1;
-	scan->xs_batch->restored = false;
+	index_batch_pos_reset(&scan->xs_batch->readPos, -1, false);
+	index_batch_pos_reset(&scan->xs_batch->streamPos, -1, false);
+	scan->xs_batch->resetIndexes = true;
 }
 
 /*
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index db0e22e0ce2..7082e7bd381 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -532,10 +532,10 @@ btmarkpos(IndexScanDesc scan)
 	if (scan->xs_batch)
 	{
 		/* the index should be valid in the batch */
-		Assert(scan->xs_batch->currIndex >= 0);
-		Assert(scan->xs_batch->currIndex < scan->xs_batch->nheaptids);
+		Assert(scan->xs_batch->readPos.index >= 0);
+		Assert(scan->xs_batch->readPos.index < scan->xs_batch->nheaptids);
 
-		so->currPos.itemIndex = so->batch.firstIndex + scan->xs_batch->currIndex;
+		so->currPos.itemIndex = so->batch.firstIndex + scan->xs_batch->readPos.index;
 	}
 
 	/*
@@ -602,7 +602,6 @@ btrestrpos(IndexScanDesc scan)
 
 				/* make it look empty */
 				scan->xs_batch->nheaptids = 0;
-				scan->xs_batch->prefetchIndex = -1;
 
 				/*
 				 * XXX the scan direction is bogus / not important. It affects
@@ -621,8 +620,9 @@ btrestrpos(IndexScanDesc scan)
 			 * XXX This is a bit weird. There should be a way to not need the
 			 * "restored" flag I think.
 			 */
-			scan->xs_batch->currIndex = (so->currPos.itemIndex - so->batch.firstIndex);
-			scan->xs_batch->restored = true;
+			index_batch_reset_positions(scan,
+										(so->currPos.itemIndex - so->batch.firstIndex),
+										true);
 		}
 	}
 	else
@@ -683,7 +683,6 @@ btrestrpos(IndexScanDesc scan)
 
 				/* make it look empty */
 				scan->xs_batch->nheaptids = 0;
-				scan->xs_batch->prefetchIndex = -1;
 
 				/* XXX the scan direction is bogus */
 				_bt_copy_batch(scan, ForwardScanDirection, so, start, end);
@@ -697,8 +696,9 @@ btrestrpos(IndexScanDesc scan)
 				 * XXX This is a bit weird. There should be a way to not need
 				 * the "restored" flag I think.
 				 */
-				scan->xs_batch->currIndex = (so->currPos.itemIndex - so->batch.firstIndex);
-				scan->xs_batch->restored = true;
+				index_batch_reset_positions(scan,
+											(so->currPos.itemIndex - so->batch.firstIndex),
+											true);
 			}
 		}
 		else
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 22181bf027b..1ae164a94b2 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -1565,16 +1565,6 @@ _bt_copy_batch(IndexScanDesc scan, ScanDirection dir, BTScanOpaque so,
 		start++;
 	}
 
-	/*
-	 * set the starting point
-	 *
-	 * XXX might be better done in indexam.c
-	 */
-	if (ScanDirectionIsForward(dir))
-		scan->xs_batch->currIndex = -1;
-	else
-		scan->xs_batch->currIndex = scan->xs_batch->nheaptids;
-
 	/* shouldn't be possible to end here with an empty batch */
 	Assert(scan->xs_batch->nheaptids > 0);
 }
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 4b96852c28c..05ff2285830 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -1145,13 +1145,6 @@ _spgist_copy_batch(IndexScanDesc scan, SpGistScanOpaque so,
 		start++;
 	}
 
-	/*
-	 * set the starting point
-	 *
-	 * XXX might be better done in indexam.c
-	 */
-	scan->xs_batch->currIndex = -1;
-
 	/* shouldn't be possible to end here with an empty batch */
 	Assert(scan->xs_batch->nheaptids > 0);
 }
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index bd8715b6797..ffc6cde79d6 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -216,7 +216,7 @@ table_index_fetch_tuple_check(Relation rel,
 	bool		found;
 
 	slot = table_slot_create(rel, NULL);
-	scan = table_index_fetch_begin(rel);
+	scan = table_index_fetch_begin(rel, NULL);
 	found = table_index_fetch_tuple(scan, tid, snapshot, slot, &call_again,
 									all_dead);
 	table_index_fetch_end(scan);
diff --git a/src/backend/commands/constraint.c b/src/backend/commands/constraint.c
index f7dc42f7452..11b1f720c2c 100644
--- a/src/backend/commands/constraint.c
+++ b/src/backend/commands/constraint.c
@@ -106,7 +106,8 @@ unique_key_recheck(PG_FUNCTION_ARGS)
 	 */
 	tmptid = checktid;
 	{
-		IndexFetchTableData *scan = table_index_fetch_begin(trigdata->tg_relation);
+		IndexFetchTableData *scan = table_index_fetch_begin(trigdata->tg_relation,
+															NULL);
 		bool		call_again = false;
 
 		if (!table_index_fetch_tuple(scan, &tmptid, SnapshotSelf, slot,
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 8e55ffca197..65d16b981eb 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -145,9 +145,10 @@ IndexOnlyNext(IndexOnlyScanState *node)
 		}
 		else
 		{
+			int	lastIndex = scandesc->xs_batch->readPos.index;
+
 			/* Is the index of the current item valid for the batch? */
-			Assert((scandesc->xs_batch->currIndex >= 0) &&
-				   (scandesc->xs_batch->currIndex < scandesc->xs_batch->nheaptids));
+			Assert((lastIndex >= 0) && (lastIndex < scandesc->xs_batch->nheaptids));
 
 			/*
 			 * Reuse the previously determined page visibility info, or
@@ -159,8 +160,7 @@ IndexOnlyNext(IndexOnlyScanState *node)
 			 * visibility from that. Maybe we could/should have a more direct
 			 * way?
 			 */
-			all_visible = !ios_prefetch_block(scandesc, node,
-											  scandesc->xs_batch->currIndex);
+			all_visible = !ios_prefetch_block(scandesc, node, lastIndex);
 		}
 
 		/*
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 1d9a0868a9b..67e70a081b9 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -191,6 +191,7 @@ extern int64 index_getbitmap(IndexScanDesc scan, TIDBitmap *bitmap);
 /* index batching/prefetching */
 extern bool index_batch_add(IndexScanDesc scan, ItemPointerData tid, bool recheck,
 							IndexTuple itup, HeapTuple htup);
+extern void index_batch_reset_positions(IndexScanDesc scan, int index, bool restored);
 
 extern IndexBulkDeleteResult *index_bulk_delete(IndexVacuumInfo *info,
 												IndexBulkDeleteResult *istat,
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 33d2b6a6223..dc5470833c9 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -16,8 +16,10 @@
 
 #include "access/htup_details.h"
 #include "access/itup.h"
+#include "access/sdir.h"
 #include "port/atomics.h"
 #include "storage/buf.h"
+#include "storage/read_stream.h"
 #include "storage/relfilelocator.h"
 #include "storage/spin.h"
 #include "utils/relcache.h"
@@ -127,6 +129,7 @@ typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker;
 typedef struct IndexFetchTableData
 {
 	Relation	rel;
+	ReadStream *rs;
 } IndexFetchTableData;
 
 /* Forward declaration, the prefetch callback needs IndexScanDescData. */
@@ -202,6 +205,12 @@ typedef struct IndexScanDescData
 typedef bool (*IndexPrefetchCallback) (IndexScanDescData *scan,
 									   void *arg, int index);
 
+typedef struct IndexScanBatchPos {
+	int		index;			/* index into the batch items */
+	bool	restored;		/* Was this restored by restrpos? If yes, don't
+							 * advance on the first access. */
+} IndexScanBatchPos;
+
 /*
  * Data about the current TID batch returned by the index AM.
  *
@@ -227,22 +236,19 @@ typedef struct IndexScanBatchData
 	/* memory context for per-batch data */
 	MemoryContext ctx;
 
-	/*
-	 * Was this batch just restored by restrpos? if yes, we don't advance on
-	 * the first iteration.
-	 */
-	bool		restored;
-
-	/* batch prefetching */
-	int			prefetchTarget; /* current prefetch distance */
-	int			prefetchMaximum;	/* maximum prefetch distance */
-	int			prefetchIndex;	/* next item to prefetch */
+	/* most recent direction of the scan */
+	ScanDirection dir;
 
 	IndexPrefetchCallback	prefetchCallback;
 	void				   *prefetchArgument;
 
+	/*
+	 * Do the read/stream positions have to be reset before their next use?
+	 * Set on batch init/reset, when the scan direction is not known yet.
+	 */
+	bool		resetIndexes;
+
 	/* batch contents (TIDs, index tuples, kill bitmap, ...) */
-	int			currIndex;		/* index of the current item */
 	int			nheaptids;		/* number of TIDs in the batch */
 	ItemPointerData *heaptids;	/* TIDs in the batch */
 	IndexTuple *itups;			/* IndexTuples, if requested */
@@ -250,6 +256,10 @@ typedef struct IndexScanBatchData
 	bool	   *recheck;		/* recheck flags */
 	Datum	   *privateData;	/* private data for batch */
 
+	/* current position in the batch */
+	IndexScanBatchPos	readPos;	/* used by executor */
+	IndexScanBatchPos	streamPos;	/* used by read stream */
+
 	/* xs_orderbyvals / xs_orderbynulls */
 	Datum	   *orderbyvals;
 	bool	   *orderbynulls;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index adb478a93ca..2a8960b4005 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -420,7 +420,8 @@ typedef struct TableAmRoutine
 	 *
 	 * Tuples for an index scan can then be fetched via index_fetch_tuple.
 	 */
-	struct IndexFetchTableData *(*index_fetch_begin) (Relation rel);
+	struct IndexFetchTableData *(*index_fetch_begin) (Relation rel,
+													  ReadStream *rs);
 
 	/*
 	 * Reset index fetch. Typically this will release cross index fetch
@@ -1198,9 +1199,9 @@ table_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
  * Tuples for an index scan can then be fetched via table_index_fetch_tuple().
  */
 static inline IndexFetchTableData *
-table_index_fetch_begin(Relation rel)
+table_index_fetch_begin(Relation rel, ReadStream *rs)
 {
-	return rel->rd_tableam->index_fetch_begin(rel);
+	return rel->rd_tableam->index_fetch_begin(rel, rs);
 }
 
 /*
-- 
2.47.0

