From 642f442946d793a0d95910e34b2a0f665fec9c37 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Mon, 30 Sep 2024 22:48:39 +0200
Subject: [PATCH v20250422 2/5] WIP: batching for nbtree indexes

Adds batching/prefetching for btree indexes. Returns only batches from a
single leaf page. Does not support mark/restore yet.
---
 src/backend/access/nbtree/nbtree.c    |  319 ++++
 src/backend/access/nbtree/nbtsearch.c | 1998 +++++++++++++++++++++++--
 src/backend/access/nbtree/nbtutils.c  |  179 +++
 src/include/access/nbtree.h           |   72 +-
 src/tools/pgindent/typedefs.list      |    2 +
 5 files changed, 2417 insertions(+), 153 deletions(-)

diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index accc7fe8bbe..3e3076b7e12 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -159,6 +159,8 @@ bthandler(PG_FUNCTION_ARGS)
 	amroutine->ambeginscan = btbeginscan;
 	amroutine->amrescan = btrescan;
 	amroutine->amgettuple = btgettuple;
+	amroutine->amgetbatch = btgetbatch;
+	amroutine->amfreebatch = btfreebatch;
 	amroutine->amgetbitmap = btgetbitmap;
 	amroutine->amendscan = btendscan;
 	amroutine->ammarkpos = btmarkpos;
@@ -279,6 +281,158 @@ btgettuple(IndexScanDesc scan, ScanDirection dir)
 	return res;
 }
 
+/* FIXME duplicate from indexam.c */
+#define INDEX_SCAN_BATCH(scan, idx)	\
+		((scan)->xs_batches->batches[(idx) % (scan)->xs_batches->maxBatches])
+
+/*
+ *	btgetbatch() -- Get the next batch of tuples in the scan.
+ *
+ * XXX Simplified version of btgettuple(), but for batches of tuples.
+ */
+IndexScanBatch
+btgetbatch(IndexScanDesc scan, ScanDirection dir)
+{
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	IndexScanBatch res;
+	BTBatchScanPos pos = NULL;
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	/* btree indexes are never lossy */
+	scan->xs_recheck = false;
+
+	if (scan->xs_batches->firstBatch < scan->xs_batches->nextBatch)
+	{
+		IndexScanBatch batch = INDEX_SCAN_BATCH(scan, scan->xs_batches->nextBatch-1);
+		pos = (BTBatchScanPos) batch->opaque;
+	}
+
+	/* Each loop iteration performs another primitive index scan */
+	do
+	{
+		/*
+		 * If we've already initialized this scan, we can just advance it in
+		 * the appropriate direction.  If we haven't done so yet, we call
+		 * _bt_first() to get the first item in the scan.
+		 */
+		if (pos == NULL)
+			res = _bt_first_batch(scan, dir);
+		else
+		{
+			/*
+			 * Now continue the scan.
+			 */
+			res = _bt_next_batch(scan, pos, dir);
+		}
+
+		/* If we have a batch, return it ... */
+		if (res)
+			break;
+
+		/*
+		 * XXX we need to invoke _bt_first_batch on the next iteration, to
+		 * advance SAOP keys etc. But indexam.c already does this, but that's
+		 * only after this returns, so maybe this should do this in some other
+		 * way, not sure who should be responsible for setting currentBatch.
+		 *
+		 * XXX Maybe we don't even need that field? What is a current batch
+		 * anyway? There seem to be at least multiple concepts of "current"
+		 * batch, one for the read stream, another for executor ...
+		 */
+		// scan->xs_batches->currentBatch = res;
+
+		/*
+		 * We may do a new scan, depending on what _bt_start_prim_scan says.
+		 * In that case we need to start from scratch, not from the position
+		 * of the last batch. In regular non-batched scans we have currPos,
+		 * because we have just one leaf page for the whole scan, and we
+		 * invalidate it before loading the next one. But with batching that
+		 * doesn't work - we have many leafs, it's not clear which one is
+		 * 'current' (well, it's the last), and we can't invalidate it,
+		 * that's up to amfreebatch(). For now we deduce the position and
+		 * reset it to NULL, to indicate the same thing.
+		 *
+		 * XXX Maybe we should have something like 'currentBatch'? But then
+		 * that probably should be in BTScanOpaque, not in the generic
+		 * indexam.c part? Or it it a sufficiently generic thing? How would
+		 * we keep it in sync with the batch queue? If freeing batches is
+		 * up to indexam, how do we ensure the currentBatch does not point
+		 * to already removed batch?
+		 */
+		pos = NULL;
+
+		/* ... otherwise see if we need another primitive index scan */
+	} while (so->numArrayKeys && _bt_start_prim_scan(scan, dir));
+
+	return res;
+}
+
+/*
+ *	btgetbatch() -- Get the next batch of tuples in the scan.
+ *
+ * XXX Pretty much like btgettuple(), but for batches of tuples.
+ */
+void
+btfreebatch(IndexScanDesc scan, IndexScanBatch batch)
+{
+	BTScanOpaque so PG_USED_FOR_ASSERTS_ONLY = (BTScanOpaque) scan->opaque;
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	/*
+	 * Check to see if we should kill tuples from the previous batch.
+	 */
+	_bt_kill_batch(scan, batch);
+
+	/* free all the stuff that might be allocated */
+
+	if (batch->items)
+		pfree(batch->items);
+
+	if (batch->itups)
+		pfree(batch->itups);
+
+	if (batch->htups)
+		pfree(batch->htups);
+
+	if (batch->recheck)
+		pfree(batch->recheck);
+
+	if (batch->privateData)
+		pfree(batch->privateData);
+
+	if (batch->orderbyvals)
+		pfree(batch->orderbyvals);
+
+	if (batch->orderbynulls)
+		pfree(batch->orderbynulls);
+
+	if (batch->currTuples)
+		pfree(batch->currTuples);
+
+	if (batch->opaque)
+	{
+		BTBatchScanPos pos = (BTBatchScanPos) batch->opaque;
+
+		BTBatchScanPosIsValid(*pos);
+		BTBatchScanPosIsPinned(*pos);
+
+		BTBatchScanPosUnpinIfPinned(*pos);
+
+		pfree(batch->opaque);
+	}
+
+	/* and finally free the batch itself */
+	pfree(batch);
+
+	return;
+}
+
 /*
  * btgetbitmap() -- gets all matching tuples, and adds them to a bitmap
  */
@@ -376,6 +530,10 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
 
 /*
  *	btrescan() -- rescan an index relation
+ *
+ * Batches should have been freed from indexam using btfreebatch() before we
+ * get here, but then some of the generic scan stuff needs to be reset here.
+ * But we shouldn't need to do anything particular here, I think.
  */
 void
 btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
@@ -400,6 +558,10 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 	BTScanPosUnpinIfPinned(so->markPos);
 	BTScanPosInvalidate(so->markPos);
 
+	/* FIXME should be in indexam.c I think */
+	// if (scan->xs_batches)
+	//	scan->xs_batches->currentBatch = NULL;
+
 	/*
 	 * Allocate tuple workspace arrays, if needed for an index-only scan and
 	 * not already done in a previous rescan call.  To save on palloc
@@ -433,6 +595,10 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 
 /*
  *	btendscan() -- close down a scan
+ *
+ * Batches should have been freed from indexam using btfreebatch() before we
+ * get here, but then some of the generic scan stuff needs to be reset here.
+ * But we shouldn't need to do anything particular here, I think.
  */
 void
 btendscan(IndexScanDesc scan)
@@ -469,12 +635,18 @@ btendscan(IndexScanDesc scan)
 
 /*
  *	btmarkpos() -- save current scan position
+ *
+ * With batching, all the interesting markpos() stuff happens in indexam.c. We
+ * should not even get here.
  */
 void
 btmarkpos(IndexScanDesc scan)
 {
 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
 
+	/* with batching, mark/restore is handled in indexam */
+	Assert(scan->xs_batches == NULL);
+
 	/* There may be an old mark with a pin (but no lock). */
 	BTScanPosUnpinIfPinned(so->markPos);
 
@@ -495,12 +667,18 @@ btmarkpos(IndexScanDesc scan)
 
 /*
  *	btrestrpos() -- restore scan to last saved position
+ *
+ * With batching, all the interesting restrpos() stuff happens in indexam.c. We
+ * should not even get here.
  */
 void
 btrestrpos(IndexScanDesc scan)
 {
 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
 
+	/* with batching, mark/restore is handled in indexam */
+	Assert(scan->xs_batches == NULL);
+
 	if (so->markItemIndex >= 0)
 	{
 		/*
@@ -900,6 +1078,147 @@ _bt_parallel_seize(IndexScanDesc scan, BlockNumber *next_scan_page,
 	return status;
 }
 
+/*
+ * _bt_parallel_seize() -- Begin the process of advancing the scan to a new
+ *		page.  Other scans must wait until we call _bt_parallel_release()
+ *		or _bt_parallel_done().
+ *
+ * The return value is true if we successfully seized the scan and false
+ * if we did not.  The latter case occurs when no pages remain, or when
+ * another primitive index scan is scheduled that caller's backend cannot
+ * start just yet (only backends that call from _bt_first are capable of
+ * starting primitive index scans, which they indicate by passing first=true).
+ *
+ * If the return value is true, *next_scan_page returns the next page of the
+ * scan, and *last_curr_page returns the page that *next_scan_page came from.
+ * An invalid *next_scan_page means the scan hasn't yet started, or that
+ * caller needs to start the next primitive index scan (if it's the latter
+ * case we'll set so.needPrimScan).
+ *
+ * Callers should ignore the value of *next_scan_page and *last_curr_page if
+ * the return value is false.
+ */
+bool
+_bt_parallel_seize_batch(IndexScanDesc scan, BTBatchScanPos pos,
+						 BlockNumber *next_scan_page,
+						 BlockNumber *last_curr_page, bool first)
+{
+	Relation	rel = scan->indexRelation;
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	bool		exit_loop = false,
+				status = true,
+				endscan = false;
+	ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+	BTParallelScanDesc btscan;
+
+	*next_scan_page = InvalidBlockNumber;
+	*last_curr_page = InvalidBlockNumber;
+
+	/*
+	 * Reset so->currPos, and initialize moreLeft/moreRight such that the next
+	 * call to _bt_readnextpage treats this backend similarly to a serial
+	 * backend that steps from *last_curr_page to *next_scan_page (unless this
+	 * backend's so->currPos is initialized by _bt_readfirstpage before then).
+	 */
+	BTScanPosInvalidate(so->currPos);
+	pos->moreLeft = pos->moreRight = true;
+
+	if (first)
+	{
+		/*
+		 * Initialize array related state when called from _bt_first, assuming
+		 * that this will be the first primitive index scan for the scan
+		 */
+		so->needPrimScan = false;
+		so->scanBehind = false;
+		so->oppositeDirCheck = false;
+	}
+	else
+	{
+		/*
+		 * Don't attempt to seize the scan when it requires another primitive
+		 * index scan, since caller's backend cannot start it right now
+		 */
+		if (so->needPrimScan)
+			return false;
+	}
+
+	btscan = (BTParallelScanDesc) OffsetToPointer(parallel_scan,
+												  parallel_scan->ps_offset_am);
+
+	while (1)
+	{
+		LWLockAcquire(&btscan->btps_lock, LW_EXCLUSIVE);
+
+		if (btscan->btps_pageStatus == BTPARALLEL_DONE)
+		{
+			/* We're done with this parallel index scan */
+			status = false;
+		}
+		else if (btscan->btps_pageStatus == BTPARALLEL_IDLE &&
+				 btscan->btps_nextScanPage == P_NONE)
+		{
+			/* End this parallel index scan */
+			status = false;
+			endscan = true;
+		}
+		else if (btscan->btps_pageStatus == BTPARALLEL_NEED_PRIMSCAN)
+		{
+			Assert(so->numArrayKeys);
+
+			if (first)
+			{
+				/* Can start scheduled primitive scan right away, so do so */
+				btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+
+				/* Restore scan's array keys from serialized values */
+				_bt_parallel_restore_arrays(rel, btscan, so);
+				exit_loop = true;
+			}
+			else
+			{
+				/*
+				 * Don't attempt to seize the scan when it requires another
+				 * primitive index scan, since caller's backend cannot start
+				 * it right now
+				 */
+				status = false;
+			}
+
+			/*
+			 * Either way, update backend local state to indicate that a
+			 * pending primitive scan is required
+			 */
+			so->needPrimScan = true;
+			so->scanBehind = false;
+			so->oppositeDirCheck = false;
+		}
+		else if (btscan->btps_pageStatus != BTPARALLEL_ADVANCING)
+		{
+			/*
+			 * We have successfully seized control of the scan for the purpose
+			 * of advancing it to a new page!
+			 */
+			btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+			Assert(btscan->btps_nextScanPage != P_NONE);
+			*next_scan_page = btscan->btps_nextScanPage;
+			*last_curr_page = btscan->btps_lastCurrPage;
+			exit_loop = true;
+		}
+		LWLockRelease(&btscan->btps_lock);
+		if (exit_loop || !status)
+			break;
+		ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE);
+	}
+	ConditionVariableCancelSleep();
+
+	/* When the scan has reached the rightmost (or leftmost) page, end it */
+	if (endscan)
+		_bt_parallel_done(scan);
+
+	return status;
+}
+
 /*
  * _bt_parallel_release() -- Complete the process of advancing the scan to a
  *		new page.  We now have the new value btps_nextScanPage; another backend
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 77264ddeecb..10b28a76c0f 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -24,8 +24,20 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 
+/*
+ * XXX A lot of the new functions are copies of the non-batching version, with
+ * changes to make it work with batching (which means with position provided
+ * by the caller, not from the BTScanOpaque). The duplication is not great,
+ * but it's a bit unclear what to do about it. One option would be to remove
+ * the amgettuple() interface altogether, once the batching API works, but we
+ * may also choose to keep both (e.g. for cases that don't support batching,
+ * like scans of catalogs). In that case we'd need to do some refactoring to
+ * share as much code as possible.
+ */
 
 static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
+
+/* static void _bt_drop_lock_and_maybe_pin_batch(IndexScanDesc scan, BTBatchScanPos sp); */
 static Buffer _bt_moveright(Relation rel, Relation heaprel, BTScanInsert key,
 							Buffer buf, bool forupdate, BTStack stack,
 							int access);
@@ -34,24 +46,44 @@ static int	_bt_binsrch_posting(BTScanInsert key, Page page,
 								OffsetNumber offnum);
 static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
 						 OffsetNumber offnum, bool firstpage);
+static IndexScanBatch _bt_readpage_batch(IndexScanDesc scan, BTBatchScanPos pos,
+										 ScanDirection dir, OffsetNumber offnum,
+										 bool firstPage);
 static void _bt_saveitem(BTScanOpaque so, int itemIndex,
 						 OffsetNumber offnum, IndexTuple itup);
+static void _bt_saveitem_batch(IndexScanBatch batch, int itemIndex,
+							   OffsetNumber offnum, IndexTuple itup);
 static int	_bt_setuppostingitems(BTScanOpaque so, int itemIndex,
 								  OffsetNumber offnum, ItemPointer heapTid,
 								  IndexTuple itup);
+static int	_bt_setuppostingitems_batch(IndexScanBatch batch, int itemIndex,
+										OffsetNumber offnum, ItemPointer heapTid,
+										IndexTuple itup);
 static inline void _bt_savepostingitem(BTScanOpaque so, int itemIndex,
 									   OffsetNumber offnum,
 									   ItemPointer heapTid, int tupleOffset);
+static inline void _bt_savepostingitem_batch(IndexScanBatch batch, int itemIndex,
+											 OffsetNumber offnum,
+											 ItemPointer heapTid, int tupleOffset);
 static inline void _bt_returnitem(IndexScanDesc scan, BTScanOpaque so);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
+static IndexScanBatch _bt_steppage_batch(IndexScanDesc scan, BTBatchScanPos pos,
+										 ScanDirection dir);
 static bool _bt_readfirstpage(IndexScanDesc scan, OffsetNumber offnum,
 							  ScanDirection dir);
+static IndexScanBatch _bt_readfirstpage_batch(IndexScanDesc scan, BTBatchScanPos pos,
+											  OffsetNumber offnum,
+											  ScanDirection dir);
 static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno,
 							 BlockNumber lastcurrblkno, ScanDirection dir,
 							 bool seized);
+static IndexScanBatch _bt_readnextpage_batch(IndexScanDesc scan, BTBatchScanPos pos,
+											 BlockNumber blkno, BlockNumber lastcurrblkno,
+											 ScanDirection dir, bool seized);
 static Buffer _bt_lock_and_validate_left(Relation rel, BlockNumber *blkno,
 										 BlockNumber lastcurrblkno);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
+static IndexScanBatch _bt_endpoint_batch(IndexScanDesc scan, ScanDirection dir);
 
 
 /*
@@ -77,6 +109,20 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
 	}
 }
 
+/* static void */
+/* _bt_drop_lock_and_maybe_pin_batch(IndexScanDesc scan, BTBatchScanPos sp) */
+/* { */
+/* 	_bt_unlockbuf(scan->indexRelation, sp->buf); */
+/*  */
+/* /	if (IsMVCCSnapshot(scan->xs_snapshot) && */
+/* 		RelationNeedsWAL(scan->indexRelation) && */
+/* 		!scan->xs_want_itup) */
+/* 	{ */
+/* 		ReleaseBuffer(sp->buf); */
+/* 		sp->buf = InvalidBuffer; */
+/* 	} */
+/* } */
+
 /*
  *	_bt_search() -- Search the tree for a particular scankey,
  *		or more precisely for the first leaf page it could be on.
@@ -1570,136 +1616,1344 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
 }
 
 /*
- *	_bt_readpage() -- Load data from current index page into so->currPos
+ *	_bt_first_batch() -- Load the first batch in a scan.
  *
- * Caller must have pinned and read-locked so->currPos.buf; the buffer's state
- * is not changed here.  Also, currPos.moreLeft and moreRight must be valid;
- * they are updated as appropriate.  All other fields of so->currPos are
- * initialized from scratch here.
+ * A batch variant of _bt_first(). Most of the comments for that function
+ * apply here too.
  *
- * We scan the current page starting at offnum and moving in the indicated
- * direction.  All items matching the scan keys are loaded into currPos.items.
- * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
- * that there can be no more matching tuples in the current scan direction
- * (could just be for the current primitive index scan when scan has arrays).
+ * XXX This only populates the batch, it does not set any other fields like
+ * scan->xs_heaptid or scan->xs_itup. That happens in getnext_tid() calls.
  *
- * In the case of a parallel scan, caller must have called _bt_parallel_seize
- * prior to calling this function; this function will invoke
- * _bt_parallel_release before returning.
+ * XXX I'm not sure it works to mix batched and non-batches calls, e.g. get
+ * a TID and then a batch of TIDs. It probably should work as long as we
+ * update itemIndex correctly, but we need to be careful about killed items
+ * (right now the two places use different ways to communicate which items
+ * should be killed).
  *
- * Returns true if any matching items found on the page, false if none.
+ * XXX We probably should not rely on _bt_first/_bt_steppage, because that
+ * very much relies on currPos, and it's just laziness to rely on that. For
+ * batching we probably need something else anyway.
  */
-static bool
-_bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
-			 bool firstpage)
+IndexScanBatch
+_bt_first_batch(IndexScanDesc scan, ScanDirection dir)
 {
 	Relation	rel = scan->indexRelation;
 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
-	Page		page;
-	BTPageOpaque opaque;
-	OffsetNumber minoff;
-	OffsetNumber maxoff;
-	BTReadPageState pstate;
-	bool		arrayKeys;
-	int			itemIndex,
-				indnatts;
+	BTStack		stack;
+	OffsetNumber offnum;
+	BTScanInsertData inskey;
+	ScanKey		startKeys[INDEX_MAX_KEYS];
+	ScanKeyData notnullkeys[INDEX_MAX_KEYS];
+	int			keysz = 0;
+	StrategyNumber strat_total;
+	BlockNumber blkno = InvalidBlockNumber,
+				lastcurrblkno;
+	BTBatchScanPosData pos;
 
-	/* save the page/buffer block number, along with its sibling links */
-	page = BufferGetPage(so->currPos.buf);
-	opaque = BTPageGetOpaque(page);
-	so->currPos.currPage = BufferGetBlockNumber(so->currPos.buf);
-	so->currPos.prevPage = opaque->btpo_prev;
-	so->currPos.nextPage = opaque->btpo_next;
+	BTBatchScanPosInvalidate(pos);
 
-	Assert(!P_IGNORE(opaque));
-	Assert(BTScanPosIsPinned(so->currPos));
-	Assert(!so->needPrimScan);
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
 
-	if (scan->parallel_scan)
-	{
-		/* allow next/prev page to be read by other worker without delay */
-		if (ScanDirectionIsForward(dir))
-			_bt_parallel_release(scan, so->currPos.nextPage,
-								 so->currPos.currPage);
-		else
-			_bt_parallel_release(scan, so->currPos.prevPage,
-								 so->currPos.currPage);
-	}
+	/* FIXME maybe check there's no active batch yet */
+	/* Assert(!BTScanPosIsValid(so->currPos)); */
 
-	/* initialize remaining currPos fields related to current page */
-	so->currPos.lsn = BufferGetLSNAtomic(so->currPos.buf);
-	so->currPos.dir = dir;
-	so->currPos.nextTupleOffset = 0;
-	/* either moreLeft or moreRight should be set now (may be unset later) */
-	Assert(ScanDirectionIsForward(dir) ? so->currPos.moreRight :
-		   so->currPos.moreLeft);
+	/*
+	 * Examine the scan keys and eliminate any redundant keys; also mark the
+	 * keys that must be matched to continue the scan.
+	 */
+	_bt_preprocess_keys(scan);
 
-	PredicateLockPage(rel, so->currPos.currPage, scan->xs_snapshot);
+	/*
+	 * Quit now if _bt_preprocess_keys() discovered that the scan keys can
+	 * never be satisfied (eg, x == 1 AND x > 2).
+	 */
+	if (!so->qual_ok)
+	{
+		Assert(!so->needPrimScan);
+		_bt_parallel_done(scan);
+		return false;
+	}
 
-	/* initialize local variables */
-	indnatts = IndexRelationGetNumberOfAttributes(rel);
-	arrayKeys = so->numArrayKeys != 0;
-	minoff = P_FIRSTDATAKEY(opaque);
-	maxoff = PageGetMaxOffsetNumber(page);
+	/*
+	 * If this is a parallel scan, we must seize the scan.  _bt_readfirstpage
+	 * will likely release the parallel scan later on.
+	 */
+	if (scan->parallel_scan != NULL &&
+		!_bt_parallel_seize_batch(scan, &pos, &blkno, &lastcurrblkno, true))
+		return false;
 
-	/* initialize page-level state that we'll pass to _bt_checkkeys */
-	pstate.minoff = minoff;
-	pstate.maxoff = maxoff;
-	pstate.finaltup = NULL;
-	pstate.page = page;
-	pstate.firstpage = firstpage;
-	pstate.forcenonrequired = false;
-	pstate.startikey = 0;
-	pstate.offnum = InvalidOffsetNumber;
-	pstate.skip = InvalidOffsetNumber;
-	pstate.continuescan = true; /* default assumption */
-	pstate.rechecks = 0;
-	pstate.targetdistance = 0;
-	pstate.nskipadvances = 0;
+	/*
+	 * Initialize the scan's arrays (if any) for the current scan direction
+	 * (except when they were already set to later values as part of
+	 * scheduling the primitive index scan that is now underway)
+	 */
+	if (so->numArrayKeys && !so->needPrimScan)
+		_bt_start_array_keys(scan, dir);
 
-	if (ScanDirectionIsForward(dir))
+	if (blkno != InvalidBlockNumber)
 	{
-		/* SK_SEARCHARRAY forward scans must provide high key up front */
-		if (arrayKeys)
-		{
-			if (!P_RIGHTMOST(opaque))
-			{
-				ItemId		iid = PageGetItemId(page, P_HIKEY);
+		/*
+		 * We anticipated calling _bt_search, but another worker bet us to it.
+		 * _bt_readnextpage releases the scan for us (not _bt_readfirstpage).
+		 */
+		Assert(scan->parallel_scan != NULL);
+		Assert(!so->needPrimScan);
+		Assert(blkno != P_NONE);
 
-				pstate.finaltup = (IndexTuple) PageGetItem(page, iid);
+		return _bt_readnextpage_batch(scan, &pos, blkno, lastcurrblkno, dir, true);
+	}
 
-				if (so->scanBehind &&
-					!_bt_scanbehind_checkkeys(scan, dir, pstate.finaltup))
-				{
-					/* Schedule another primitive index scan after all */
-					so->currPos.moreRight = false;
-					so->needPrimScan = true;
-					if (scan->parallel_scan)
-						_bt_parallel_primscan_schedule(scan,
-													   so->currPos.currPage);
-					return false;
-				}
-			}
+	/*
+	 * Count an indexscan for stats, now that we know that we'll call
+	 * _bt_search/_bt_endpoint below
+	 */
+	pgstat_count_index_scan(rel);
+	if (scan->instrument)
+		scan->instrument->nsearches++;
 
-			so->scanBehind = so->oppositeDirCheck = false;	/* reset */
-		}
+	/*----------
+	 * Examine the scan keys to discover where we need to start the scan.
+	 *
+	 * We want to identify the keys that can be used as starting boundaries;
+	 * these are =, >, or >= keys for a forward scan or =, <, <= keys for
+	 * a backwards scan.  We can use keys for multiple attributes so long as
+	 * the prior attributes had only =, >= (resp. =, <=) keys.  Once we accept
+	 * a > or < boundary or find an attribute with no boundary (which can be
+	 * thought of as the same as "> -infinity"), we can't use keys for any
+	 * attributes to its right, because it would break our simplistic notion
+	 * of what initial positioning strategy to use.
+	 *
+	 * When the scan keys include cross-type operators, _bt_preprocess_keys
+	 * may not be able to eliminate redundant keys; in such cases we will
+	 * arbitrarily pick a usable one for each attribute.  This is correct
+	 * but possibly not optimal behavior.  (For example, with keys like
+	 * "x >= 4 AND x >= 5" we would elect to scan starting at x=4 when
+	 * x=5 would be more efficient.)  Since the situation only arises given
+	 * a poorly-worded query plus an incomplete opfamily, live with it.
+	 *
+	 * When both equality and inequality keys appear for a single attribute
+	 * (again, only possible when cross-type operators appear), we *must*
+	 * select one of the equality keys for the starting point, because
+	 * _bt_checkkeys() will stop the scan as soon as an equality qual fails.
+	 * For example, if we have keys like "x >= 4 AND x = 10" and we elect to
+	 * start at x=4, we will fail and stop before reaching x=10.  If multiple
+	 * equality quals survive preprocessing, however, it doesn't matter which
+	 * one we use --- by definition, they are either redundant or
+	 * contradictory.
+	 *
+	 * In practice we rarely see any "attribute boundary key gaps" here.
+	 * Preprocessing can usually backfill skip array keys for any attributes
+	 * that were omitted from the original scan->keyData[] input keys.  All
+	 * array keys are always considered = keys, but we'll sometimes need to
+	 * treat the current key value as if we were using an inequality strategy.
+	 * This happens with range skip arrays, which store inequality keys in the
+	 * array's low_compare/high_compare fields (used to find the first/last
+	 * set of matches, when = key will lack a usable sk_argument value).
+	 * These are always preferred over any redundant "standard" inequality
+	 * keys on the same column (per the usual rule about preferring = keys).
+	 * Note also that any column with an = skip array key can never have an
+	 * additional, contradictory = key.
+	 *
+	 * All keys (with the exception of SK_SEARCHNULL keys and SK_BT_SKIP
+	 * array keys whose array is "null_elem=true") imply a NOT NULL qualifier.
+	 * If the index stores nulls at the end of the index we'll be starting
+	 * from, and we have no boundary key for the column (which means the key
+	 * we deduced NOT NULL from is an inequality key that constrains the other
+	 * end of the index), then we cons up an explicit SK_SEARCHNOTNULL key to
+	 * use as a boundary key.  If we didn't do this, we might find ourselves
+	 * traversing a lot of null entries at the start of the scan.
+	 *
+	 * In this loop, row-comparison keys are treated the same as keys on their
+	 * first (leftmost) columns.  We'll add on lower-order columns of the row
+	 * comparison below, if possible.
+	 *
+	 * The selected scan keys (at most one per index column) are remembered by
+	 * storing their addresses into the local startKeys[] array.
+	 *
+	 * _bt_checkkeys/_bt_advance_array_keys decide whether and when to start
+	 * the next primitive index scan (for scans with array keys) based in part
+	 * on an understanding of how it'll enable us to reposition the scan.
+	 * They're directly aware of how we'll sometimes cons up an explicit
+	 * SK_SEARCHNOTNULL key.  They'll even end primitive scans by applying a
+	 * symmetric "deduce NOT NULL" rule of their own.  This allows top-level
+	 * scans to skip large groups of NULLs through repeated deductions about
+	 * key strictness (for a required inequality key) and whether NULLs in the
+	 * key's index column are stored last or first (relative to non-NULLs).
+	 * If you update anything here, _bt_checkkeys/_bt_advance_array_keys might
+	 * need to be kept in sync.
+	 *----------
+	 */
+	strat_total = BTEqualStrategyNumber;
+	if (so->numberOfKeys > 0)
+	{
+		AttrNumber	curattr;
+		ScanKey		chosen;
+		ScanKey		impliesNN;
+		ScanKey		cur;
 
 		/*
-		 * Consider pstate.startikey optimization once the ongoing primitive
-		 * index scan has already read at least one page
+		 * chosen is the so-far-chosen key for the current attribute, if any.
+		 * We don't cast the decision in stone until we reach keys for the
+		 * next attribute.
 		 */
-		if (!pstate.firstpage && minoff < maxoff)
-			_bt_set_startikey(scan, &pstate);
-
-		/* load items[] in ascending order */
-		itemIndex = 0;
-
-		offnum = Max(offnum, minoff);
+		cur = so->keyData;
+		curattr = 1;
+		chosen = NULL;
+		/* Also remember any scankey that implies a NOT NULL constraint */
+		impliesNN = NULL;
 
-		while (offnum <= maxoff)
+		/*
+		 * Loop iterates from 0 to numberOfKeys inclusive; we use the last
+		 * pass to handle after-last-key processing.  Actual exit from the
+		 * loop is at one of the "break" statements below.
+		 */
+		for (int i = 0;; cur++, i++)
 		{
-			ItemId		iid = PageGetItemId(page, offnum);
-			IndexTuple	itup;
+			if (i >= so->numberOfKeys || cur->sk_attno != curattr)
+			{
+				/*
+				 * Done looking at keys for curattr.
+				 *
+				 * If this is a scan key for a skip array whose current
+				 * element is MINVAL, choose low_compare (when scanning
+				 * backwards it'll be MAXVAL, and we'll choose high_compare).
+				 *
+				 * Note: if the array's low_compare key makes 'chosen' NULL,
+				 * then we behave as if the array's first element is -inf,
+				 * except when !array->null_elem implies a usable NOT NULL
+				 * constraint.
+				 */
+				if (chosen != NULL &&
+					(chosen->sk_flags & (SK_BT_MINVAL | SK_BT_MAXVAL)))
+				{
+					int			ikey = chosen - so->keyData;
+					ScanKey		skipequalitykey = chosen;
+					BTArrayKeyInfo *array = NULL;
+
+					for (int arridx = 0; arridx < so->numArrayKeys; arridx++)
+					{
+						array = &so->arrayKeys[arridx];
+						if (array->scan_key == ikey)
+							break;
+					}
+
+					if (ScanDirectionIsForward(dir))
+					{
+						Assert(!(skipequalitykey->sk_flags & SK_BT_MAXVAL));
+						chosen = array->low_compare;
+					}
+					else
+					{
+						Assert(!(skipequalitykey->sk_flags & SK_BT_MINVAL));
+						chosen = array->high_compare;
+					}
+
+					Assert(chosen == NULL ||
+						   chosen->sk_attno == skipequalitykey->sk_attno);
+
+					if (!array->null_elem)
+						impliesNN = skipequalitykey;
+					else
+						Assert(chosen == NULL && impliesNN == NULL);
+				}
+
+				/*
+				 * If we didn't find a usable boundary key, see if we can
+				 * deduce a NOT NULL key
+				 */
+				if (chosen == NULL && impliesNN != NULL &&
+					((impliesNN->sk_flags & SK_BT_NULLS_FIRST) ?
+					 ScanDirectionIsForward(dir) :
+					 ScanDirectionIsBackward(dir)))
+				{
+					/* Yes, so build the key in notnullkeys[keysz] */
+					chosen = &notnullkeys[keysz];
+					ScanKeyEntryInitialize(chosen,
+										   (SK_SEARCHNOTNULL | SK_ISNULL |
+											(impliesNN->sk_flags &
+											 (SK_BT_DESC | SK_BT_NULLS_FIRST))),
+										   curattr,
+										   ((impliesNN->sk_flags & SK_BT_NULLS_FIRST) ?
+											BTGreaterStrategyNumber :
+											BTLessStrategyNumber),
+										   InvalidOid,
+										   InvalidOid,
+										   InvalidOid,
+										   (Datum) 0);
+				}
+
+				/*
+				 * If we still didn't find a usable boundary key, quit; else
+				 * save the boundary key pointer in startKeys.
+				 */
+				if (chosen == NULL)
+					break;
+				startKeys[keysz++] = chosen;
+
+				/*
+				 * We can only consider adding more boundary keys when the one
+				 * that we just chose to add uses either the = or >= strategy
+				 * (during backwards scans we can only do so when the key that
+				 * we just added to startKeys[] uses the = or <= strategy)
+				 */
+				strat_total = chosen->sk_strategy;
+				if (strat_total == BTGreaterStrategyNumber ||
+					strat_total == BTLessStrategyNumber)
+					break;
+
+				/*
+				 * If the key that we just added to startKeys[] is a skip
+				 * array = key whose current element is marked NEXT or PRIOR,
+				 * make strat_total > or < (and stop adding boundary keys).
+				 * This can only happen with opclasses that lack skip support.
+				 */
+				if (chosen->sk_flags & (SK_BT_NEXT | SK_BT_PRIOR))
+				{
+					Assert(chosen->sk_flags & SK_BT_SKIP);
+					Assert(strat_total == BTEqualStrategyNumber);
+
+					if (ScanDirectionIsForward(dir))
+					{
+						Assert(!(chosen->sk_flags & SK_BT_PRIOR));
+						strat_total = BTGreaterStrategyNumber;
+					}
+					else
+					{
+						Assert(!(chosen->sk_flags & SK_BT_NEXT));
+						strat_total = BTLessStrategyNumber;
+					}
+
+					/*
+					 * We're done.  We'll never find an exact = match for a
+					 * NEXT or PRIOR sentinel sk_argument value.  There's no
+					 * sense in trying to add more keys to startKeys[].
+					 */
+					break;
+				}
+
+				/*
+				 * Done if that was the last scan key output by preprocessing.
+				 * Also done if there is a gap index attribute that lacks a
+				 * usable key (only possible when preprocessing was unable to
+				 * generate a skip array key to "fill in the gap").
+				 */
+				if (i >= so->numberOfKeys ||
+					cur->sk_attno != curattr + 1)
+					break;
+
+				/*
+				 * Reset for next attr.
+				 */
+				curattr = cur->sk_attno;
+				chosen = NULL;
+				impliesNN = NULL;
+			}
+
+			/*
+			 * Can we use this key as a starting boundary for this attr?
+			 *
+			 * If not, does it imply a NOT NULL constraint?  (Because
+			 * SK_SEARCHNULL keys are always assigned BTEqualStrategyNumber,
+			 * *any* inequality key works for that; we need not test.)
+			 */
+			switch (cur->sk_strategy)
+			{
+				case BTLessStrategyNumber:
+				case BTLessEqualStrategyNumber:
+					if (chosen == NULL)
+					{
+						if (ScanDirectionIsBackward(dir))
+							chosen = cur;
+						else
+							impliesNN = cur;
+					}
+					break;
+				case BTEqualStrategyNumber:
+					/* override any non-equality choice */
+					chosen = cur;
+					break;
+				case BTGreaterEqualStrategyNumber:
+				case BTGreaterStrategyNumber:
+					if (chosen == NULL)
+					{
+						if (ScanDirectionIsForward(dir))
+							chosen = cur;
+						else
+							impliesNN = cur;
+					}
+					break;
+			}
+		}
+	}
+
+	/*
+	 * If we found no usable boundary keys, we have to start from one end of
+	 * the tree.  Walk down that edge to the first or last key, and scan from
+	 * there.
+	 *
+	 * Note: calls _bt_readfirstpage for us, which releases the parallel scan.
+	 */
+	if (keysz == 0)
+		return _bt_endpoint_batch(scan, dir);
+
+	/*
+	 * We want to start the scan somewhere within the index.  Set up an
+	 * insertion scankey we can use to search for the boundary point we
+	 * identified above.  The insertion scankey is built using the keys
+	 * identified by startKeys[].  (Remaining insertion scankey fields are
+	 * initialized after initial-positioning scan keys are finalized.)
+	 */
+	Assert(keysz <= INDEX_MAX_KEYS);
+	for (int i = 0; i < keysz; i++)
+	{
+		ScanKey		cur = startKeys[i];
+
+		Assert(cur->sk_attno == i + 1);
+
+		if (cur->sk_flags & SK_ROW_HEADER)
+		{
+			/*
+			 * Row comparison header: look to the first row member instead
+			 */
+			ScanKey		subkey = (ScanKey) DatumGetPointer(cur->sk_argument);
+
+			/*
+			 * Cannot be a NULL in the first row member: _bt_preprocess_keys
+			 * would've marked the qual as unsatisfiable, preventing us from
+			 * ever getting this far
+			 */
+			Assert(subkey->sk_flags & SK_ROW_MEMBER);
+			Assert(subkey->sk_attno == cur->sk_attno);
+			Assert(!(subkey->sk_flags & SK_ISNULL));
+
+			/*
+			 * The member scankeys are already in insertion format (ie, they
+			 * have sk_func = 3-way-comparison function)
+			 */
+			memcpy(inskey.scankeys + i, subkey, sizeof(ScanKeyData));
+
+			/*
+			 * If the row comparison is the last positioning key we accepted,
+			 * try to add additional keys from the lower-order row members.
+			 * (If we accepted independent conditions on additional index
+			 * columns, we use those instead --- doesn't seem worth trying to
+			 * determine which is more restrictive.)  Note that this is OK
+			 * even if the row comparison is of ">" or "<" type, because the
+			 * condition applied to all but the last row member is effectively
+			 * ">=" or "<=", and so the extra keys don't break the positioning
+			 * scheme.  But, by the same token, if we aren't able to use all
+			 * the row members, then the part of the row comparison that we
+			 * did use has to be treated as just a ">=" or "<=" condition, and
+			 * so we'd better adjust strat_total accordingly.
+			 */
+			if (i == keysz - 1)
+			{
+				bool		used_all_subkeys = false;
+
+				Assert(!(subkey->sk_flags & SK_ROW_END));
+				for (;;)
+				{
+					subkey++;
+					Assert(subkey->sk_flags & SK_ROW_MEMBER);
+					if (subkey->sk_attno != keysz + 1)
+						break;	/* out-of-sequence, can't use it */
+					if (subkey->sk_strategy != cur->sk_strategy)
+						break;	/* wrong direction, can't use it */
+					if (subkey->sk_flags & SK_ISNULL)
+						break;	/* can't use null keys */
+					Assert(keysz < INDEX_MAX_KEYS);
+					memcpy(inskey.scankeys + keysz, subkey,
+						   sizeof(ScanKeyData));
+					keysz++;
+					if (subkey->sk_flags & SK_ROW_END)
+					{
+						used_all_subkeys = true;
+						break;
+					}
+				}
+				if (!used_all_subkeys)
+				{
+					switch (strat_total)
+					{
+						case BTLessStrategyNumber:
+							strat_total = BTLessEqualStrategyNumber;
+							break;
+						case BTGreaterStrategyNumber:
+							strat_total = BTGreaterEqualStrategyNumber;
+							break;
+					}
+				}
+				break;			/* done with outer loop */
+			}
+		}
+		else
+		{
+			/*
+			 * Ordinary comparison key.  Transform the search-style scan key
+			 * to an insertion scan key by replacing the sk_func with the
+			 * appropriate btree comparison function.
+			 *
+			 * If scankey operator is not a cross-type comparison, we can use
+			 * the cached comparison function; otherwise gotta look it up in
+			 * the catalogs.  (That can't lead to infinite recursion, since no
+			 * indexscan initiated by syscache lookup will use cross-data-type
+			 * operators.)
+			 *
+			 * We support the convention that sk_subtype == InvalidOid means
+			 * the opclass input type; this is a hack to simplify life for
+			 * ScanKeyInit().
+			 */
+			if (cur->sk_subtype == rel->rd_opcintype[i] ||
+				cur->sk_subtype == InvalidOid)
+			{
+				FmgrInfo   *procinfo;
+
+				procinfo = index_getprocinfo(rel, cur->sk_attno, BTORDER_PROC);
+				ScanKeyEntryInitializeWithInfo(inskey.scankeys + i,
+											   cur->sk_flags,
+											   cur->sk_attno,
+											   InvalidStrategy,
+											   cur->sk_subtype,
+											   cur->sk_collation,
+											   procinfo,
+											   cur->sk_argument);
+			}
+			else
+			{
+				RegProcedure cmp_proc;
+
+				cmp_proc = get_opfamily_proc(rel->rd_opfamily[i],
+											 rel->rd_opcintype[i],
+											 cur->sk_subtype,
+											 BTORDER_PROC);
+				if (!RegProcedureIsValid(cmp_proc))
+					elog(ERROR, "missing support function %d(%u,%u) for attribute %d of index \"%s\"",
+						 BTORDER_PROC, rel->rd_opcintype[i], cur->sk_subtype,
+						 cur->sk_attno, RelationGetRelationName(rel));
+				ScanKeyEntryInitialize(inskey.scankeys + i,
+									   cur->sk_flags,
+									   cur->sk_attno,
+									   InvalidStrategy,
+									   cur->sk_subtype,
+									   cur->sk_collation,
+									   cmp_proc,
+									   cur->sk_argument);
+			}
+		}
+	}
+
+	/*----------
+	 * Examine the selected initial-positioning strategy to determine exactly
+	 * where we need to start the scan, and set flag variables to control the
+	 * initial descent by _bt_search (and our _bt_binsrch call for the leaf
+	 * page _bt_search returns).
+	 *----------
+	 */
+	_bt_metaversion(rel, &inskey.heapkeyspace, &inskey.allequalimage);
+	inskey.anynullkeys = false; /* unused */
+	inskey.scantid = NULL;
+	inskey.keysz = keysz;
+	switch (strat_total)
+	{
+		case BTLessStrategyNumber:
+
+			inskey.nextkey = false;
+			inskey.backward = true;
+			break;
+
+		case BTLessEqualStrategyNumber:
+
+			inskey.nextkey = true;
+			inskey.backward = true;
+			break;
+
+		case BTEqualStrategyNumber:
+
+			/*
+			 * If a backward scan was specified, need to start with last equal
+			 * item not first one.
+			 */
+			if (ScanDirectionIsBackward(dir))
+			{
+				/*
+				 * This is the same as the <= strategy
+				 */
+				inskey.nextkey = true;
+				inskey.backward = true;
+			}
+			else
+			{
+				/*
+				 * This is the same as the >= strategy
+				 */
+				inskey.nextkey = false;
+				inskey.backward = false;
+			}
+			break;
+
+		case BTGreaterEqualStrategyNumber:
+
+			/*
+			 * Find first item >= scankey
+			 */
+			inskey.nextkey = false;
+			inskey.backward = false;
+			break;
+
+		case BTGreaterStrategyNumber:
+
+			/*
+			 * Find first item > scankey
+			 */
+			inskey.nextkey = true;
+			inskey.backward = false;
+			break;
+
+		default:
+			/* can't get here, but keep compiler quiet */
+			elog(ERROR, "unrecognized strat_total: %d", (int) strat_total);
+			return false;
+	}
+
+	/*
+	 * Use the manufactured insertion scan key to descend the tree and
+	 * position ourselves on the target leaf page.
+	 */
+	Assert(ScanDirectionIsBackward(dir) == inskey.backward);
+	stack = _bt_search(rel, NULL, &inskey, &pos.buf, BT_READ);
+
+	/* don't need to keep the stack around... */
+	_bt_freestack(stack);
+
+	if (!BufferIsValid(pos.buf))
+	{
+		/*
+		 * We only get here if the index is completely empty. Lock relation
+		 * because nothing finer to lock exists.  Without a buffer lock, it's
+		 * possible for another transaction to insert data between
+		 * _bt_search() and PredicateLockRelation().  We have to try again
+		 * after taking the relation-level predicate lock, to close a narrow
+		 * window where we wouldn't scan concurrently inserted tuples, but the
+		 * writer wouldn't see our predicate lock.
+		 */
+		if (IsolationIsSerializable())
+		{
+			PredicateLockRelation(rel, scan->xs_snapshot);
+			stack = _bt_search(rel, NULL, &inskey, &pos.buf, BT_READ);
+			_bt_freestack(stack);
+		}
+
+		if (!BufferIsValid(pos.buf))
+		{
+			Assert(!so->needPrimScan);
+			_bt_parallel_done(scan);
+			return false;
+		}
+	}
+
+	/* position to the precise item on the page */
+	offnum = _bt_binsrch(rel, &inskey, pos.buf);
+
+	/*
+	 * Now load data from the first page of the scan (usually the page
+	 * currently in so->currPos.buf).
+	 *
+	 * If inskey.nextkey = false and inskey.backward = false, offnum is
+	 * positioned at the first non-pivot tuple >= inskey.scankeys.
+	 *
+	 * If inskey.nextkey = false and inskey.backward = true, offnum is
+	 * positioned at the last non-pivot tuple < inskey.scankeys.
+	 *
+	 * If inskey.nextkey = true and inskey.backward = false, offnum is
+	 * positioned at the first non-pivot tuple > inskey.scankeys.
+	 *
+	 * If inskey.nextkey = true and inskey.backward = true, offnum is
+	 * positioned at the last non-pivot tuple <= inskey.scankeys.
+	 *
+	 * It's possible that _bt_binsrch returned an offnum that is out of bounds
+	 * for the page.  For example, when inskey is both < the leaf page's high
+	 * key and > all of its non-pivot tuples, offnum will be "maxoff + 1".
+	 */
+	return _bt_readfirstpage_batch(scan, &pos, offnum, dir);
+}
+
+/*
+ *	_bt_next_batch() -- Get the next batch of items in a scan.
+ *
+ * A batch variant of _bt_next(). Most of the comments for that function
+ * apply here too.
+ *
+ * We should only get here only when the current batch has no more items
+ * in the given direction. We don't get here with empty batches, that's
+ * handled by _bt_fist_batch().
+ *
+ * XXX See also the comments at _bt_first_batch() about returning a single
+ * batch for the page, etc.
+ */
+IndexScanBatch
+_bt_next_batch(IndexScanDesc scan, BTBatchScanPos pos, ScanDirection dir)
+{
+	BTScanOpaque so PG_USED_FOR_ASSERTS_ONLY = (BTScanOpaque) scan->opaque;
+	// BTBatchScanPos pos;
+	BTBatchScanPosData tmp;
+	// IndexScanBatch	batch;
+	// int 			idx;
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	/*
+	 * restore the BTScanOpaque from the current batch
+	 *
+	 * XXX This is pretty ugly/expensive. Ideally we'd have all the fields
+	 * needed to determine "location" in the index (essentially BTScanPosData)
+	 * in the batch, without cloning all the other stuff.
+	 */
+	// Assert(scan->xs_batches->currentBatch != NULL);
+
+	/*
+	 * Use the last batch as the "current" batch. We use the streamPos if
+	 * initialized, or the readPos as a fallback. Alternatively, we could
+	 * simply use the last batch in the queue, i.e. (nextBatch - 1).
+	 *
+	 * Even better, we could pass the "correct" batch from indexam.c, and
+	 * let that figure out which position to move from.
+	 */
+/*
+	idx = scan->xs_batches->streamPos.batch;
+	if (idx == -1)
+		idx = scan->xs_batches->readPos.batch;
+
+	batch = INDEX_SCAN_BATCH(scan, idx);
+	Assert(batch != NULL);
+	pos = (BTBatchScanPos) batch->opaque;
+*/
+
+	Assert(BTBatchScanPosIsPinned(*pos));
+
+	memcpy(&tmp, pos, sizeof(tmp));
+
+	/*
+	 * Advance to next page, load the data into the index batch.
+	 *
+	 * FIXME It may not be quite correct to just pass the position from
+	 * current batch, some of the functions scribble over it (e.g.
+	 * _bt_readpage_batch). Maybe we should create a copy, or something?
+	 *
+	 * XXX For now we pass a local copy "tmp".
+	 */
+	return _bt_steppage_batch(scan, &tmp, dir);
+}
+
+/*
+ *	_bt_kill_batch() -- remember the items-to-be-killed from the current batch
+ *
+ * We simply translate the bitmap into the "regular" killedItems array, and let
+ * that to drive which items are killed.
+ */
+void
+_bt_kill_batch(IndexScanDesc scan, IndexScanBatch batch)
+{
+	BTScanOpaque so PG_USED_FOR_ASSERTS_ONLY = (BTScanOpaque) scan->opaque;
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	/* we should only get here for scans with batching */
+	Assert(scan->xs_batches);
+
+	/* bail out if the batch has no killed items */
+	if (batch->numKilled == 0)
+		return;
+
+	/*
+	 * XXX Now what? we don't have the currPos around anymore, so we should
+	 * load that, and apply the killed items to that, somehow?
+	 */
+	/* FIXME: _bt_kill_batch not implemented */
+
+	/*
+	 * XXX maybe we should have a separate callback for this, and call it from
+	 * the indexam.c directly whenever we think it's appropriate? And not only
+	 * from here when freeing the batch?
+	 */
+	_bt_killitems_batch(scan, batch);
+}
+
+/*
+ *	_bt_readpage() -- Load data from current index page into so->currPos
+ *
+ * Caller must have pinned and read-locked so->currPos.buf; the buffer's state
+ * is not changed here.  Also, currPos.moreLeft and moreRight must be valid;
+ * they are updated as appropriate.  All other fields of so->currPos are
+ * initialized from scratch here.
+ *
+ * We scan the current page starting at offnum and moving in the indicated
+ * direction.  All items matching the scan keys are loaded into currPos.items.
+ * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
+ * that there can be no more matching tuples in the current scan direction
+ * (could just be for the current primitive index scan when scan has arrays).
+ *
+ * In the case of a parallel scan, caller must have called _bt_parallel_seize
+ * prior to calling this function; this function will invoke
+ * _bt_parallel_release before returning.
+ *
+ * Returns true if any matching items found on the page, false if none.
+ */
+static bool
+_bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
+			 bool firstpage)
+{
+	Relation	rel = scan->indexRelation;
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	Page		page;
+	BTPageOpaque opaque;
+	OffsetNumber minoff;
+	OffsetNumber maxoff;
+	BTReadPageState pstate;
+	bool		arrayKeys;
+	int			itemIndex,
+				indnatts;
+
+	/* save the page/buffer block number, along with its sibling links */
+	page = BufferGetPage(so->currPos.buf);
+	opaque = BTPageGetOpaque(page);
+	so->currPos.currPage = BufferGetBlockNumber(so->currPos.buf);
+	so->currPos.prevPage = opaque->btpo_prev;
+	so->currPos.nextPage = opaque->btpo_next;
+
+	Assert(!P_IGNORE(opaque));
+	Assert(BTScanPosIsPinned(so->currPos));
+	Assert(!so->needPrimScan);
+
+	if (scan->parallel_scan)
+	{
+		/* allow next/prev page to be read by other worker without delay */
+		if (ScanDirectionIsForward(dir))
+			_bt_parallel_release(scan, so->currPos.nextPage,
+								 so->currPos.currPage);
+		else
+			_bt_parallel_release(scan, so->currPos.prevPage,
+								 so->currPos.currPage);
+	}
+
+	/* initialize remaining currPos fields related to current page */
+	so->currPos.lsn = BufferGetLSNAtomic(so->currPos.buf);
+	so->currPos.dir = dir;
+	so->currPos.nextTupleOffset = 0;
+	/* either moreLeft or moreRight should be set now (may be unset later) */
+	Assert(ScanDirectionIsForward(dir) ? so->currPos.moreRight :
+		   so->currPos.moreLeft);
+
+	PredicateLockPage(rel, so->currPos.currPage, scan->xs_snapshot);
+
+	/* initialize local variables */
+	indnatts = IndexRelationGetNumberOfAttributes(rel);
+	arrayKeys = so->numArrayKeys != 0;
+	minoff = P_FIRSTDATAKEY(opaque);
+	maxoff = PageGetMaxOffsetNumber(page);
+
+	/* initialize page-level state that we'll pass to _bt_checkkeys */
+	pstate.minoff = minoff;
+	pstate.maxoff = maxoff;
+	pstate.finaltup = NULL;
+	pstate.page = page;
+	pstate.firstpage = firstpage;
+	pstate.forcenonrequired = false;
+	pstate.startikey = 0;
+	pstate.offnum = InvalidOffsetNumber;
+	pstate.skip = InvalidOffsetNumber;
+	pstate.continuescan = true; /* default assumption */
+	pstate.rechecks = 0;
+	pstate.targetdistance = 0;
+	pstate.nskipadvances = 0;
+
+	if (ScanDirectionIsForward(dir))
+	{
+		/* SK_SEARCHARRAY forward scans must provide high key up front */
+		if (arrayKeys)
+		{
+			if (!P_RIGHTMOST(opaque))
+			{
+				ItemId		iid = PageGetItemId(page, P_HIKEY);
+
+				pstate.finaltup = (IndexTuple) PageGetItem(page, iid);
+
+				if (so->scanBehind &&
+					!_bt_scanbehind_checkkeys(scan, dir, pstate.finaltup))
+				{
+					/* Schedule another primitive index scan after all */
+					so->currPos.moreRight = false;
+					so->needPrimScan = true;
+					if (scan->parallel_scan)
+						_bt_parallel_primscan_schedule(scan,
+													   so->currPos.currPage);
+					return false;
+				}
+			}
+
+			so->scanBehind = so->oppositeDirCheck = false;	/* reset */
+		}
+
+		/*
+		 * Consider pstate.startikey optimization once the ongoing primitive
+		 * index scan has already read at least one page
+		 */
+		if (!pstate.firstpage && minoff < maxoff)
+			_bt_set_startikey(scan, &pstate);
+
+		/* load items[] in ascending order */
+		itemIndex = 0;
+
+		offnum = Max(offnum, minoff);
+
+		while (offnum <= maxoff)
+		{
+			ItemId		iid = PageGetItemId(page, offnum);
+			IndexTuple	itup;
+			bool		passes_quals;
+
+			/*
+			 * If the scan specifies not to return killed tuples, then we
+			 * treat a killed tuple as not passing the qual
+			 */
+			if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
+			{
+				offnum = OffsetNumberNext(offnum);
+				continue;
+			}
+
+			itup = (IndexTuple) PageGetItem(page, iid);
+			Assert(!BTreeTupleIsPivot(itup));
+
+			pstate.offnum = offnum;
+			passes_quals = _bt_checkkeys(scan, &pstate, arrayKeys,
+										 itup, indnatts);
+
+			/*
+			 * Check if we need to skip ahead to a later tuple (only possible
+			 * when the scan uses array keys)
+			 */
+			if (arrayKeys && OffsetNumberIsValid(pstate.skip))
+			{
+				Assert(!passes_quals && pstate.continuescan);
+				Assert(offnum < pstate.skip);
+				Assert(!pstate.forcenonrequired);
+
+				offnum = pstate.skip;
+				pstate.skip = InvalidOffsetNumber;
+				continue;
+			}
+
+			if (passes_quals)
+			{
+				/* tuple passes all scan key conditions */
+				if (!BTreeTupleIsPosting(itup))
+				{
+					/* Remember it */
+					_bt_saveitem(so, itemIndex, offnum, itup);
+					itemIndex++;
+				}
+				else
+				{
+					int			tupleOffset;
+
+					/*
+					 * Set up state to return posting list, and remember first
+					 * TID
+					 */
+					tupleOffset =
+						_bt_setuppostingitems(so, itemIndex, offnum,
+											  BTreeTupleGetPostingN(itup, 0),
+											  itup);
+					itemIndex++;
+					/* Remember additional TIDs */
+					for (int i = 1; i < BTreeTupleGetNPosting(itup); i++)
+					{
+						_bt_savepostingitem(so, itemIndex, offnum,
+											BTreeTupleGetPostingN(itup, i),
+											tupleOffset);
+						itemIndex++;
+					}
+				}
+			}
+			/* When !continuescan, there can't be any more matches, so stop */
+			if (!pstate.continuescan)
+				break;
+
+			offnum = OffsetNumberNext(offnum);
+		}
+
+		/*
+		 * We don't need to visit page to the right when the high key
+		 * indicates that no more matches will be found there.
+		 *
+		 * Checking the high key like this works out more often than you might
+		 * think.  Leaf page splits pick a split point between the two most
+		 * dissimilar tuples (this is weighed against the need to evenly share
+		 * free space).  Leaf pages with high key attribute values that can
+		 * only appear on non-pivot tuples on the right sibling page are
+		 * common.
+		 */
+		if (pstate.continuescan && !so->scanBehind && !P_RIGHTMOST(opaque))
+		{
+			ItemId		iid = PageGetItemId(page, P_HIKEY);
+			IndexTuple	itup = (IndexTuple) PageGetItem(page, iid);
+			int			truncatt;
+
+			truncatt = BTreeTupleGetNAtts(itup, rel);
+			pstate.forcenonrequired = false;
+			pstate.startikey = 0;	/* _bt_set_startikey ignores P_HIKEY */
+			_bt_checkkeys(scan, &pstate, arrayKeys, itup, truncatt);
+		}
+
+		if (!pstate.continuescan)
+			so->currPos.moreRight = false;
+
+		Assert(itemIndex <= MaxTIDsPerBTreePage);
+		so->currPos.firstItem = 0;
+		so->currPos.lastItem = itemIndex - 1;
+		so->currPos.itemIndex = 0;
+	}
+	else
+	{
+		/* SK_SEARCHARRAY backward scans must provide final tuple up front */
+		if (arrayKeys)
+		{
+			if (minoff <= maxoff && !P_LEFTMOST(opaque))
+			{
+				ItemId		iid = PageGetItemId(page, minoff);
+
+				pstate.finaltup = (IndexTuple) PageGetItem(page, iid);
+
+				if (so->scanBehind &&
+					!_bt_scanbehind_checkkeys(scan, dir, pstate.finaltup))
+				{
+					/* Schedule another primitive index scan after all */
+					so->currPos.moreLeft = false;
+					so->needPrimScan = true;
+					if (scan->parallel_scan)
+						_bt_parallel_primscan_schedule(scan,
+													   so->currPos.currPage);
+					return false;
+				}
+			}
+
+			so->scanBehind = so->oppositeDirCheck = false;	/* reset */
+		}
+
+		/*
+		 * Consider pstate.startikey optimization once the ongoing primitive
+		 * index scan has already read at least one page
+		 */
+		if (!pstate.firstpage && minoff < maxoff)
+			_bt_set_startikey(scan, &pstate);
+
+		/* load items[] in descending order */
+		itemIndex = MaxTIDsPerBTreePage;
+
+		offnum = Min(offnum, maxoff);
+
+		while (offnum >= minoff)
+		{
+			ItemId		iid = PageGetItemId(page, offnum);
+			IndexTuple	itup;
+			bool		tuple_alive;
+			bool		passes_quals;
+
+			/*
+			 * If the scan specifies not to return killed tuples, then we
+			 * treat a killed tuple as not passing the qual.  Most of the
+			 * time, it's a win to not bother examining the tuple's index
+			 * keys, but just skip to the next tuple (previous, actually,
+			 * since we're scanning backwards).  However, if this is the first
+			 * tuple on the page, we do check the index keys, to prevent
+			 * uselessly advancing to the page to the left.  This is similar
+			 * to the high key optimization used by forward scans.
+			 */
+			if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
+			{
+				if (offnum > minoff)
+				{
+					offnum = OffsetNumberPrev(offnum);
+					continue;
+				}
+
+				tuple_alive = false;
+			}
+			else
+				tuple_alive = true;
+
+			itup = (IndexTuple) PageGetItem(page, iid);
+			Assert(!BTreeTupleIsPivot(itup));
+
+			pstate.offnum = offnum;
+			if (arrayKeys && offnum == minoff && pstate.forcenonrequired)
+			{
+				pstate.forcenonrequired = false;
+				pstate.startikey = 0;
+			}
+			passes_quals = _bt_checkkeys(scan, &pstate, arrayKeys,
+										 itup, indnatts);
+
+			if (arrayKeys && so->scanBehind)
+			{
+				/*
+				 * Done scanning this page, but not done with the current
+				 * primscan.
+				 *
+				 * Note: Forward scans don't check this explicitly, since they
+				 * prefer to reuse pstate.skip for this instead.
+				 */
+				Assert(!passes_quals && pstate.continuescan);
+				Assert(!pstate.forcenonrequired);
+
+				break;
+			}
+
+			/*
+			 * Check if we need to skip ahead to a later tuple (only possible
+			 * when the scan uses array keys)
+			 */
+			if (arrayKeys && OffsetNumberIsValid(pstate.skip))
+			{
+				Assert(!passes_quals && pstate.continuescan);
+				Assert(offnum > pstate.skip);
+				Assert(!pstate.forcenonrequired);
+
+				offnum = pstate.skip;
+				pstate.skip = InvalidOffsetNumber;
+				continue;
+			}
+
+			if (passes_quals && tuple_alive)
+			{
+				/* tuple passes all scan key conditions */
+				if (!BTreeTupleIsPosting(itup))
+				{
+					/* Remember it */
+					itemIndex--;
+					_bt_saveitem(so, itemIndex, offnum, itup);
+				}
+				else
+				{
+					int			tupleOffset;
+
+					/*
+					 * Set up state to return posting list, and remember first
+					 * TID.
+					 *
+					 * Note that we deliberately save/return items from
+					 * posting lists in ascending heap TID order for backwards
+					 * scans.  This allows _bt_killitems() to make a
+					 * consistent assumption about the order of items
+					 * associated with the same posting list tuple.
+					 */
+					itemIndex--;
+					tupleOffset =
+						_bt_setuppostingitems(so, itemIndex, offnum,
+											  BTreeTupleGetPostingN(itup, 0),
+											  itup);
+					/* Remember additional TIDs */
+					for (int i = 1; i < BTreeTupleGetNPosting(itup); i++)
+					{
+						itemIndex--;
+						_bt_savepostingitem(so, itemIndex, offnum,
+											BTreeTupleGetPostingN(itup, i),
+											tupleOffset);
+					}
+				}
+			}
+			/* When !continuescan, there can't be any more matches, so stop */
+			if (!pstate.continuescan)
+				break;
+
+			offnum = OffsetNumberPrev(offnum);
+		}
+
+		/*
+		 * We don't need to visit page to the left when no more matches will
+		 * be found there
+		 */
+		if (!pstate.continuescan)
+			so->currPos.moreLeft = false;
+
+		Assert(itemIndex >= 0);
+		so->currPos.firstItem = itemIndex;
+		so->currPos.lastItem = MaxTIDsPerBTreePage - 1;
+		so->currPos.itemIndex = MaxTIDsPerBTreePage - 1;
+	}
+
+	/*
+	 * If _bt_set_startikey told us to temporarily treat the scan's keys as
+	 * nonrequired (possible only during scans with array keys), there must be
+	 * no lasting consequences for the scan's array keys.  The scan's arrays
+	 * should now have exactly the same elements as they would have had if the
+	 * nonrequired behavior had never been used.  (In general, a scan's arrays
+	 * are expected to track its progress through the index's key space.)
+	 *
+	 * We are required (by _bt_set_startikey) to call _bt_checkkeys against
+	 * pstate.finaltup with pstate.forcenonrequired=false to allow the scan's
+	 * arrays to recover.  Assert that that step hasn't been missed.
+	 */
+	Assert(!pstate.forcenonrequired);
+
+	return (so->currPos.firstItem <= so->currPos.lastItem);
+}
+
+static IndexScanBatch
+_bt_readpage_batch(IndexScanDesc scan, BTBatchScanPos pos, ScanDirection dir, OffsetNumber offnum,
+				   bool firstpage)
+{
+	Relation	rel = scan->indexRelation;
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	Page		page;
+	BTPageOpaque opaque;
+	OffsetNumber minoff;
+	OffsetNumber maxoff;
+	BTReadPageState pstate;
+	bool		arrayKeys;
+	int			itemIndex,
+				indnatts;
+
+	/* result */
+	/* IndexScanBatch batch = ddd; */
+	IndexScanBatch batch;
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	/*
+	 * FIXME fake for _bt_checkkeys, needs to be set properly elsewhere (not
+	 * sure where)
+	 */
+
+	/*
+	 * XXX we shouldn't be passing this info through currPos but directly, I
+	 * guess.
+	 */
+	so->currPos.dir = dir;
+
+	/*
+	 * XXX We can pass the exact number if items from this page, by using
+	 * maxoff
+	 */
+	batch = index_batch_alloc(MaxTIDsPerBTreePage, scan->xs_want_itup);
+
+	/* FIXME but we don't copy the contents until the end */
+	batch->opaque = palloc0(sizeof(BTBatchScanPosData));
+
+	/* bogus values */
+	batch->firstItem = -1;
+	batch->lastItem = -1;
+	batch->itemIndex = -1;
+
+	/* if (so->currTuples) */
+	/* { */
+	/* batch->currTuples = (char *) palloc(BLCKSZ); */
+	/* memcpy(batch->currTuples, so->currTuples, BLCKSZ); */
+	/* } */
+
+	/* save the page/buffer block number, along with its sibling links */
+	page = BufferGetPage(pos->buf);
+	opaque = BTPageGetOpaque(page);
+	pos->currPage = BufferGetBlockNumber(pos->buf);
+	pos->prevPage = opaque->btpo_prev;
+	pos->nextPage = opaque->btpo_next;
+
+	Assert(!P_IGNORE(opaque));
+	Assert(BTBatchScanPosIsPinned(*pos));
+	Assert(!so->needPrimScan);
+
+	if (scan->parallel_scan)
+	{
+		/* allow next/prev page to be read by other worker without delay */
+		if (ScanDirectionIsForward(dir))
+			_bt_parallel_release(scan, pos->nextPage,
+								 pos->currPage);
+		else
+			_bt_parallel_release(scan, pos->prevPage,
+								 pos->currPage);
+	}
+
+	/* initialize remaining currPos fields related to current page */
+	pos->lsn = BufferGetLSNAtomic(pos->buf);
+	pos->dir = dir;
+	pos->nextTupleOffset = 0;
+	/* either moreLeft or moreRight should be set now (may be unset later) */
+	Assert(ScanDirectionIsForward(dir) ? pos->moreRight : pos->moreLeft);
+
+	PredicateLockPage(rel, pos->currPage, scan->xs_snapshot);
+
+	/* initialize local variables */
+	indnatts = IndexRelationGetNumberOfAttributes(rel);
+	arrayKeys = so->numArrayKeys != 0;
+	minoff = P_FIRSTDATAKEY(opaque);
+	maxoff = PageGetMaxOffsetNumber(page);
+
+	/* initialize page-level state that we'll pass to _bt_checkkeys */
+	pstate.minoff = minoff;
+	pstate.maxoff = maxoff;
+	pstate.finaltup = NULL;
+	pstate.page = page;
+	pstate.firstpage = firstpage;
+	pstate.forcenonrequired = false;
+	pstate.startikey = 0;
+	pstate.offnum = InvalidOffsetNumber;
+	pstate.skip = InvalidOffsetNumber;
+	pstate.continuescan = true; /* default assumption */
+	pstate.rechecks = 0;
+	pstate.targetdistance = 0;
+	pstate.nskipadvances = 0;
+
+	if (ScanDirectionIsForward(dir))
+	{
+		/* SK_SEARCHARRAY forward scans must provide high key up front */
+		if (arrayKeys)
+		{
+			if (!P_RIGHTMOST(opaque))
+			{
+				ItemId		iid = PageGetItemId(page, P_HIKEY);
+
+				pstate.finaltup = (IndexTuple) PageGetItem(page, iid);
+
+				if (so->scanBehind &&
+					!_bt_scanbehind_checkkeys(scan, dir, pstate.finaltup))
+				{
+					/* Schedule another primitive index scan after all */
+					pos->moreRight = false;
+					so->needPrimScan = true;
+					if (scan->parallel_scan)
+						_bt_parallel_primscan_schedule(scan,
+													   pos->currPage);
+					return NULL;
+				}
+			}
+
+			so->scanBehind = so->oppositeDirCheck = false;	/* reset */
+		}
+
+		/*
+		 * Consider pstate.startikey optimization once the ongoing primitive
+		 * index scan has already read at least one page
+		 */
+		if (!pstate.firstpage && minoff < maxoff)
+			_bt_set_startikey(scan, &pstate);
+
+		/* load items[] in ascending order */
+		itemIndex = 0;
+
+		offnum = Max(offnum, minoff);
+
+		while (offnum <= maxoff)
+		{
+			ItemId		iid = PageGetItemId(page, offnum);
+			IndexTuple	itup;
 			bool		passes_quals;
 
 			/*
@@ -1740,7 +2994,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
 				if (!BTreeTupleIsPosting(itup))
 				{
 					/* Remember it */
-					_bt_saveitem(so, itemIndex, offnum, itup);
+					_bt_saveitem_batch(batch, itemIndex, offnum, itup);
 					itemIndex++;
 				}
 				else
@@ -1752,16 +3006,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
 					 * TID
 					 */
 					tupleOffset =
-						_bt_setuppostingitems(so, itemIndex, offnum,
-											  BTreeTupleGetPostingN(itup, 0),
-											  itup);
+						_bt_setuppostingitems_batch(batch, itemIndex, offnum,
+													BTreeTupleGetPostingN(itup, 0),
+													itup);
 					itemIndex++;
 					/* Remember additional TIDs */
 					for (int i = 1; i < BTreeTupleGetNPosting(itup); i++)
 					{
-						_bt_savepostingitem(so, itemIndex, offnum,
-											BTreeTupleGetPostingN(itup, i),
-											tupleOffset);
+						_bt_savepostingitem_batch(batch, itemIndex, offnum,
+												  BTreeTupleGetPostingN(itup, i),
+												  tupleOffset);
 						itemIndex++;
 					}
 				}
@@ -1792,17 +3046,17 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
 
 			truncatt = BTreeTupleGetNAtts(itup, rel);
 			pstate.forcenonrequired = false;
-			pstate.startikey = 0;	/* _bt_set_startikey ignores P_HIKEY */
+			pstate.startikey = 0;	/* _bt_set_startikey ignores HIKEY */
 			_bt_checkkeys(scan, &pstate, arrayKeys, itup, truncatt);
 		}
 
 		if (!pstate.continuescan)
-			so->currPos.moreRight = false;
+			pos->moreRight = false;
 
 		Assert(itemIndex <= MaxTIDsPerBTreePage);
-		so->currPos.firstItem = 0;
-		so->currPos.lastItem = itemIndex - 1;
-		so->currPos.itemIndex = 0;
+		batch->firstItem = 0;
+		batch->lastItem = itemIndex - 1;
+		batch->itemIndex = 0;
 	}
 	else
 	{
@@ -1819,12 +3073,12 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
 					!_bt_scanbehind_checkkeys(scan, dir, pstate.finaltup))
 				{
 					/* Schedule another primitive index scan after all */
-					so->currPos.moreLeft = false;
+					pos->moreLeft = false;
 					so->needPrimScan = true;
 					if (scan->parallel_scan)
 						_bt_parallel_primscan_schedule(scan,
-													   so->currPos.currPage);
-					return false;
+													   pos->currPage);
+					return NULL;
 				}
 			}
 
@@ -1922,7 +3176,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
 				{
 					/* Remember it */
 					itemIndex--;
-					_bt_saveitem(so, itemIndex, offnum, itup);
+					_bt_saveitem_batch(batch, itemIndex, offnum, itup);
 				}
 				else
 				{
@@ -1940,16 +3194,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
 					 */
 					itemIndex--;
 					tupleOffset =
-						_bt_setuppostingitems(so, itemIndex, offnum,
-											  BTreeTupleGetPostingN(itup, 0),
-											  itup);
+						_bt_setuppostingitems_batch(batch, itemIndex, offnum,
+													BTreeTupleGetPostingN(itup, 0),
+													itup);
 					/* Remember additional TIDs */
 					for (int i = 1; i < BTreeTupleGetNPosting(itup); i++)
 					{
 						itemIndex--;
-						_bt_savepostingitem(so, itemIndex, offnum,
-											BTreeTupleGetPostingN(itup, i),
-											tupleOffset);
+						_bt_savepostingitem_batch(batch, itemIndex, offnum,
+												  BTreeTupleGetPostingN(itup, i),
+												  tupleOffset);
 					}
 				}
 			}
@@ -1965,12 +3219,12 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
 		 * be found there
 		 */
 		if (!pstate.continuescan)
-			so->currPos.moreLeft = false;
+			pos->moreLeft = false;
 
 		Assert(itemIndex >= 0);
-		so->currPos.firstItem = itemIndex;
-		so->currPos.lastItem = MaxTIDsPerBTreePage - 1;
-		so->currPos.itemIndex = MaxTIDsPerBTreePage - 1;
+		batch->firstItem = itemIndex;
+		batch->lastItem = MaxTIDsPerBTreePage - 1;
+		batch->itemIndex = MaxTIDsPerBTreePage - 1;
 	}
 
 	/*
@@ -1987,7 +3241,12 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum,
 	 */
 	Assert(!pstate.forcenonrequired);
 
-	return (so->currPos.firstItem <= so->currPos.lastItem);
+	if (batch->firstItem > batch->lastItem)
+		return NULL;
+
+	memcpy(batch->opaque, pos, sizeof(BTBatchScanPosData));
+
+	return batch;
 }
 
 /* Save an index item into so->currPos.items[itemIndex] */
@@ -2005,9 +3264,97 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
 	{
 		Size		itupsz = IndexTupleSize(itup);
 
-		currItem->tupleOffset = so->currPos.nextTupleOffset;
-		memcpy(so->currTuples + so->currPos.nextTupleOffset, itup, itupsz);
-		so->currPos.nextTupleOffset += MAXALIGN(itupsz);
+		currItem->tupleOffset = so->currPos.nextTupleOffset;
+		memcpy(so->currTuples + so->currPos.nextTupleOffset, itup, itupsz);
+		so->currPos.nextTupleOffset += MAXALIGN(itupsz);
+	}
+}
+
+/*
+ * Setup state to save TIDs/items from a single posting list tuple.
+ *
+ * Saves an index item into so->currPos.items[itemIndex] for TID that is
+ * returned to scan first.  Second or subsequent TIDs for posting list should
+ * be saved by calling _bt_savepostingitem().
+ *
+ * Returns an offset into tuple storage space that main tuple is stored at if
+ * needed.
+ */
+static int
+_bt_setuppostingitems(BTScanOpaque so, int itemIndex, OffsetNumber offnum,
+					  ItemPointer heapTid, IndexTuple itup)
+{
+	BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+	Assert(BTreeTupleIsPosting(itup));
+
+	currItem->heapTid = *heapTid;
+	currItem->indexOffset = offnum;
+	if (so->currTuples)
+	{
+		/* Save base IndexTuple (truncate posting list) */
+		IndexTuple	base;
+		Size		itupsz = BTreeTupleGetPostingOffset(itup);
+
+		itupsz = MAXALIGN(itupsz);
+		currItem->tupleOffset = so->currPos.nextTupleOffset;
+		base = (IndexTuple) (so->currTuples + so->currPos.nextTupleOffset);
+		memcpy(base, itup, itupsz);
+		/* Defensively reduce work area index tuple header size */
+		base->t_info &= ~INDEX_SIZE_MASK;
+		base->t_info |= itupsz;
+		so->currPos.nextTupleOffset += itupsz;
+
+		return currItem->tupleOffset;
+	}
+
+	return 0;
+}
+
+/*
+ * Save an index item into so->currPos.items[itemIndex] for current posting
+ * tuple.
+ *
+ * Assumes that _bt_setuppostingitems() has already been called for current
+ * posting list tuple.  Caller passes its return value as tupleOffset.
+ */
+static inline void
+_bt_savepostingitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum,
+					ItemPointer heapTid, int tupleOffset)
+{
+	BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+
+	currItem->heapTid = *heapTid;
+	currItem->indexOffset = offnum;
+
+	/*
+	 * Have index-only scans return the same base IndexTuple for every TID
+	 * that originates from the same posting list
+	 */
+	if (so->currTuples)
+		currItem->tupleOffset = tupleOffset;
+}
+
+/* Save an index item into so->currPos.items[itemIndex] */
+static void
+_bt_saveitem_batch(IndexScanBatch batch, int itemIndex,
+				   OffsetNumber offnum, IndexTuple itup)
+{
+	BTBatchScanPos pos = (BTBatchScanPos) batch->opaque;
+
+	Assert(!BTreeTupleIsPivot(itup) && !BTreeTupleIsPosting(itup));
+
+	/* copy the populated part of the items array */
+	batch->items[itemIndex].heapTid = itup->t_tid;
+	batch->items[itemIndex].indexOffset = offnum;
+
+	if (batch->currTuples)
+	{
+		Size		itupsz = IndexTupleSize(itup);
+
+		batch->items[itemIndex].tupleOffset = pos->nextTupleOffset;
+		memcpy(batch->currTuples + pos->nextTupleOffset, itup, itupsz);
+		pos->nextTupleOffset += MAXALIGN(itupsz);
 	}
 }
 
@@ -2022,31 +3369,34 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
  * needed.
  */
 static int
-_bt_setuppostingitems(BTScanOpaque so, int itemIndex, OffsetNumber offnum,
-					  ItemPointer heapTid, IndexTuple itup)
+_bt_setuppostingitems_batch(IndexScanBatch batch, int itemIndex, OffsetNumber offnum,
+							ItemPointer heapTid, IndexTuple itup)
 {
-	BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+	BTBatchScanPos pos = (BTBatchScanPos) batch->opaque;
+	IndexScanBatchPosItem *item = &batch->items[itemIndex];
 
 	Assert(BTreeTupleIsPosting(itup));
 
-	currItem->heapTid = *heapTid;
-	currItem->indexOffset = offnum;
-	if (so->currTuples)
+	/* copy the populated part of the items array */
+	item->heapTid = *heapTid;
+	item->indexOffset = offnum;
+
+	if (batch->currTuples)
 	{
 		/* Save base IndexTuple (truncate posting list) */
 		IndexTuple	base;
 		Size		itupsz = BTreeTupleGetPostingOffset(itup);
 
 		itupsz = MAXALIGN(itupsz);
-		currItem->tupleOffset = so->currPos.nextTupleOffset;
-		base = (IndexTuple) (so->currTuples + so->currPos.nextTupleOffset);
+		item->tupleOffset = pos->nextTupleOffset;
+		base = (IndexTuple) (batch->currTuples + pos->nextTupleOffset);
 		memcpy(base, itup, itupsz);
 		/* Defensively reduce work area index tuple header size */
 		base->t_info &= ~INDEX_SIZE_MASK;
 		base->t_info |= itupsz;
-		so->currPos.nextTupleOffset += itupsz;
+		pos->nextTupleOffset += itupsz;
 
-		return currItem->tupleOffset;
+		return item->tupleOffset;
 	}
 
 	return 0;
@@ -2060,20 +3410,20 @@ _bt_setuppostingitems(BTScanOpaque so, int itemIndex, OffsetNumber offnum,
  * posting list tuple.  Caller passes its return value as tupleOffset.
  */
 static inline void
-_bt_savepostingitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum,
-					ItemPointer heapTid, int tupleOffset)
+_bt_savepostingitem_batch(IndexScanBatch batch, int itemIndex, OffsetNumber offnum,
+						  ItemPointer heapTid, int tupleOffset)
 {
-	BTScanPosItem *currItem = &so->currPos.items[itemIndex];
+	IndexScanBatchPosItem *item = &batch->items[itemIndex];
 
-	currItem->heapTid = *heapTid;
-	currItem->indexOffset = offnum;
+	item->heapTid = *heapTid;
+	item->indexOffset = offnum;
 
 	/*
 	 * Have index-only scans return the same base IndexTuple for every TID
 	 * that originates from the same posting list
 	 */
-	if (so->currTuples)
-		currItem->tupleOffset = tupleOffset;
+	if (batch->currTuples)
+		item->tupleOffset = tupleOffset;
 }
 
 /*
@@ -2186,6 +3536,71 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 	return _bt_readnextpage(scan, blkno, lastcurrblkno, dir, false);
 }
 
+/*
+ *	a batching version of _bt_steppage(), ignoring irrelevant bits
+ */
+static IndexScanBatch
+_bt_steppage_batch(IndexScanDesc scan, BTBatchScanPos pos, ScanDirection dir)
+{
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	BlockNumber blkno,
+				lastcurrblkno;
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	/* Batching has a different concept of position, stored in the batch. */
+	Assert(BTBatchScanPosIsValid(*pos));
+
+	/*
+	 * killitems
+	 *
+	 * No need to handle killtuples here, that's going to be dealt with at the
+	 * indexam.c level when freeing the batch, or possibly in when calling
+	 * amfreebatch.
+	 */
+
+	/*
+	 * mark/restore
+	 *
+	 * Mark/restore shall also be handled at the indexam.c level, by keeping
+	 * the correct batch around, etc. We don't discard the old batch here.
+	 *
+	 * In _bt_steppage this also handled primitive scans for array keys, but
+	 * that probably would be handled at indexam.c level too.
+	 */
+
+	/* Don't unpin the buffer here, keep the batch pinned until amfreebatch. */
+
+	/* Walk to the next page with data */
+	if (ScanDirectionIsForward(dir))
+		blkno = pos->nextPage;
+	else
+		blkno = pos->prevPage;
+
+	lastcurrblkno = pos->currPage;
+
+	/*
+	 * Cancel primitive index scans that were scheduled when the call to
+	 * _bt_readpage for currPos happened to use the opposite direction to the
+	 * one that we're stepping in now.  (It's okay to leave the scan's array
+	 * keys as-is, since the next _bt_readpage will advance them.)
+	 *
+	 * XXX Not sure this is correct. Can we combine the direction from some
+	 * older batch (with mark/restore?) and the current needPrimScan from the
+	 * latest batch we processed? But, the mark/restore code in indexam should
+	 * reset this somehow.
+	 *
+	 * XXX However, aren't primitive scans very btree-specific code? How could
+	 * indexam.c ever handle that?
+	 */
+	if (pos->dir != dir)
+		so->needPrimScan = false;
+
+	return _bt_readnextpage_batch(scan, pos, blkno, lastcurrblkno, dir, false);
+}
+
 /*
  *	_bt_readfirstpage() -- Read first page containing valid data for _bt_first
  *
@@ -2265,6 +3680,77 @@ _bt_readfirstpage(IndexScanDesc scan, OffsetNumber offnum, ScanDirection dir)
 	return true;
 }
 
+static IndexScanBatch
+_bt_readfirstpage_batch(IndexScanDesc scan, BTBatchScanPos pos, OffsetNumber offnum, ScanDirection dir)
+{
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	IndexScanBatch batch;
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	so->numKilled = 0;			/* just paranoia */
+	so->markItemIndex = -1;		/* ditto */
+
+	/* copy position info from BTScanOpaque */
+
+	/* Initialize so->currPos for the first page (page in so->currPos.buf) */
+	if (so->needPrimScan)
+	{
+		Assert(so->numArrayKeys);
+
+		pos->moreLeft = true;
+		pos->moreRight = true;
+		so->needPrimScan = false;
+	}
+	else if (ScanDirectionIsForward(dir))
+	{
+		pos->moreLeft = false;
+		pos->moreRight = true;
+	}
+	else
+	{
+		pos->moreLeft = true;
+		pos->moreRight = false;
+	}
+
+	/*
+	 * Attempt to load matching tuples from the first page.
+	 *
+	 * Note that _bt_readpage will finish initializing the so->currPos fields.
+	 * _bt_readpage also releases parallel scan (even when it returns false).
+	 */
+	if ((batch = _bt_readpage_batch(scan, pos, dir, offnum, true)) != NULL)
+	{
+		pos = (BTBatchScanPos) batch->opaque;
+
+		/*
+		 * _bt_readpage succeeded.  Drop the lock (and maybe the pin) on
+		 * so->currPos.buf in preparation for btgettuple returning tuples.
+		 */
+		Assert(BTBatchScanPosIsPinned(*pos));
+
+		/* _bt_drop_lock_and_maybe_pin_batch(scan, pos); */
+		/* XXX drop just the lock, not the pin, that's up to btfreebatch */
+		/* without this btfreebatch triggers an assert when unpinning the */
+		/* buffer, because that checks we're not holding a lock on it */
+		_bt_unlockbuf(scan->indexRelation, pos->buf);
+		return batch;
+	}
+
+	/* There's no actually-matching data on the page in so->currPos.buf */
+	_bt_unlockbuf(scan->indexRelation, pos->buf);
+
+	/* XXX Not sure we can drop the pin before calling steppage_batch? But */
+	/* without this, \d+ reports unreleased buffer ... */
+	/* And the non-batch code doesn't need to do this. */
+	ReleaseBuffer(pos->buf);
+
+	/* Call _bt_readnextpage using its _bt_steppage wrapper function */
+	return _bt_steppage_batch(scan, pos, dir);
+}
+
 /*
  *	_bt_readnextpage() -- Read next page containing valid data for _bt_next
  *
@@ -2412,6 +3898,138 @@ _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno,
 	return true;
 }
 
+static IndexScanBatch
+_bt_readnextpage_batch(IndexScanDesc scan, BTBatchScanPos pos, BlockNumber blkno,
+					   BlockNumber lastcurrblkno, ScanDirection dir, bool seized)
+{
+	Relation	rel = scan->indexRelation;
+	BTScanOpaque so PG_USED_FOR_ASSERTS_ONLY = (BTScanOpaque) scan->opaque;
+
+	/* BTBatchScanPosData	newpos; */
+	IndexScanBatch newbatch = NULL;
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	Assert(pos->currPage == lastcurrblkno || seized);
+	Assert(BTBatchScanPosIsPinned(*pos) || seized);
+
+	/* initialize the new position to the old one, we'll modify it */
+	/* newpos = *pos; */
+
+	/* pos->moreLeft = pos->moreRight = false; */
+
+	/*
+	 * Remember that the scan already read lastcurrblkno, a page to the left
+	 * of blkno (or remember reading a page to the right, for backwards scans)
+	 */
+	if (ScanDirectionIsForward(dir))
+		pos->moreLeft = true;
+	else
+		pos->moreRight = true;
+
+	for (;;)
+	{
+		Page		page;
+		BTPageOpaque opaque;
+
+		if (blkno == P_NONE ||
+			(ScanDirectionIsForward(dir) ?
+			 !pos->moreRight : !pos->moreLeft))
+		{
+			/* most recent _bt_readpage call (for lastcurrblkno) ended scan */
+			Assert(pos->currPage == lastcurrblkno && !seized);
+			BTBatchScanPosInvalidate(*pos);
+			_bt_parallel_done(scan);	/* iff !so->needPrimScan */
+			return NULL;
+		}
+
+		Assert(!so->needPrimScan);
+
+		/* parallel scan must never actually visit so->currPos blkno */
+		if (!seized && scan->parallel_scan != NULL &&
+			!_bt_parallel_seize_batch(scan, pos, &blkno, &lastcurrblkno, false))
+		{
+			/* whole scan is now done (or another primitive scan required) */
+			BTBatchScanPosInvalidate(*pos);
+			return NULL;
+		}
+
+		if (ScanDirectionIsForward(dir))
+		{
+			/* read blkno, but check for interrupts first */
+			CHECK_FOR_INTERRUPTS();
+			pos->buf = _bt_getbuf(rel, blkno, BT_READ);
+		}
+		else
+		{
+			/* read blkno, avoiding race (also checks for interrupts) */
+			pos->buf = _bt_lock_and_validate_left(rel, &blkno,
+												  lastcurrblkno);
+			if (pos->buf == InvalidBuffer)
+			{
+				/* must have been a concurrent deletion of leftmost page */
+				BTBatchScanPosInvalidate(*pos);
+				_bt_parallel_done(scan);
+				return NULL;
+			}
+		}
+
+		page = BufferGetPage(pos->buf);
+		opaque = BTPageGetOpaque(page);
+		lastcurrblkno = blkno;
+		if (likely(!P_IGNORE(opaque)))
+		{
+			/* see if there are any matches on this page */
+			if (ScanDirectionIsForward(dir))
+			{
+				/* note that this will clear moreRight if we can stop */
+				if ((newbatch = _bt_readpage_batch(scan, pos, dir, P_FIRSTDATAKEY(opaque), false)) != NULL)
+					break;
+				blkno = pos->nextPage;
+			}
+			else
+			{
+				/* note that this will clear moreLeft if we can stop */
+				if ((newbatch = _bt_readpage_batch(scan, pos, dir, PageGetMaxOffsetNumber(page), false)) != NULL)
+					break;
+				blkno = pos->prevPage;
+			}
+		}
+		else
+		{
+			/* _bt_readpage not called, so do all this for ourselves */
+			if (ScanDirectionIsForward(dir))
+				blkno = opaque->btpo_next;
+			else
+				blkno = opaque->btpo_prev;
+			if (scan->parallel_scan != NULL)
+				_bt_parallel_release(scan, blkno, lastcurrblkno);
+		}
+
+		/* no matching tuples on this page */
+		_bt_relbuf(rel, pos->buf);
+		seized = false;			/* released by _bt_readpage (or by us) */
+	}
+
+	/* */
+	Assert(newbatch != NULL);
+
+	pos = (BTBatchScanPos) newbatch->opaque;
+
+	/*
+	 * _bt_readpage succeeded.  Drop the lock (and maybe the pin) on
+	 * so->currPos.buf in preparation for btgettuple returning tuples.
+	 */
+	Assert(pos->currPage == blkno);
+	Assert(BTBatchScanPosIsPinned(*pos));
+	/* _bt_drop_lock_and_maybe_pin_batch(scan, pos); */
+	_bt_unlockbuf(scan->indexRelation, pos->buf);
+
+	return newbatch;
+}
+
 /*
  * _bt_lock_and_validate_left() -- lock caller's left sibling blkno,
  * recovering from concurrent page splits/page deletions when necessary
@@ -2693,3 +4311,79 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
 	_bt_returnitem(scan, so);
 	return true;
 }
+
+/*
+ *	_bt_endpoint() -- Find the first or last page in the index, and scan
+ * from there to the first key satisfying all the quals.
+ *
+ * This is used by _bt_first() to set up a scan when we've determined
+ * that the scan must start at the beginning or end of the index (for
+ * a forward or backward scan respectively).
+ *
+ * Parallel scan callers must have seized the scan before calling here.
+ * Exit conditions are the same as for _bt_first().
+ */
+static IndexScanBatch
+_bt_endpoint_batch(IndexScanDesc scan, ScanDirection dir)
+{
+	Relation	rel = scan->indexRelation;
+	BTScanOpaque so PG_USED_FOR_ASSERTS_ONLY = (BTScanOpaque) scan->opaque;
+	Page		page;
+	BTPageOpaque opaque;
+	OffsetNumber start;
+	BTBatchScanPosData pos;
+
+	BTBatchScanPosInvalidate(pos);
+
+	/* batching does not work with regular scan-level positions */
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!BTScanPosIsValid(so->markPos));
+
+	Assert(!BTScanPosIsValid(so->currPos));
+	Assert(!so->needPrimScan);
+
+	/*
+	 * Scan down to the leftmost or rightmost leaf page.  This is a simplified
+	 * version of _bt_search().
+	 */
+	pos.buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir));
+
+	if (!BufferIsValid(pos.buf))
+	{
+		/*
+		 * Empty index. Lock the whole relation, as nothing finer to lock
+		 * exists.
+		 */
+		PredicateLockRelation(rel, scan->xs_snapshot);
+		_bt_parallel_done(scan);
+		return false;
+	}
+
+	page = BufferGetPage(pos.buf);
+	opaque = BTPageGetOpaque(page);
+	Assert(P_ISLEAF(opaque));
+
+	if (ScanDirectionIsForward(dir))
+	{
+		/* There could be dead pages to the left, so not this: */
+		/* Assert(P_LEFTMOST(opaque)); */
+
+		start = P_FIRSTDATAKEY(opaque);
+	}
+	else if (ScanDirectionIsBackward(dir))
+	{
+		Assert(P_RIGHTMOST(opaque));
+
+		start = PageGetMaxOffsetNumber(page);
+	}
+	else
+	{
+		elog(ERROR, "invalid scan direction: %d", (int) dir);
+		start = 0;				/* keep compiler quiet */
+	}
+
+	/*
+	 * Now load data from the first page of the scan.
+	 */
+	return _bt_readfirstpage_batch(scan, &pos, start, dir);
+}
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 9e27302fe81..5f8818b1a3f 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -3481,6 +3481,185 @@ _bt_killitems(IndexScanDesc scan)
 	_bt_unlockbuf(scan->indexRelation, so->currPos.buf);
 }
 
+/*
+ * _bt_killitems_batch
+ *		a variant of _bt_killitems, using the batch-level killedItems
+ */
+void
+_bt_killitems_batch(IndexScanDesc scan, IndexScanBatch batch)
+{
+	/* BTScanOpaque so = (BTScanOpaque) scan->opaque; */
+	BTBatchScanPos pos = (BTBatchScanPos) batch->opaque;
+	Page		page;
+	BTPageOpaque opaque;
+	OffsetNumber minoff;
+	OffsetNumber maxoff;
+	int			i;
+	int			numKilled = batch->numKilled;
+	bool		killedsomething = false;
+	bool		droppedpin PG_USED_FOR_ASSERTS_ONLY;
+
+	Assert(BTBatchScanPosIsValid(*pos));
+
+	/*
+	 * Always reset the scan state, so we don't look for same items on other
+	 * pages.
+	 */
+	batch->numKilled = 0;
+
+	if (BTBatchScanPosIsPinned(*pos))
+	{
+		/*
+		 * We have held the pin on this page since we read the index tuples,
+		 * so all we need to do is lock it.  The pin will have prevented
+		 * re-use of any TID on the page, so there is no need to check the
+		 * LSN.
+		 */
+		droppedpin = false;
+		_bt_lockbuf(scan->indexRelation, pos->buf, BT_READ);
+
+		page = BufferGetPage(pos->buf);
+	}
+	else
+	{
+		Buffer		buf;
+
+		droppedpin = true;
+		/* Attempt to re-read the buffer, getting pin and lock. */
+		buf = _bt_getbuf(scan->indexRelation, pos->currPage, BT_READ);
+
+		page = BufferGetPage(buf);
+		if (BufferGetLSNAtomic(buf) == pos->lsn)
+			pos->buf = buf;
+		else
+		{
+			/* Modified while not pinned means hinting is not safe. */
+			_bt_relbuf(scan->indexRelation, buf);
+			return;
+		}
+	}
+
+	opaque = BTPageGetOpaque(page);
+	minoff = P_FIRSTDATAKEY(opaque);
+	maxoff = PageGetMaxOffsetNumber(page);
+
+	for (i = 0; i < numKilled; i++)
+	{
+		int			itemIndex = batch->killedItems[i];
+		IndexScanBatchPosItem *kitem = &batch->items[itemIndex];
+		OffsetNumber offnum = kitem->indexOffset;
+
+		Assert(itemIndex >= batch->firstItem &&
+			   itemIndex <= batch->lastItem);
+		if (offnum < minoff)
+			continue;			/* pure paranoia */
+		while (offnum <= maxoff)
+		{
+			ItemId		iid = PageGetItemId(page, offnum);
+			IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);
+			bool		killtuple = false;
+
+			if (BTreeTupleIsPosting(ituple))
+			{
+				int			pi = i + 1;
+				int			nposting = BTreeTupleGetNPosting(ituple);
+				int			j;
+
+				/*
+				 * We rely on the convention that heap TIDs in the scanpos
+				 * items array are stored in ascending heap TID order for a
+				 * group of TIDs that originally came from a posting list
+				 * tuple.  This convention even applies during backwards
+				 * scans, where returning the TIDs in descending order might
+				 * seem more natural.  This is about effectiveness, not
+				 * correctness.
+				 *
+				 * Note that the page may have been modified in almost any way
+				 * since we first read it (in the !droppedpin case), so it's
+				 * possible that this posting list tuple wasn't a posting list
+				 * tuple when we first encountered its heap TIDs.
+				 */
+				for (j = 0; j < nposting; j++)
+				{
+					ItemPointer item = BTreeTupleGetPostingN(ituple, j);
+
+					if (!ItemPointerEquals(item, &kitem->heapTid))
+						break;	/* out of posting list loop */
+
+					/*
+					 * kitem must have matching offnum when heap TIDs match,
+					 * though only in the common case where the page can't
+					 * have been concurrently modified
+					 */
+					Assert(kitem->indexOffset == offnum || !droppedpin);
+
+					/*
+					 * Read-ahead to later kitems here.
+					 *
+					 * We rely on the assumption that not advancing kitem here
+					 * will prevent us from considering the posting list tuple
+					 * fully dead by not matching its next heap TID in next
+					 * loop iteration.
+					 *
+					 * If, on the other hand, this is the final heap TID in
+					 * the posting list tuple, then tuple gets killed
+					 * regardless (i.e. we handle the case where the last
+					 * kitem is also the last heap TID in the last index tuple
+					 * correctly -- posting tuple still gets killed).
+					 */
+					if (pi < numKilled)
+						kitem = &batch->items[batch->killedItems[pi++]];
+				}
+
+				/*
+				 * Don't bother advancing the outermost loop's int iterator to
+				 * avoid processing killed items that relate to the same
+				 * offnum/posting list tuple.  This micro-optimization hardly
+				 * seems worth it.  (Further iterations of the outermost loop
+				 * will fail to match on this same posting list's first heap
+				 * TID instead, so we'll advance to the next offnum/index
+				 * tuple pretty quickly.)
+				 */
+				if (j == nposting)
+					killtuple = true;
+			}
+			else if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
+				killtuple = true;
+
+			/*
+			 * Mark index item as dead, if it isn't already.  Since this
+			 * happens while holding a buffer lock possibly in shared mode,
+			 * it's possible that multiple processes attempt to do this
+			 * simultaneously, leading to multiple full-page images being sent
+			 * to WAL (if wal_log_hints or data checksums are enabled), which
+			 * is undesirable.
+			 */
+			if (killtuple && !ItemIdIsDead(iid))
+			{
+				/* found the item/all posting list items */
+				ItemIdMarkDead(iid);
+				killedsomething = true;
+				break;			/* out of inner search loop */
+			}
+			offnum = OffsetNumberNext(offnum);
+		}
+	}
+
+	/*
+	 * Since this can be redone later if needed, mark as dirty hint.
+	 *
+	 * Whenever we mark anything LP_DEAD, we also set the page's
+	 * BTP_HAS_GARBAGE flag, which is likewise just a hint.  (Note that we
+	 * only rely on the page-level flag in !heapkeyspace indexes.)
+	 */
+	if (killedsomething)
+	{
+		opaque->btpo_flags |= BTP_HAS_GARBAGE;
+		MarkBufferDirtyHint(pos->buf, true);
+	}
+
+	_bt_unlockbuf(scan->indexRelation, pos->buf);
+}
 
 /*
  * The following routines manage a shared-memory area in which we track
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index ebca02588d3..a00a1108ba5 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -1001,6 +1001,38 @@ typedef struct BTScanPosData
 
 typedef BTScanPosData *BTScanPos;
 
+/*
+ * Minimal AM-specific concept of "position" for batching.
+ */
+typedef struct BTBatchScanPosData
+{
+	Buffer		buf;			/* currPage buf (invalid means unpinned) */
+
+	/* page details as of the saved position's call to _bt_readpage */
+	BlockNumber currPage;		/* page referenced by items array */
+	BlockNumber prevPage;		/* currPage's left link */
+	BlockNumber nextPage;		/* currPage's right link */
+	XLogRecPtr	lsn;			/* currPage's LSN */
+
+	/* scan direction for the saved position's call to _bt_readpage */
+	ScanDirection dir;
+
+	/*
+	 * If we are doing an index-only scan, nextTupleOffset is the first free
+	 * location in the associated tuple storage workspace.
+	 */
+	int			nextTupleOffset;
+
+	/*
+	 * moreLeft and moreRight track whether we think there may be matching
+	 * index entries to the left and right of the current page, respectively.
+	 */
+	bool		moreLeft;
+	bool		moreRight;
+} BTBatchScanPosData;
+
+typedef BTBatchScanPosData *BTBatchScanPos;
+
 #define BTScanPosIsPinned(scanpos) \
 ( \
 	AssertMacro(BlockNumberIsValid((scanpos).currPage) || \
@@ -1017,7 +1049,6 @@ typedef BTScanPosData *BTScanPos;
 		if (BTScanPosIsPinned(scanpos)) \
 			BTScanPosUnpin(scanpos); \
 	} while (0)
-
 #define BTScanPosIsValid(scanpos) \
 ( \
 	AssertMacro(BlockNumberIsValid((scanpos).currPage) || \
@@ -1030,6 +1061,35 @@ typedef BTScanPosData *BTScanPos;
 		(scanpos).currPage = InvalidBlockNumber; \
 	} while (0)
 
+#define BTBatchScanPosIsPinned(scanpos) \
+( \
+	AssertMacro(BlockNumberIsValid((scanpos).currPage) || \
+				!BufferIsValid((scanpos).buf)), \
+	BufferIsValid((scanpos).buf) \
+)
+#define BTBatchScanPosUnpin(scanpos) \
+	do { \
+		ReleaseBuffer((scanpos).buf); \
+		(scanpos).buf = InvalidBuffer; \
+	} while (0)
+#define BTBatchScanPosUnpinIfPinned(scanpos) \
+	do { \
+		if (BTBatchScanPosIsPinned(scanpos)) \
+			BTBatchScanPosUnpin(scanpos); \
+	} while (0)
+#define BTBatchScanPosIsValid(scanpos) \
+( \
+	AssertMacro(BlockNumberIsValid((scanpos).currPage) || \
+				!BufferIsValid((scanpos).buf)), \
+	BlockNumberIsValid((scanpos).currPage) \
+)
+#define BTBatchScanPosInvalidate(scanpos) \
+	do { \
+		(scanpos).buf = InvalidBuffer; \
+		(scanpos).currPage = InvalidBlockNumber; \
+	} while (0)
+
+
 /* We need one of these for each equality-type SK_SEARCHARRAY scan key */
 typedef struct BTArrayKeyInfo
 {
@@ -1191,6 +1251,8 @@ extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys);
 extern Size btestimateparallelscan(Relation rel, int nkeys, int norderbys);
 extern void btinitparallelscan(void *target);
 extern bool btgettuple(IndexScanDesc scan, ScanDirection dir);
+extern IndexScanBatch btgetbatch(IndexScanDesc scan, ScanDirection dir);
+extern void btfreebatch(IndexScanDesc scan, IndexScanBatch batch);
 extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm);
 extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 					 ScanKey orderbys, int norderbys);
@@ -1215,6 +1277,9 @@ extern StrategyNumber bttranslatecmptype(CompareType cmptype, Oid opfamily);
  */
 extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *next_scan_page,
 							   BlockNumber *last_curr_page, bool first);
+extern bool _bt_parallel_seize_batch(IndexScanDesc scan, BTBatchScanPos pos,
+									 BlockNumber *next_scan_page,
+									 BlockNumber *last_curr_page, bool first);
 extern void _bt_parallel_release(IndexScanDesc scan,
 								 BlockNumber next_scan_page,
 								 BlockNumber curr_page);
@@ -1308,6 +1373,10 @@ extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
 extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
 extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
 
+extern IndexScanBatch _bt_first_batch(IndexScanDesc scan, ScanDirection dir);
+extern IndexScanBatch _bt_next_batch(IndexScanDesc scan, BTBatchScanPos pos, ScanDirection dir);
+extern void _bt_kill_batch(IndexScanDesc scan, IndexScanBatch batch);
+
 /*
  * prototypes for functions in nbtutils.c
  */
@@ -1326,6 +1395,7 @@ extern bool _bt_scanbehind_checkkeys(IndexScanDesc scan, ScanDirection dir,
 									 IndexTuple finaltup);
 extern void _bt_set_startikey(IndexScanDesc scan, BTReadPageState *pstate);
 extern void _bt_killitems(IndexScanDesc scan);
+extern void _bt_killitems_batch(IndexScanDesc scan, IndexScanBatch batch);
 extern BTCycleId _bt_vacuum_cycleid(Relation rel);
 extern BTCycleId _bt_start_vacuum(Relation rel);
 extern void _bt_end_vacuum(Relation rel);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 060d964e399..1e5548aacb9 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -195,6 +195,8 @@ BOOL
 BOOLEAN
 BOX
 BTArrayKeyInfo
+BTBatchInfo
+BTBatchScanPosData
 BTBuildState
 BTCallbackState
 BTCycleId
-- 
2.49.0

