From ec809e08fe59ffc3eaee772d2269cb47f365c0a6 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 7 Feb 2024 12:47:51 -0500
Subject: [PATCH v4 2/2] Index scans prefetch with streaming read API

---
 src/backend/access/heap/heapam_handler.c |  55 +++++++-
 src/backend/access/index/indexam.c       | 104 ++++++++++++++-
 src/backend/executor/execMain.c          |   4 +
 src/backend/executor/nodeIndexonlyscan.c | 153 ++++++++++++++++-------
 src/backend/executor/nodeIndexscan.c     |  61 ++++++++-
 src/backend/optimizer/plan/createplan.c  |   1 +
 src/backend/optimizer/plan/planner.c     |   3 +
 src/backend/storage/aio/streaming_read.c |  11 +-
 src/include/access/genam.h               |  41 ++++++
 src/include/access/relscan.h             |  11 ++
 src/include/nodes/execnodes.h            |   1 +
 src/include/nodes/pathnodes.h            |   1 +
 src/include/nodes/plannodes.h            |   1 +
 src/include/storage/streaming_read.h     |   2 +
 src/tools/pgindent/typedefs.list         |   2 +
 15 files changed, 401 insertions(+), 50 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d15a02b2be..03e6b522a8 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -108,6 +108,12 @@ heapam_index_fetch_end(IndexFetchTableData *scan)
 	pfree(hscan);
 }
 
+/*
+ * For streaming read users, tid is an output parameter that is set to the
+ * latest TID obtained from the streaming read API (or invalidated when the
+ * stream has no more TIDs).  For non-streaming read users, tid is an input
+ * parameter identifying the heap tuple, and thus the heap block, to fetch.
+ */
 static bool
 heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 						 ItemPointer tid,
@@ -127,9 +133,51 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 		/* Switch to correct buffer if we don't have it already */
 		Buffer		prev_buf = hscan->xs_cbuf;
 
-		hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
-											  hscan->xs_base.rel,
-											  ItemPointerGetBlockNumber(tid));
+		if (scan->pgsr)
+		{
+			TIDQueueItem *result;
+
+			if (BufferIsValid(hscan->xs_cbuf))
+				ReleaseBuffer(hscan->xs_cbuf);
+
+			hscan->xs_cbuf = pg_streaming_read_buffer_get_next(scan->pgsr, (void **) &result);
+			if (!BufferIsValid(hscan->xs_cbuf))
+			{
+				/*
+				 * Invalidate the item pointer to allow the caller to
+				 * distinguish between index_fetch_heap() returning false
+				 * because the tuple is not visible and because the streaming
+				 * read callback ran out of queue items.
+				 */
+				ItemPointerSetInvalid(tid);
+				return false;
+			}
+
+			/* Set this for use below */
+			*tid = result->tid;
+
+			scan->tid = result->tid;
+			scan->recheck = result->recheck;
+
+			/*
+			 * Take over the tuple copies that index_tid_enqueue() made (either
+			 * may be NULL) rather than copying them a second time: nothing
+			 * else frees the queue item's copies, so re-copying leaks them.
+			 */
+			if (scan->itup)
+				pfree(scan->itup);
+			scan->itup = result->itup;
+
+			if (scan->htup)
+				pfree(scan->htup);
+			scan->htup = result->htup;
+		}
+		else
+		{
+			hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
+												  hscan->xs_base.rel,
+												  ItemPointerGetBlockNumber(tid));
+		}
 
 		/*
 		 * Prune page, but only if we weren't already on this page
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index bbd499abcf..0bf50bcd83 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -248,6 +248,140 @@ index_insert_cleanup(Relation indexRelation,
 		indexRelation->rd_indam->aminsertcleanup(indexInfo);
 }
 
+/*
+ * tid_queue_reset - discard every item still waiting in the queue
+ *
+ * Slots in [head, tail) hold the tuple copies made by index_tid_enqueue(), so
+ * those copies are freed here.  Items that were already dequeued are not
+ * touched.
+ */
+void
+tid_queue_reset(TIDQueue *q)
+{
+	while (q->head < q->tail)
+	{
+		TIDQueueItem *item = &q->data[q->head % q->size];
+
+		if (item->itup)
+			pfree(item->itup);
+		if (item->htup)
+			heap_freetuple(item->htup);
+		q->head++;
+	}
+
+	q->head = q->tail = 0;
+}
+
+/*
+ * tid_queue_alloc - allocate an empty queue with room for "size" items
+ */
+TIDQueue *
+tid_queue_alloc(int size)
+{
+	TIDQueue   *result;
+
+	/* slot positions are computed modulo size */
+	Assert(size > 0);
+
+	result = palloc(offsetof(TIDQueue, data) + sizeof(TIDQueueItem) * size);
+	result->size = size;
+
+	/*
+	 * Set the positions directly rather than via tid_queue_reset(), which
+	 * would walk whatever garbage head and tail hold in fresh memory.
+	 */
+	result->head = result->tail = 0;
+	return result;
+}
+
+/*
+ * index_tid_dequeue - remove and return the oldest item in the queue
+ *
+ * The queue must not be empty.  The item is returned by value; the queue no
+ * longer tracks the itup/htup copies it points to.
+ */
+static TIDQueueItem
+index_tid_dequeue(TIDQueue *tid_queue)
+{
+	TIDQueueItem result;
+
+	Assert(tid_queue->tail > tid_queue->head);
+	result = tid_queue->data[tid_queue->head % tid_queue->size];
+	tid_queue->head++;
+
+	return result;
+}
+
+/*
+ * index_tid_enqueue - append a TID and its per-tuple state to the queue
+ *
+ * The queue must not be full.  itup and htup may each be NULL; when given,
+ * they are copied, so the caller's tuples need not outlive this call.
+ */
+void
+index_tid_enqueue(TIDQueue *tid_queue, ItemPointer tid, bool recheck,
+				  HeapTuple htup, IndexTuple itup)
+{
+	TIDQueueItem *cur;
+
+	Assert(tid_queue->tail >= tid_queue->head);
+	Assert(!TID_QUEUE_FULL(tid_queue));
+	cur = &tid_queue->data[tid_queue->tail % tid_queue->size];
+	ItemPointerSet(&cur->tid,
+				   ItemPointerGetBlockNumber(tid), ItemPointerGetOffsetNumber(tid));
+	cur->recheck = recheck;
+	cur->itup = itup ? CopyIndexTuple(itup) : NULL;
+	cur->htup = htup ? heap_copytuple(htup) : NULL;
+
+	tid_queue->tail++;
+}
+
+/*
+ * index_pgsr_next_single - streaming read callback for index scans
+ *
+ * Dequeues the next TID, stores the whole queue item in per_buffer_data, and
+ * returns the TID's heap block number.  Returns InvalidBlockNumber when the
+ * queue is empty.
+ */
+static BlockNumber
+index_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private, void *per_buffer_data)
+{
+	IndexScanDesc scan = pgsr_private;
+	TIDQueueItem *result = per_buffer_data;
+
+	scan->kill_prior_tuple = false;
+	scan->xs_heap_continue = false;
+
+	if (TID_QUEUE_EMPTY(scan->tid_queue))
+		return InvalidBlockNumber;
+
+	*result = index_tid_dequeue(scan->tid_queue);
+	return ItemPointerGetBlockNumber(&result->tid);
+}
+
+/*
+ * index_pgsr_alloc - (re)create the streaming read used for heap fetches
+ *
+ * Any streaming read already attached to the scan's heap fetch state is
+ * freed first.  The new one reads the heap relation's main fork, is driven
+ * by index_pgsr_next_single(), and is marked resumable.
+ */
+void
+index_pgsr_alloc(IndexScanDesc scan)
+{
+	if (scan->xs_heapfetch->pgsr)
+		pg_streaming_read_free(scan->xs_heapfetch->pgsr);
+	scan->xs_heapfetch->pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+															  scan,
+															  sizeof(TIDQueueItem),
+															  NULL,
+															  BMR_REL(scan->heapRelation),
+															  MAIN_FORKNUM,
+															  index_pgsr_next_single);
+
+	pg_streaming_read_set_resumable(scan->xs_heapfetch->pgsr);
+}
+
 /*
  * index_beginscan - start a scan of an index with amgettuple
  *
@@ -333,6 +467,8 @@ index_beginscan_internal(Relation indexRelation,
 	/* Initialize information for parallel scan. */
 	scan->parallel_scan = pscan;
 	scan->xs_temp_snap = temp_snap;
+	scan->index_done = false;
+	scan->tid_queue = NULL;
 
 	return scan;
 }
@@ -364,8 +500,12 @@ index_rescan(IndexScanDesc scan,
 	if (scan->xs_heapfetch)
 		table_index_fetch_reset(scan->xs_heapfetch);
 
+	scan->index_done = false;
+
 	scan->kill_prior_tuple = false; /* for safety */
 	scan->xs_heap_continue = false;
+	if (scan->tid_queue)
+		tid_queue_reset(scan->tid_queue);
 
 	scan->indexRelation->rd_indam->amrescan(scan, keys, nkeys,
 											orderbys, norderbys);
@@ -384,10 +524,21 @@ index_endscan(IndexScanDesc scan)
 	/* Release resources (like buffer pins) from table accesses */
 	if (scan->xs_heapfetch)
 	{
+		if (scan->xs_heapfetch->pgsr)
+			pg_streaming_read_free(scan->xs_heapfetch->pgsr);
+		scan->xs_heapfetch->pgsr = NULL;
 		table_index_fetch_end(scan->xs_heapfetch);
 		scan->xs_heapfetch = NULL;
 	}
 
+	if (scan->tid_queue)
+	{
+		/* free the tuple copies held by any items never dequeued */
+		tid_queue_reset(scan->tid_queue);
+		pfree(scan->tid_queue);
+	}
+	scan->tid_queue = NULL;
+
 	/* End the AM's scan */
 	scan->indexRelation->rd_indam->amendscan(scan);
 
@@ -530,6 +681,9 @@ index_parallelrescan(IndexScanDesc scan)
 	if (scan->xs_heapfetch)
 		table_index_fetch_reset(scan->xs_heapfetch);
 
+	if (scan->tid_queue)
+		tid_queue_reset(scan->tid_queue);
+
 	/* amparallelrescan is optional; assume no-op if not provided by AM */
 	if (scan->indexRelation->rd_indam->amparallelrescan != NULL)
 		scan->indexRelation->rd_indam->amparallelrescan(scan);
@@ -603,6 +757,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 		if (scan->xs_heapfetch)
 			table_index_fetch_reset(scan->xs_heapfetch);
 
+		scan->index_done = true;
 		return NULL;
 	}
 	Assert(ItemPointerIsValid(&scan->xs_heaptid));
@@ -651,7 +806,7 @@ index_fetch_heap(IndexScanDesc scan, TupleTableSlot *slot)
 	 * recovery because it may violate MVCC to do so.  See comments in
 	 * RelationGetIndexScan().
 	 */
-	if (!scan->xactStartedInRecovery)
+	if (!scan->xactStartedInRecovery && !scan->xs_heapfetch->pgsr)
 		scan->kill_prior_tuple = all_dead;
 
 	return found;
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 13a9b7da83..9e951a69ab 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1652,6 +1652,10 @@ ExecutePlan(EState *estate,
 	if (!execute_once)
 		use_parallel_mode = false;
 
+	estate->es_use_prefetching = execute_once;
+	if (!planstate->plan->allow_prefetch)
+		estate->es_use_prefetching = false;
+
 	estate->es_use_parallel_mode = use_parallel_mode;
 	if (use_parallel_mode)
 		EnterParallelMode();
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 2c2c9c10b5..e3824c25e0 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -113,61 +113,109 @@ IndexOnlyNext(IndexOnlyScanState *node)
 						 node->ioss_NumOrderByKeys);
 	}
 
+	if (!scandesc->tid_queue)
+	{
+		/* Fall back to a queue size of 1 for now */
+		int			queue_size = 1;
+
+		if (estate->es_use_prefetching && ScanDirectionIsForward(direction))
+			queue_size = TID_QUEUE_SIZE;
+		scandesc->tid_queue = tid_queue_alloc(queue_size);
+		index_pgsr_alloc(scandesc);
+	}
+
 	/*
 	 * OK, now that we have what we need, fetch the next tuple.
 	 */
-	while ((tid = index_getnext_tid(scandesc, direction)) != NULL)
+	for (;;)
 	{
-		bool		tuple_from_heap = false;
+		bool		tuple_from_heap = true;
 
 		CHECK_FOR_INTERRUPTS();
 
-		/*
-		 * We can skip the heap fetch if the TID references a heap page on
-		 * which all tuples are known visible to everybody.  In any case,
-		 * we'll use the index tuple not the heap tuple as the data source.
-		 *
-		 * Note on Memory Ordering Effects: visibilitymap_get_status does not
-		 * lock the visibility map buffer, and therefore the result we read
-		 * here could be slightly stale.  However, it can't be stale enough to
-		 * matter.
-		 *
-		 * We need to detect clearing a VM bit due to an insert right away,
-		 * because the tuple is present in the index page but not visible. The
-		 * reading of the TID by this scan (using a shared lock on the index
-		 * buffer) is serialized with the insert of the TID into the index
-		 * (using an exclusive lock on the index buffer). Because the VM bit
-		 * is cleared before updating the index, and locking/unlocking of the
-		 * index page acts as a full memory barrier, we are sure to see the
-		 * cleared bit if we see a recently-inserted TID.
-		 *
-		 * Deletes do not update the index page (only VACUUM will clear out
-		 * the TID), so the clearing of the VM bit by a delete is not
-		 * serialized with this test below, and we may see a value that is
-		 * significantly stale. However, we don't care about the delete right
-		 * away, because the tuple is still visible until the deleting
-		 * transaction commits or the statement ends (if it's our
-		 * transaction). In either case, the lock on the VM buffer will have
-		 * been released (acting as a write barrier) after clearing the bit.
-		 * And for us to have a snapshot that includes the deleting
-		 * transaction (making the tuple invisible), we must have acquired
-		 * ProcArrayLock after that time, acting as a read barrier.
-		 *
-		 * It's worth going through this complexity to avoid needing to lock
-		 * the VM buffer, which could cause significant contention.
-		 */
-		if (!VM_ALL_VISIBLE(scandesc->heapRelation,
-							ItemPointerGetBlockNumber(tid),
-							&node->ioss_VMBuffer))
+		if (!scandesc->index_done)
+		{
+			while (!TID_QUEUE_FULL(scandesc->tid_queue))
+			{
+				if ((tid = index_getnext_tid(scandesc, direction)) == NULL)
+				{
+					scandesc->index_done = true;
+					break;
+				}
+
+				/*
+				 * We can skip the heap fetch if the TID references a heap
+				 * page on which all tuples are known visible to everybody. In
+				 * any case, we'll use the index tuple not the heap tuple as
+				 * the data source.
+				 *
+				 * Note on Memory Ordering Effects: visibilitymap_get_status
+				 * does not lock the visibility map buffer, and therefore the
+				 * result we read here could be slightly stale.  However, it
+				 * can't be stale enough to matter.
+				 *
+				 * We need to detect clearing a VM bit due to an insert right
+				 * away, because the tuple is present in the index page but
+				 * not visible. The reading of the TID by this scan (using a
+				 * shared lock on the index buffer) is serialized with the
+				 * insert of the TID into the index (using an exclusive lock
+				 * on the index buffer). Because the VM bit is cleared before
+				 * updating the index, and locking/unlocking of the index page
+				 * acts as a full memory barrier, we are sure to see the
+				 * cleared bit if we see a recently-inserted TID.
+				 *
+				 * Deletes do not update the index page (only VACUUM will
+				 * clear out the TID), so the clearing of the VM bit by a
+				 * delete is not serialized with this test below, and we may
+				 * see a value that is significantly stale. However, we don't
+				 * care about the delete right away, because the tuple is
+				 * still visible until the deleting transaction commits or the
+				 * statement ends (if it's our transaction). In either case,
+				 * the lock on the VM buffer will have been released (acting
+				 * as a write barrier) after clearing the bit. And for us to
+				 * have a snapshot that includes the deleting transaction
+				 * (making the tuple invisible), we must have acquired
+				 * ProcArrayLock after that time, acting as a read barrier.
+				 *
+				 * It's worth going through this complexity to avoid needing
+				 * to lock the VM buffer, which could cause significant
+				 * contention.
+				 */
+
+				if (VM_ALL_VISIBLE(scandesc->heapRelation,
+								   ItemPointerGetBlockNumber(tid),
+								   &node->ioss_VMBuffer))
+				{
+					tuple_from_heap = false;
+					break;
+				}
+
+				index_tid_enqueue(scandesc->tid_queue, tid, scandesc->xs_recheck,
+								  scandesc->xs_hitup, scandesc->xs_itup);
+			}
+		}
+
+		if (tuple_from_heap)
 		{
 			/*
 			 * Rats, we have to visit the heap to check visibility.
 			 */
 			InstrCountTuples2(node, 1);
+
 			if (!index_fetch_heap(scandesc, node->ioss_TableSlot))
-				continue;		/* no visible tuple, try next index entry */
+			{
+				/*
+				 * Either there is no visible tuple or the streaming read ran
+				 * out of queue items and it is time to add more.
+				 */
+				if (ItemPointerIsValid(&scandesc->xs_heaptid))
+					continue;
 
-			ExecClearTuple(node->ioss_TableSlot);
+				if (!scandesc->index_done)
+					continue;
+
+				break;
+			}
 
 			/*
 			 * Only MVCC snapshots are supported here, so there should be no
@@ -185,15 +233,32 @@ IndexOnlyNext(IndexOnlyScanState *node)
 			 * entry might require a visit to the same heap page.
 			 */
 
-			tuple_from_heap = true;
+			/*
+			 * If we visit the underlying table, we need to reset the
+			 * IndexScanDesc's fields to match the per tuple state returned by
+			 * the streaming read API. The most recent index tuple fetched
+			 * will not necessarily match the current TID being processed
+			 * after returning from index_fetch_heap().
+			 */
+			scandesc->xs_recheck = scandesc->xs_heapfetch->recheck;
+			scandesc->xs_heaptid = scandesc->xs_heapfetch->tid;
+
+			scandesc->xs_hitup = scandesc->xs_heapfetch->htup;
+			scandesc->xs_itup = scandesc->xs_heapfetch->itup;
 		}
 
 		/*
 		 * Fill the scan tuple slot with data from the index.  This might be
-		 * provided in either HeapTuple or IndexTuple format.  Conceivably an
+		 * provided in either HeapTuple or IndexTuple format. Conceivably an
 		 * index AM might fill both fields, in which case we prefer the heap
-		 * format, since it's probably a bit cheaper to fill a slot from.
+		 * format, since it's probably a bit cheaper to fill a slot from. As
+		 * soon as we encounter a tuple from an all-visible block, we stop
+		 * prefetching and yield it, so the IndexTuple/HeapTuple the index AM
+		 * left in the scan descriptor are current and the streaming read's
+		 * per-tuple state is not needed.  XXX review: TIDs queued before that
+		 * tuple are yielded after it; confirm index order is still preserved.
 		 */
+		ExecClearTuple(node->ioss_TableSlot);
 		if (scandesc->xs_hitup)
 		{
 			/*
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 03142b4a94..aa72479df1 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -125,13 +125,72 @@ IndexNext(IndexScanState *node)
 						 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
 	}
 
+	if (!scandesc->tid_queue)
+	{
+		/* Fall back to a queue size of 1 for now */
+		int			queue_size = 1;
+
+		if (estate->es_use_prefetching && ScanDirectionIsForward(direction))
+			queue_size = TID_QUEUE_SIZE;
+		scandesc->tid_queue = tid_queue_alloc(queue_size);
+		index_pgsr_alloc(scandesc);
+	}
+
 	/*
 	 * ok, now that we have what we need, fetch the next tuple.
 	 */
-	while (index_getnext_slot(scandesc, direction, slot))
+	for (;;)
 	{
+		ItemPointerData last_tid = scandesc->xs_heaptid;
+
+		/*
+		 * If we haven't exhausted TIDs from the index, then fill the queue
+		 * with TIDs from the index until the queue is full. Mark the index as
+		 * exhausted if we reach the end of it.
+		 */
+		if (!scandesc->index_done)
+		{
+			while (!TID_QUEUE_FULL(scandesc->tid_queue))
+			{
+				ItemPointer tid;
+
+				if ((tid = index_getnext_tid(scandesc, direction)) == NULL)
+				{
+					scandesc->index_done = true;
+					break;
+				}
+
+				index_tid_enqueue(scandesc->tid_queue, tid, scandesc->xs_recheck,
+								  NULL, NULL);
+			}
+		}
+
+		if (scandesc->xs_heap_continue)
+			scandesc->xs_heaptid = last_tid;
+
+		/*
+		 * index_fetch_heap() returns false when either the tuple isn't
+		 * visible or when there's no more to read
+		 */
+		if (!index_fetch_heap(scandesc, slot))
+		{
+			if (ItemPointerIsValid(&scandesc->xs_heaptid))
+				continue;
+
+			if (!scandesc->index_done)
+				continue;
+
+			if (scandesc->xs_heap_continue)
+				continue;
+
+			break;
+		}
+
 		CHECK_FOR_INTERRUPTS();
 
+		scandesc->xs_recheck = scandesc->xs_heapfetch->recheck;
+		scandesc->xs_heaptid = scandesc->xs_heapfetch->tid;
+
 		/*
 		 * If the index was lossy, we have to recheck the index quals using
 		 * the fetched tuple.
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 610f4a56d6..3360da8288 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -4719,6 +4719,7 @@ create_mergejoin_plan(PlannerInfo *root,
 
 	/* Costs of sort and material steps are included in path cost already */
 	copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
+	root->allow_prefetch = false;
 
 	return join_plan;
 }
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 2e2458b128..3be133f757 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -413,11 +413,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	root = subquery_planner(glob, parse, NULL,
 							false, tuple_fraction);
 
+	root->allow_prefetch = true;
+
 	/* Select best Path and turn it into a Plan */
 	final_rel = fetch_upper_rel(root, UPPERREL_FINAL, NULL);
 	best_path = get_cheapest_fractional_path(final_rel, tuple_fraction);
 
 	top_plan = create_plan(root, best_path);
+	top_plan->allow_prefetch = root->allow_prefetch;
 
 	/*
 	 * If creating a plan for a scrollable cursor, make sure it can run
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 19605090fe..aaccf25a7f 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -34,6 +34,7 @@ struct PgStreamingRead
 	int			pinned_buffers_trigger;
 	int			next_tail_buffer;
 	bool		finished;
+	bool		resumable;
 	void	   *pgsr_private;
 	PgStreamingReadBufferCB callback;
 	BufferAccessStrategy strategy;
@@ -245,10 +246,12 @@ pg_streaming_read_new_range(PgStreamingRead *pgsr)
 static void
 pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 {
+	bool		done = pgsr->finished && !pgsr->resumable;
+
 	/*
 	 * If we're finished or can't start more I/O, then don't look ahead.
 	 */
-	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+	if (done || pgsr->ios_in_progress == pgsr->max_ios)
 		return;
 
 	/*
@@ -433,3 +436,14 @@ pg_streaming_read_free(PgStreamingRead *pgsr)
 		pfree(pgsr->per_buffer_data);
 	pfree(pgsr);
 }
+
+/*
+ * Mark the stream as resumable.  pg_streaming_read_look_ahead() does not
+ * treat a resumable stream's "finished" flag as a reason to stop looking
+ * ahead; only having max_ios I/Os in progress still stops it early.
+ */
+void
+pg_streaming_read_set_resumable(PgStreamingRead *pgsr)
+{
+	pgsr->resumable = true;
+}
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 8026c2b36d..13d6fab318 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -14,6 +14,7 @@
 #ifndef GENAM_H
 #define GENAM_H
 
+#include "access/relscan.h"
 #include "access/sdir.h"
 #include "access/skey.h"
 #include "nodes/tidbitmap.h"
@@ -24,6 +25,31 @@
 /* We don't want this file to depend on execnodes.h. */
 struct IndexInfo;
 
+/*
+ * One entry in a TIDQueue: a heap TID returned by the index AM together with
+ * the per-tuple scan state that was current when it was enqueued.
+ */
+typedef struct TIDQueueItem
+{
+	ItemPointerData tid;		/* heap TID to fetch */
+	bool		recheck;		/* recheck flag passed at enqueue time */
+	IndexTuple	itup;			/* copy of the index tuple, or NULL */
+	HeapTuple	htup;			/* copy of the heap-format tuple, or NULL */
+} TIDQueueItem;
+
+/*
+ * Fixed-size ring buffer of TIDs waiting to be handed to the streaming read.
+ * head and tail only grow between resets; the slot for a position is
+ * position % size, and tail - head is the number of queued items.
+ */
+typedef struct TIDQueue
+{
+	uint64		head;			/* position of the oldest queued item */
+	uint64		tail;			/* position the next item is stored at */
+	int			size;			/* number of slots in data[] */
+	TIDQueueItem data[FLEXIBLE_ARRAY_MEMBER /* size */ ];
+} TIDQueue;
+
 /*
  * Struct for statistics returned by ambuild
  */
@@ -175,6 +201,33 @@ extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
 											  ParallelIndexScanDesc pscan);
 extern ItemPointer index_getnext_tid(IndexScanDesc scan,
 									 ScanDirection direction);
+
+extern void index_pgsr_alloc(IndexScanDesc scan);
+
+extern void tid_queue_reset(TIDQueue *q);
+
+extern void index_tid_enqueue(TIDQueue *tid_queue, ItemPointer tid, bool recheck,
+							  HeapTuple htup, IndexTuple itup);
+
+/* queue size used by forward scans when prefetching is enabled */
+#define TID_QUEUE_SIZE 6
+
+extern TIDQueue *tid_queue_alloc(int size);
+
+/* Is every slot of the queue occupied? */
+static inline bool
+TID_QUEUE_FULL(TIDQueue *q)
+{
+	return q->tail - q->head == (uint64) q->size;
+}
+
+/* Is the queue holding no items at all? */
+static inline bool
+TID_QUEUE_EMPTY(TIDQueue *q)
+{
+	return q->head == q->tail;
+}
+
 struct TupleTableSlot;
 extern bool index_fetch_heap(IndexScanDesc scan, struct TupleTableSlot *slot);
 extern bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction,
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 521043304a..66b3d90c83 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -18,6 +18,7 @@
 #include "access/itup.h"
 #include "port/atomics.h"
 #include "storage/buf.h"
+#include "storage/streaming_read.h"
 #include "storage/spin.h"
 #include "utils/relcache.h"
 
@@ -104,8 +105,16 @@ typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker;
 typedef struct IndexFetchTableData
 {
 	Relation	rel;
+	PgStreamingRead *pgsr;
+
+	ItemPointerData tid;
+	bool		recheck;
+	IndexTuple	itup;
+	HeapTuple	htup;
 } IndexFetchTableData;
 
+typedef struct TIDQueue TIDQueue;
+
 /*
  * We use the same IndexScanDescData structure for both amgettuple-based
  * and amgetbitmap-based index scans.  Some fields are only relevant in
@@ -148,6 +157,8 @@ typedef struct IndexScanDescData
 	bool		xs_heap_continue;	/* T if must keep walking, potential
 									 * further results */
 	IndexFetchTableData *xs_heapfetch;
+	TIDQueue   *tid_queue;
+	bool		index_done;
 
 	bool		xs_recheck;		/* T means scan keys must be rechecked */
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 444a5f0fd5..a8042abf95 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -697,6 +697,7 @@ typedef struct EState
 	struct EPQState *es_epq_active;
 
 	bool		es_use_parallel_mode;	/* can we use parallel workers? */
+	bool		es_use_prefetching;
 
 	/* The per-query shared memory area to use for parallel execution. */
 	struct dsa_area *es_query_dsa;
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 534692bee1..171066b14e 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -203,6 +203,7 @@ struct PlannerInfo
 
 	/* 1 at the outermost Query */
 	Index		query_level;
+	bool		allow_prefetch;
 
 	/* NULL at outermost Query */
 	PlannerInfo *parent_root pg_node_attr(read_write_ignore);
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index b4ef6bc44c..317de2d781 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -169,6 +169,7 @@ typedef struct Plan
 	 */
 	Bitmapset  *extParam;
 	Bitmapset  *allParam;
+	bool		allow_prefetch;
 } Plan;
 
 /* ----------------
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index 40c3408c54..2288b7b5eb 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -42,4 +42,6 @@ extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
 extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
 extern void pg_streaming_read_free(PgStreamingRead *pgsr);
 
+extern void pg_streaming_read_set_resumable(PgStreamingRead *pgsr);
+
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 0e34145187..ecc910ff35 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2726,6 +2726,8 @@ TBMSharedIteratorState
 TBMStatus
 TBlockState
 TIDBitmap
+TIDQueue
+TIDQueueItem
 TM_FailureData
 TM_IndexDelete
 TM_IndexDeleteOp
-- 
2.37.2

