From a63d3b74d04e6904b12d998a1ef3b1887f6b2d64 Mon Sep 17 00:00:00 2001
From: Mikhail Nikalayeu <mihailnikalayeu@gmail.com>
Date: Wed, 1 Jan 2025 15:25:20 +0100
Subject: [PATCH v25 04/12] Support snapshot resets in parallel concurrent
 index builds

Extend periodic snapshot reset support to parallel builds, previously limited to non-parallel operations. This allows the xmin horizon to advance during parallel concurrent index builds as well.

The main limitation of applying that technic to parallel builds was a requirement to wait until workers processes restore their initial snapshot from leader.

To address this, following changes applied:
- add infrastructure to track snapshot restoration in parallel workers
- extend parallel scan initialization to support periodic snapshot resets
- wait for parallel workers to restore their initial snapshots before proceeding with scan
- relax limitation for parallel worker to call GetLatestSnapshot
---
 src/backend/access/brin/brin.c                | 50 +++++++++-------
 src/backend/access/gin/gininsert.c            | 50 +++++++++-------
 src/backend/access/heap/heapam_handler.c      | 12 ++--
 src/backend/access/nbtree/nbtsort.c           | 57 ++++++++++++++-----
 src/backend/access/table/tableam.c            | 37 ++++++++++--
 src/backend/access/transam/parallel.c         | 50 ++++++++++++++--
 src/backend/catalog/index.c                   |  2 +-
 src/backend/executor/nodeSeqscan.c            |  3 +-
 src/backend/utils/time/snapmgr.c              |  8 ---
 src/include/access/parallel.h                 |  3 +-
 src/include/access/relscan.h                  |  1 +
 src/include/access/tableam.h                  |  9 +--
 .../expected/cic_reset_snapshots.out          | 25 +++++++-
 .../sql/cic_reset_snapshots.sql               |  7 ++-
 14 files changed, 225 insertions(+), 89 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 186edd0d229..5554cfa6f4d 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -143,7 +143,6 @@ typedef struct BrinLeader
 	 */
 	BrinShared *brinshared;
 	Sharedsort *sharedsort;
-	Snapshot	snapshot;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
 } BrinLeader;
@@ -231,7 +230,7 @@ static void brin_fill_empty_ranges(BrinBuildState *state,
 static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 								 bool isconcurrent, int request);
 static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
-static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static Size _brin_parallel_estimate_shared(Relation heap);
 static double _brin_parallel_heapscan(BrinBuildState *state);
 static double _brin_parallel_merge(BrinBuildState *state);
 static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
@@ -1221,7 +1220,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		reltuples = _brin_parallel_merge(state);
 
 		_brin_end_parallel(state->bs_leader, state);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 	else						/* no parallel index build */
 	{
@@ -1254,7 +1252,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		brin_fill_empty_ranges(state,
 							   state->bs_currRangeStart,
 							   state->bs_maxRangeStart);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 
 	/* release resources */
@@ -1269,6 +1266,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
 	result->heap_tuples = reltuples;
 	result->index_tuples = idxtuples;
+	Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
 
 	return result;
 }
@@ -2368,7 +2366,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 {
 	ParallelContext *pcxt;
 	int			scantuplesortstates;
-	Snapshot	snapshot;
 	Size		estbrinshared;
 	Size		estsort;
 	BrinShared *brinshared;
@@ -2399,25 +2396,25 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	 * Prepare for scan of the base relation.  In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
-	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * concurrent build, we take a regular MVCC snapshot and push it as active.
+	 * Later we index whatever's live according to that snapshot while that
+	 * snapshot is reset periodically.
 	 */
 	if (!isconcurrent)
 	{
 		Assert(ActiveSnapshotSet());
-		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
 	else
 	{
-		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+		Assert(!ActiveSnapshotSet());
 		PushActiveSnapshot(GetTransactionSnapshot());
 	}
 
 	/*
 	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
 	 */
-	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	estbrinshared = _brin_parallel_estimate_shared(heap);
 	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
 	estsort = tuplesort_estimate_shared(scantuplesortstates);
 	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -2457,8 +2454,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
-			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
 		return;
@@ -2483,7 +2478,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromBrinShared(brinshared),
-								  snapshot);
+								  isconcurrent ? InvalidSnapshot : SnapshotAny,
+								  isconcurrent);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -2529,7 +2525,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 		brinleader->nparticipanttuplesorts++;
 	brinleader->brinshared = brinshared;
 	brinleader->sharedsort = sharedsort;
-	brinleader->snapshot = snapshot;
 	brinleader->walusage = walusage;
 	brinleader->bufferusage = bufferusage;
 
@@ -2545,6 +2540,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->bs_leader = brinleader;
 
+	/*
+	 * In case of concurrent build snapshots are going to be reset periodically.
+	 * We need to wait until all workers imported initial snapshot.
+	 */
+	if (isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_brin_leader_participate_as_worker(buildstate, heap, index);
@@ -2553,7 +2555,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	 * Caller needs to wait for all launched workers when we return.  Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
 }
@@ -2576,9 +2579,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
 		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
 
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(brinleader->snapshot))
-		UnregisterSnapshot(brinleader->snapshot);
 	DestroyParallelContext(brinleader->pcxt);
 	ExitParallelMode();
 }
@@ -2778,14 +2778,14 @@ _brin_parallel_merge(BrinBuildState *state)
 
 /*
  * Returns size of shared memory required to store state for a parallel
- * brin index build based on the snapshot its parallel scan will use.
+ * brin index build.
  */
 static Size
-_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+_brin_parallel_estimate_shared(Relation heap)
 {
 	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
 	return add_size(BUFFERALIGN(sizeof(BrinShared)),
-					table_parallelscan_estimate(heap, snapshot));
+					table_parallelscan_estimate(heap, InvalidSnapshot));
 }
 
 /*
@@ -2807,6 +2807,7 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re
 	/* Perform work common to all participants */
 	_brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
 								  brinleader->sharedsort, heap, index, sortmem, true);
+	Assert(!brinleader->brinshared->isconcurrent || !TransactionIdIsValid(MyProc->xid));
 }
 
 /*
@@ -2947,6 +2948,13 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 
 	_brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
 								  heapRel, indexRel, sortmem, false);
+	if (brinshared->isconcurrent)
+	{
+		PopActiveSnapshot();
+		InvalidateCatalogSnapshot();
+		Assert(!TransactionIdIsValid(MyProc->xid));
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c
index 2f947d36619..bf26106aa5e 100644
--- a/src/backend/access/gin/gininsert.c
+++ b/src/backend/access/gin/gininsert.c
@@ -132,7 +132,6 @@ typedef struct GinLeader
 	 */
 	GinBuildShared *ginshared;
 	Sharedsort *sharedsort;
-	Snapshot	snapshot;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
 } GinLeader;
@@ -180,7 +179,7 @@ typedef struct
 static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 								bool isconcurrent, int request);
 static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state);
-static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static Size _gin_parallel_estimate_shared(Relation heap);
 static double _gin_parallel_heapscan(GinBuildState *state);
 static double _gin_parallel_merge(GinBuildState *state);
 static void _gin_leader_participate_as_worker(GinBuildState *buildstate,
@@ -717,7 +716,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		reltuples = _gin_parallel_merge(state);
 
 		_gin_end_parallel(state->bs_leader, state);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 	else						/* no parallel index build */
 	{
@@ -741,7 +739,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 						   list, nlist, &buildstate.buildStats);
 		}
 		MemoryContextSwitchTo(oldCtx);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 
 	MemoryContextDelete(buildstate.funcCtx);
@@ -771,6 +768,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
 	result->heap_tuples = reltuples;
 	result->index_tuples = buildstate.indtuples;
+	Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 
 	return result;
 }
@@ -905,7 +903,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 {
 	ParallelContext *pcxt;
 	int			scantuplesortstates;
-	Snapshot	snapshot;
 	Size		estginshared;
 	Size		estsort;
 	GinBuildShared *ginshared;
@@ -935,25 +932,25 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	 * Prepare for scan of the base relation.  In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
-	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * concurrent build, we take a regular MVCC snapshot and push it as active.
+	 * Later we index whatever's live according to that snapshot while that
+	 * snapshot is reset periodically.
 	 */
 	if (!isconcurrent)
 	{
 		Assert(ActiveSnapshotSet());
-		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
 	else
 	{
-		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+		Assert(!ActiveSnapshotSet());
 		PushActiveSnapshot(GetTransactionSnapshot());
 	}
 
 	/*
 	 * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace.
 	 */
-	estginshared = _gin_parallel_estimate_shared(heap, snapshot);
+	estginshared = _gin_parallel_estimate_shared(heap);
 	shm_toc_estimate_chunk(&pcxt->estimator, estginshared);
 	estsort = tuplesort_estimate_shared(scantuplesortstates);
 	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -993,8 +990,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
-			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
 		return;
@@ -1018,7 +1013,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromGinBuildShared(ginshared),
-								  snapshot);
+								  isconcurrent ? InvalidSnapshot : SnapshotAny,
+								  isconcurrent);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -1060,7 +1056,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 		ginleader->nparticipanttuplesorts++;
 	ginleader->ginshared = ginshared;
 	ginleader->sharedsort = sharedsort;
-	ginleader->snapshot = snapshot;
 	ginleader->walusage = walusage;
 	ginleader->bufferusage = bufferusage;
 
@@ -1076,6 +1071,13 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->bs_leader = ginleader;
 
+	/*
+	 * In case of concurrent build snapshots are going to be reset periodically.
+	 * We need to wait until all workers imported initial snapshot.
+	 */
+	if (isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_gin_leader_participate_as_worker(buildstate, heap, index);
@@ -1084,7 +1086,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	 * Caller needs to wait for all launched workers when we return.  Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
 }
@@ -1107,9 +1110,6 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state)
 	for (i = 0; i < ginleader->pcxt->nworkers_launched; i++)
 		InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]);
 
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(ginleader->snapshot))
-		UnregisterSnapshot(ginleader->snapshot);
 	DestroyParallelContext(ginleader->pcxt);
 	ExitParallelMode();
 }
@@ -1790,14 +1790,14 @@ _gin_parallel_merge(GinBuildState *state)
 
 /*
  * Returns size of shared memory required to store state for a parallel
- * gin index build based on the snapshot its parallel scan will use.
+ * gin index build.
  */
 static Size
-_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+_gin_parallel_estimate_shared(Relation heap)
 {
 	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
 	return add_size(BUFFERALIGN(sizeof(GinBuildShared)),
-					table_parallelscan_estimate(heap, snapshot));
+					table_parallelscan_estimate(heap, InvalidSnapshot));
 }
 
 /*
@@ -1820,6 +1820,7 @@ _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Rela
 	_gin_parallel_scan_and_build(buildstate, ginleader->ginshared,
 								 ginleader->sharedsort, heap, index,
 								 sortmem, true);
+	Assert(!ginleader->ginshared->isconcurrent || !TransactionIdIsValid(MyProc->xid));
 }
 
 /*
@@ -2179,6 +2180,13 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 
 	_gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort,
 								 heapRel, indexRel, sortmem, false);
+	if (ginshared->isconcurrent)
+	{
+		PopActiveSnapshot();
+		InvalidateCatalogSnapshot();
+		Assert(!TransactionIdIsValid(MyProc->xid));
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index e32ee739733..a7e16871af6 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1235,14 +1235,13 @@ heapam_index_build_range_scan(Relation heapRelation,
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
 	 * concurrent build, or during bootstrap, we take a regular MVCC snapshot
-	 * and index whatever's live according to that.
+	 * and index whatever's live according to that while that snapshot is reset
+	 * every so often (in case of non-unique index).
 	 */
 	OldestXmin = InvalidTransactionId;
 
 	/*
 	 * For unique index we need consistent snapshot for the whole scan.
-	 * In case of parallel scan some additional infrastructure required
-	 * to perform scan with SO_RESET_SNAPSHOT which is not yet ready.
 	 */
 	reset_snapshots = indexInfo->ii_Concurrent &&
 					  !indexInfo->ii_Unique &&
@@ -1304,8 +1303,11 @@ heapam_index_build_range_scan(Relation heapRelation,
 		Assert(!IsBootstrapProcessingMode());
 		Assert(allow_sync);
 		snapshot = scan->rs_snapshot;
-		PushActiveSnapshot(snapshot);
-		need_pop_active_snapshot = true;
+		if (!reset_snapshots)
+		{
+			PushActiveSnapshot(snapshot);
+			need_pop_active_snapshot = true;
+		}
 	}
 
 	hscan = (HeapScanDesc) scan;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 7b09ad878b7..53b7ddfff0e 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -322,22 +322,20 @@ btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 			 RelationGetRelationName(index));
 
 	reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo);
-	Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique ||
-		  !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
+	Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin));
 
 	/*
 	 * Finish the build by (1) completing the sort of the spool file, (2)
 	 * inserting the sorted tuples into btree pages and (3) building the upper
 	 * levels.  Finally, it may also be necessary to end use of parallelism.
 	 */
-	_bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_ParallelWorkers && indexInfo->ii_Concurrent);
+	_bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_Unique && indexInfo->ii_Concurrent);
 	_bt_spooldestroy(buildstate.spool);
 	if (buildstate.spool2)
 		_bt_spooldestroy(buildstate.spool2);
 	if (buildstate.btleader)
 		_bt_end_parallel(buildstate.btleader);
-	Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique ||
-		  !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
+	Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin));
 
 	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
 
@@ -486,8 +484,7 @@ _bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate,
 		reltuples = _bt_parallel_heapscan(buildstate,
 										  &indexInfo->ii_BrokenHotChain);
 	InvalidateCatalogSnapshot();
-	Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique ||
-		  !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
+	Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin));
 
 	/*
 	 * Set the progress target for the next phase.  Reset the block number
@@ -1421,6 +1418,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	BufferUsage *bufferusage;
 	bool		leaderparticipates = true;
 	bool		need_pop_active_snapshot = true;
+	bool		reset_snapshot;
 	int			querylen;
 
 #ifdef DISABLE_LEADER_PARTICIPATION
@@ -1438,12 +1436,21 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 
 	scantuplesortstates = leaderparticipates ? request + 1 : request;
 
+    /*
+	 * For concurrent non-unique index builds, we can periodically reset snapshots
+	 * to allow the xmin horizon to advance. This is safe since these builds don't
+	 * require a consistent view across the entire scan. Unique indexes still need
+	 * a stable snapshot to properly enforce uniqueness constraints.
+     */
+	reset_snapshot = isconcurrent && !btspool->isunique;
+
 	/*
 	 * Prepare for scan of the base relation.  In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
 	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * live according to that, while that snapshot may be reset periodically in
+	 * case of non-unique index.
 	 */
 	if (!isconcurrent)
 	{
@@ -1451,6 +1458,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
+	else if (reset_snapshot)
+	{
+		snapshot = InvalidSnapshot;
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 	else
 	{
 		snapshot = RegisterSnapshot(GetTransactionSnapshot());
@@ -1511,7 +1523,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
+		if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
 			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
@@ -1538,7 +1550,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
 	table_parallelscan_initialize(btspool->heap,
 								  ParallelTableScanFromBTShared(btshared),
-								  snapshot);
+								  snapshot,
+								  reset_snapshot);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -1614,6 +1627,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->btleader = btleader;
 
+	/*
+	 * In case of concurrent build snapshots are going to be reset periodically.
+	 * Wait until all workers imported initial snapshot.
+	 */
+	if (reset_snapshot)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_bt_leader_participate_as_worker(buildstate);
@@ -1622,7 +1642,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	 * Caller needs to wait for all launched workers when we return.  Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!reset_snapshot)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
 }
@@ -1646,7 +1667,7 @@ _bt_end_parallel(BTLeader *btleader)
 		InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]);
 
 	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(btleader->snapshot))
+	if (btleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(btleader->snapshot))
 		UnregisterSnapshot(btleader->snapshot);
 	DestroyParallelContext(btleader->pcxt);
 	ExitParallelMode();
@@ -1896,6 +1917,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	SortCoordinate coordinate;
 	BTBuildState buildstate;
 	TableScanDesc scan;
+	ParallelTableScanDesc pscan;
 	double		reltuples;
 	IndexInfo  *indexInfo;
 
@@ -1950,11 +1972,15 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(btspool->index);
 	indexInfo->ii_Concurrent = btshared->isconcurrent;
-	scan = table_beginscan_parallel(btspool->heap,
-									ParallelTableScanFromBTShared(btshared));
+	pscan = ParallelTableScanFromBTShared(btshared);
+	scan = table_beginscan_parallel(btspool->heap, pscan);
 	reltuples = table_index_build_scan(btspool->heap, btspool->index, indexInfo,
 									   true, progress, _bt_build_callback,
 									   &buildstate, scan);
+	InvalidateCatalogSnapshot();
+	if (pscan->phs_reset_snapshot)
+		PopActiveSnapshot();
+	Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin));
 
 	/* Execute this worker's part of the sort */
 	if (progress)
@@ -1990,4 +2016,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	tuplesort_end(btspool->sortstate);
 	if (btspool2)
 		tuplesort_end(btspool2->sortstate);
+	Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin));
+	if (pscan->phs_reset_snapshot)
+		PushActiveSnapshot(GetTransactionSnapshot());
 }
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 5e41404937e..8b33b6278ce 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -132,10 +132,10 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 {
 	Size		sz = 0;
 
-	if (IsMVCCSnapshot(snapshot))
+	if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
 		sz = add_size(sz, EstimateSnapshotSpace(snapshot));
 	else
-		Assert(snapshot == SnapshotAny);
+		Assert(snapshot == SnapshotAny || snapshot == InvalidSnapshot);
 
 	sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));
 
@@ -144,21 +144,36 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 
 void
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot)
+							  Snapshot snapshot, bool reset_snapshot)
 {
 	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
 
 	pscan->phs_snapshot_off = snapshot_off;
 
-	if (IsMVCCSnapshot(snapshot))
+	/*
+	 * Initialize parallel scan description. For normal scans with a regular
+	 * MVCC snapshot, serialize the snapshot info. For scans that use periodic
+	 * snapshot resets, mark the scan accordingly.
+	 */
+	if (reset_snapshot)
+	{
+		Assert(snapshot == InvalidSnapshot);
+		pscan->phs_snapshot_any = false;
+		pscan->phs_reset_snapshot = true;
+		INJECTION_POINT("table_parallelscan_initialize", NULL);
+	}
+	else if (IsMVCCSnapshot(snapshot))
 	{
 		SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
 		pscan->phs_snapshot_any = false;
+		pscan->phs_reset_snapshot = false;
 	}
 	else
 	{
 		Assert(snapshot == SnapshotAny);
+		Assert(!reset_snapshot);
 		pscan->phs_snapshot_any = true;
+		pscan->phs_reset_snapshot = false;
 	}
 }
 
@@ -171,7 +186,19 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
 
 	Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator));
 
-	if (!pscan->phs_snapshot_any)
+	/*
+	 * For scans that
+	 * use periodic snapshot resets, mark the scan accordingly and use the active
+	 * snapshot as the initial state.
+	 */
+	if (pscan->phs_reset_snapshot)
+	{
+		Assert(ActiveSnapshotSet());
+		flags |= SO_RESET_SNAPSHOT;
+		/* Start with current active snapshot. */
+		snapshot = GetActiveSnapshot();
+	}
+	else if (!pscan->phs_snapshot_any)
 	{
 		/* Snapshot was serialized -- restore it */
 		snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 94db1ec3012..065ea9d26f6 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -77,6 +77,7 @@
 #define PARALLEL_KEY_RELMAPPER_STATE		UINT64CONST(0xFFFFFFFFFFFF000D)
 #define PARALLEL_KEY_UNCOMMITTEDENUMS		UINT64CONST(0xFFFFFFFFFFFF000E)
 #define PARALLEL_KEY_CLIENTCONNINFO			UINT64CONST(0xFFFFFFFFFFFF000F)
+#define PARALLEL_KEY_SNAPSHOT_RESTORED		UINT64CONST(0xFFFFFFFFFFFF0010)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -305,6 +306,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
 										pcxt->nworkers));
 		shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+		shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(bool),
+							   pcxt->nworkers));
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 		/* Estimate how much we'll need for the entrypoint info. */
 		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
 							   strlen(pcxt->function_name) + 2);
@@ -376,6 +381,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		char	   *entrypointstate;
 		char	   *uncommittedenumsspace;
 		char	   *clientconninfospace;
+		bool	   *snapshot_set_flag_space;
 		Size		lnamelen;
 
 		/* Serialize shared libraries we have loaded. */
@@ -491,6 +497,19 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		strcpy(entrypointstate, pcxt->library_name);
 		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
+
+		/*
+		 * Establish dynamic shared memory to pass information about importing
+		 * of snapshot.
+		 */
+		snapshot_set_flag_space =
+				shm_toc_allocate(pcxt->toc, mul_size(sizeof(bool), pcxt->nworkers));
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			pcxt->worker[i].snapshot_restored = snapshot_set_flag_space + i * sizeof(bool);
+			*pcxt->worker[i].snapshot_restored = false;
+		}
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, snapshot_set_flag_space);
 	}
 
 	/* Update nworkers_to_launch, in case we changed nworkers above. */
@@ -546,6 +565,17 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
 		}
 	}
+
+	/* Set snapshot restored flag to false. */
+	if (pcxt->nworkers > 0)
+	{
+		bool	   *snapshot_restored_space;
+		int			i;
+		snapshot_restored_space =
+				shm_toc_lookup(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false);
+		for (i = 0; i < pcxt->nworkers; ++i)
+			snapshot_restored_space[i] = false;
+	}
 }
 
 /*
@@ -661,6 +691,10 @@ LaunchParallelWorkers(ParallelContext *pcxt)
  * Wait for all workers to attach to their error queues, and throw an error if
  * any worker fails to do this.
  *
+ * wait_for_snapshot: track whether each parallel worker has successfully restored
+ * its snapshot. This is needed when using periodic snapshot resets to ensure all
+ * workers have a valid initial snapshot before proceeding with the scan.
+ *
  * Callers can assume that if this function returns successfully, then the
  * number of workers given by pcxt->nworkers_launched have initialized and
  * attached to their error queues.  Whether or not these workers are guaranteed
@@ -690,7 +724,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
  * call this function at all.
  */
 void
-WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot)
 {
 	int			i;
 
@@ -734,9 +768,12 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt)
 				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
 				if (shm_mq_get_sender(mq) != NULL)
 				{
-					/* Yes, so it is known to be attached. */
-					pcxt->known_attached_workers[i] = true;
-					++pcxt->nknown_attached_workers;
+					if (!wait_for_snapshot || *(pcxt->worker[i].snapshot_restored))
+					{
+						/* Yes, so it is known to be attached. */
+						pcxt->known_attached_workers[i] = true;
+						++pcxt->nknown_attached_workers;
+					}
 				}
 			}
 			else if (status == BGWH_STOPPED)
@@ -1295,6 +1332,7 @@ ParallelWorkerMain(Datum main_arg)
 	shm_toc    *toc;
 	FixedParallelState *fps;
 	char	   *error_queue_space;
+	bool	   *snapshot_restored_space;
 	shm_mq	   *mq;
 	shm_mq_handle *mqh;
 	char	   *libraryspace;
@@ -1499,6 +1537,10 @@ ParallelWorkerMain(Datum main_arg)
 							   fps->parallel_leader_pgproc);
 	PushActiveSnapshot(asnapshot);
 
+	/* Snapshot is restored, set flag to make leader know about it. */
+	snapshot_restored_space = shm_toc_lookup(toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false);
+	snapshot_restored_space[ParallelWorkerNumber] = true;
+
 	/*
 	 * We've changed which tuples we can see, and must therefore invalidate
 	 * system caches.
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index f50221930fd..32b7e6311eb 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -1533,7 +1533,7 @@ index_concurrently_build(Oid heapRelationId,
 	index_build(heapRel, indexRelation, indexInfo, false, true);
 
 	InvalidateCatalogSnapshot();
-	Assert((indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique) || !TransactionIdIsValid(MyProc->xmin));
+	Assert(indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin));
 
 	/* Roll back any GUC changes executed by index functions */
 	AtEOXact_GUC(false, save_nestlevel);
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 94047d29430..f16284d4d0d 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -371,7 +371,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
-								  estate->es_snapshot);
+								  estate->es_snapshot,
+								  false);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 8e1a918f130..68ea98405bb 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -353,14 +353,6 @@ GetTransactionSnapshot(void)
 Snapshot
 GetLatestSnapshot(void)
 {
-	/*
-	 * We might be able to relax this, but nothing that could otherwise work
-	 * needs it.
-	 */
-	if (IsInParallelMode())
-		elog(ERROR,
-			 "cannot update SecondarySnapshot during a parallel operation");
-
 	/*
 	 * So far there are no cases requiring support for GetLatestSnapshot()
 	 * during logical decoding, but it wouldn't be hard to add if required.
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index f37be6d5690..a7362f7b43b 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -26,6 +26,7 @@ typedef struct ParallelWorkerInfo
 {
 	BackgroundWorkerHandle *bgwhandle;
 	shm_mq_handle *error_mqh;
+	bool		  *snapshot_restored;
 } ParallelWorkerInfo;
 
 typedef struct ParallelContext
@@ -65,7 +66,7 @@ extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
-extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot);
 extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
 extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index b5e0fb386c0..50441c58cea 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -82,6 +82,7 @@ typedef struct ParallelTableScanDescData
 	RelFileLocator phs_locator; /* physical relation to scan */
 	bool		phs_syncscan;	/* report location to syncscan logic? */
 	bool		phs_snapshot_any;	/* SnapshotAny, not phs_snapshot_data? */
+	bool		phs_reset_snapshot; /* use SO_RESET_SNAPSHOT? */
 	Size		phs_snapshot_off;	/* data for snapshot */
 } ParallelTableScanDescData;
 typedef struct ParallelTableScanDescData *ParallelTableScanDesc;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 71af14d1c31..613615c78cd 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1140,7 +1140,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
  */
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot);
+										  Snapshot snapshot,
+										  bool reset_snapshot);
 
 /*
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -1762,9 +1763,9 @@ table_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
  * This only really makes sense for heap AM, it might need to be generalized
  * for other AMs later.
  *
- * In case of non-unique index and non-parallel concurrent build
- * SO_RESET_SNAPSHOT is applied for the scan. That leads for changing snapshots
- * on the fly to allow xmin horizon propagate.
+ * In case of non-unique concurrent  index build SO_RESET_SNAPSHOT is applied
+ * for the scan. That leads for changing snapshots on the fly to allow xmin
+ * horizon propagate.
  */
 static inline double
 table_index_build_scan(Relation table_rel,
diff --git a/src/test/modules/injection_points/expected/cic_reset_snapshots.out b/src/test/modules/injection_points/expected/cic_reset_snapshots.out
index 948d1232aa0..595a4000ce0 100644
--- a/src/test/modules/injection_points/expected/cic_reset_snapshots.out
+++ b/src/test/modules/injection_points/expected/cic_reset_snapshots.out
@@ -17,6 +17,12 @@ SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice'
  
 (1 row)
 
+SELECT injection_points_attach('table_parallelscan_initialize', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
 CREATE SCHEMA cic_reset_snap;
 CREATE TABLE cic_reset_snap.tbl(i int primary key, j int);
 INSERT INTO cic_reset_snap.tbl SELECT i, i * I FROM generate_series(1, 200) s(i);
@@ -72,30 +78,45 @@ NOTICE:  notice triggered for injection point heap_reset_scan_snapshot_effective
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 -- The same in parallel mode
 ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2);
+-- Detach to keep test stable, since parallel worker may complete scan before leader
+SELECT injection_points_detach('heap_reset_scan_snapshot_effective');
+ injection_points_detach 
+-------------------------
+ 
+(1 row)
+
 CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i);
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i);
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(MOD(i, 2), j) WHERE MOD(i, 2) = 0;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable(i);
 NOTICE:  notice triggered for injection point table_beginscan_strat_reset_snapshots
-NOTICE:  notice triggered for injection point heap_reset_scan_snapshot_effective
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
 NOTICE:  notice triggered for injection point table_beginscan_strat_reset_snapshots
-NOTICE:  notice triggered for injection point heap_reset_scan_snapshot_effective
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable_no_param();
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i DESC NULLS LAST);
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl USING BRIN(i);
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 DROP SCHEMA cic_reset_snap CASCADE;
 NOTICE:  drop cascades to 3 other objects
diff --git a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
index 5072535b355..2941aa7ae38 100644
--- a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
+++ b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
@@ -3,7 +3,7 @@ CREATE EXTENSION injection_points;
 SELECT injection_points_set_local();
 SELECT injection_points_attach('heap_reset_scan_snapshot_effective', 'notice');
 SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice');
-
+SELECT injection_points_attach('table_parallelscan_initialize', 'notice');
 
 CREATE SCHEMA cic_reset_snap;
 CREATE TABLE cic_reset_snap.tbl(i int primary key, j int);
@@ -53,6 +53,9 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 -- The same in parallel mode
 ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2);
 
+-- Detach to keep test stable, since parallel worker may complete scan before leader
+SELECT injection_points_detach('heap_reset_scan_snapshot_effective');
+
 CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i);
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
@@ -83,4 +86,4 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 
 DROP SCHEMA cic_reset_snap CASCADE;
 
-DROP EXTENSION injection_points;
+DROP EXTENSION injection_points;
\ No newline at end of file
-- 
2.48.1

