From 225b911a2d733030a42e68152ad47f86db9a715b Mon Sep 17 00:00:00 2001
From: nkey <michail.nikolaev@gmail.com>
Date: Wed, 1 Jan 2025 15:25:20 +0100
Subject: [PATCH v15 04/12] Allow snapshot resets during parallel concurrent
 index builds

Previously, non-unique concurrent index builds in parallel mode required a
consistent MVCC snapshot throughout the build, which could hold back the xmin
horizon and prevent dead tuple cleanup. This patch extends the previous work
on snapshot resets (introduced for non-parallel builds) to also support
parallel builds.

Key changes:
- Add infrastructure to track snapshot restoration in parallel workers
- Extend parallel scan initialization to support periodic snapshot resets
- Wait for parallel workers to restore their initial snapshots before proceeding with scan
- Add regression tests to verify behavior with various index types

The snapshot reset approach is safe for non-unique indexes since they don't
need snapshot consistency across the entire scan. For unique indexes, we
continue to maintain a consistent snapshot to properly enforce uniqueness
constraints.

This helps reduce the xmin horizon impact of long-running concurrent index
builds in parallel mode, improving VACUUM's ability to clean up dead tuples.
---
 src/backend/access/brin/brin.c                | 49 +++++++++-------
 src/backend/access/heap/heapam_handler.c      | 12 ++--
 src/backend/access/nbtree/nbtsort.c           | 57 ++++++++++++++-----
 src/backend/access/table/tableam.c            | 37 ++++++++++--
 src/backend/access/transam/parallel.c         | 50 ++++++++++++++--
 src/backend/catalog/index.c                   |  2 +-
 src/backend/executor/nodeSeqscan.c            |  3 +-
 src/backend/utils/time/snapmgr.c              |  8 ---
 src/include/access/parallel.h                 |  3 +-
 src/include/access/relscan.h                  |  1 +
 src/include/access/tableam.h                  |  9 +--
 .../expected/cic_reset_snapshots.out          | 25 +++++++-
 .../sql/cic_reset_snapshots.sql               |  7 ++-
 13 files changed, 196 insertions(+), 67 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index f1dba9e8185..d8317787251 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -143,7 +143,6 @@ typedef struct BrinLeader
 	 */
 	BrinShared *brinshared;
 	Sharedsort *sharedsort;
-	Snapshot	snapshot;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
 } BrinLeader;
@@ -231,7 +230,7 @@ static void brin_fill_empty_ranges(BrinBuildState *state,
 static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 								 bool isconcurrent, int request);
 static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
-static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static Size _brin_parallel_estimate_shared(Relation heap);
 static double _brin_parallel_heapscan(BrinBuildState *state);
 static double _brin_parallel_merge(BrinBuildState *state);
 static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
@@ -1248,7 +1247,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		brin_fill_empty_ranges(state,
 							   state->bs_currRangeStart,
 							   state->bs_maxRangeStart);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 
 	/* release resources */
@@ -1263,6 +1261,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
 	result->heap_tuples = reltuples;
 	result->index_tuples = idxtuples;
+	Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
 
 	return result;
 }
@@ -2363,7 +2362,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 {
 	ParallelContext *pcxt;
 	int			scantuplesortstates;
-	Snapshot	snapshot;
 	Size		estbrinshared;
 	Size		estsort;
 	BrinShared *brinshared;
@@ -2394,25 +2392,25 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	 * Prepare for scan of the base relation.  In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
-	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * concurrent build, we take a regular MVCC snapshot and push it as active.
+	 * Later we index whatever's live according to that snapshot while that
+	 * snapshot is reset periodically.
 	 */
 	if (!isconcurrent)
 	{
 		Assert(ActiveSnapshotSet());
-		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
 	else
 	{
-		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+		Assert(!ActiveSnapshotSet());
 		PushActiveSnapshot(GetTransactionSnapshot());
 	}
 
 	/*
 	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
 	 */
-	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	estbrinshared = _brin_parallel_estimate_shared(heap);
 	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
 	estsort = tuplesort_estimate_shared(scantuplesortstates);
 	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -2452,8 +2450,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
-			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
 		return;
@@ -2478,7 +2474,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromBrinShared(brinshared),
-								  snapshot);
+								  isconcurrent ? InvalidSnapshot : SnapshotAny,
+								  isconcurrent);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -2524,7 +2521,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 		brinleader->nparticipanttuplesorts++;
 	brinleader->brinshared = brinshared;
 	brinleader->sharedsort = sharedsort;
-	brinleader->snapshot = snapshot;
 	brinleader->walusage = walusage;
 	brinleader->bufferusage = bufferusage;
 
@@ -2540,6 +2536,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->bs_leader = brinleader;
 
+	/*
+	 * In case of concurrent build snapshots are going to be reset periodically.
+	 * We need to wait until all workers imported initial snapshot.
+	 */
+	if (isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_brin_leader_participate_as_worker(buildstate, heap, index);
@@ -2548,7 +2551,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	 * Caller needs to wait for all launched workers when we return.  Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
 }
@@ -2571,9 +2575,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
 		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
 
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(brinleader->snapshot))
-		UnregisterSnapshot(brinleader->snapshot);
 	DestroyParallelContext(brinleader->pcxt);
 	ExitParallelMode();
 }
@@ -2773,14 +2774,14 @@ _brin_parallel_merge(BrinBuildState *state)
 
 /*
  * Returns size of shared memory required to store state for a parallel
- * brin index build based on the snapshot its parallel scan will use.
+ * brin index build.
  */
 static Size
-_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+_brin_parallel_estimate_shared(Relation heap)
 {
 	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
 	return add_size(BUFFERALIGN(sizeof(BrinShared)),
-					table_parallelscan_estimate(heap, snapshot));
+					table_parallelscan_estimate(heap, InvalidSnapshot));
 }
 
 /*
@@ -2802,6 +2803,7 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re
 	/* Perform work common to all participants */
 	_brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
 								  brinleader->sharedsort, heap, index, sortmem, true);
+	Assert(!brinleader->brinshared->isconcurrent || !TransactionIdIsValid(MyProc->xid));
 }
 
 /*
@@ -2942,6 +2944,13 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 
 	_brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
 								  heapRel, indexRel, sortmem, false);
+	if (brinshared->isconcurrent)
+	{
+		PopActiveSnapshot();
+		InvalidateCatalogSnapshot();
+		Assert(!TransactionIdIsValid(MyProc->xid));
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6d4de77037c..2a617a05f8c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1231,14 +1231,13 @@ heapam_index_build_range_scan(Relation heapRelation,
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
 	 * concurrent build, or during bootstrap, we take a regular MVCC snapshot
-	 * and index whatever's live according to that.
+	 * and index whatever's live according to that while that snapshot is reset
+	 * every so often (in case of non-unique index).
 	 */
 	OldestXmin = InvalidTransactionId;
 
 	/*
 	 * For unique index we need consistent snapshot for the whole scan.
-	 * In case of parallel scan some additional infrastructure required
-	 * to perform scan with SO_RESET_SNAPSHOT which is not yet ready.
 	 */
 	reset_snapshots = indexInfo->ii_Concurrent &&
 					  !indexInfo->ii_Unique &&
@@ -1300,8 +1299,11 @@ heapam_index_build_range_scan(Relation heapRelation,
 		Assert(!IsBootstrapProcessingMode());
 		Assert(allow_sync);
 		snapshot = scan->rs_snapshot;
-		PushActiveSnapshot(snapshot);
-		need_pop_active_snapshot = true;
+		if (!reset_snapshots)
+		{
+			PushActiveSnapshot(snapshot);
+			need_pop_active_snapshot = true;
+		}
 	}
 
 	hscan = (HeapScanDesc) scan;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index b490da0eeee..810f80fc8e6 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -321,22 +321,20 @@ btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 			 RelationGetRelationName(index));
 
 	reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo);
-	Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique ||
-		  !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
+	Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin));
 
 	/*
 	 * Finish the build by (1) completing the sort of the spool file, (2)
 	 * inserting the sorted tuples into btree pages and (3) building the upper
 	 * levels.  Finally, it may also be necessary to end use of parallelism.
 	 */
-	_bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_ParallelWorkers && indexInfo->ii_Concurrent);
+	_bt_leafbuild(buildstate.spool, buildstate.spool2, !indexInfo->ii_Unique && indexInfo->ii_Concurrent);
 	_bt_spooldestroy(buildstate.spool);
 	if (buildstate.spool2)
 		_bt_spooldestroy(buildstate.spool2);
 	if (buildstate.btleader)
 		_bt_end_parallel(buildstate.btleader);
-	Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique ||
-		  !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
+	Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin));
 
 	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
 
@@ -485,8 +483,7 @@ _bt_spools_heapscan(Relation heap, Relation index, BTBuildState *buildstate,
 		reltuples = _bt_parallel_heapscan(buildstate,
 										  &indexInfo->ii_BrokenHotChain);
 	InvalidateCatalogSnapshot();
-	Assert(indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique ||
-		  !indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
+	Assert(!indexInfo->ii_Concurrent || indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin));
 
 	/*
 	 * Set the progress target for the next phase.  Reset the block number
@@ -1421,6 +1418,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	BufferUsage *bufferusage;
 	bool		leaderparticipates = true;
 	bool		need_pop_active_snapshot = true;
+	bool		reset_snapshot;
 	int			querylen;
 
 #ifdef DISABLE_LEADER_PARTICIPATION
@@ -1438,12 +1436,21 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 
 	scantuplesortstates = leaderparticipates ? request + 1 : request;
 
+    /*
+	 * For concurrent non-unique index builds, we can periodically reset snapshots
+	 * to allow the xmin horizon to advance. This is safe since these builds don't
+	 * require a consistent view across the entire scan. Unique indexes still need
+	 * a stable snapshot to properly enforce uniqueness constraints.
+     */
+	reset_snapshot = isconcurrent && !btspool->isunique;
+
 	/*
 	 * Prepare for scan of the base relation.  In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
 	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * live according to that, while that snapshot may be reset periodically in
+	 * case of non-unique index.
 	 */
 	if (!isconcurrent)
 	{
@@ -1451,6 +1458,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
+	else if (reset_snapshot)
+	{
+		snapshot = InvalidSnapshot;
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 	else
 	{
 		snapshot = RegisterSnapshot(GetTransactionSnapshot());
@@ -1511,7 +1523,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
+		if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
 			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
@@ -1538,7 +1550,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
 	table_parallelscan_initialize(btspool->heap,
 								  ParallelTableScanFromBTShared(btshared),
-								  snapshot);
+								  snapshot,
+								  reset_snapshot);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -1614,6 +1627,13 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->btleader = btleader;
 
+	/*
+	 * In case of concurrent build snapshots are going to be reset periodically.
+	 * Wait until all workers imported initial snapshot.
+	 */
+	if (reset_snapshot)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_bt_leader_participate_as_worker(buildstate);
@@ -1622,7 +1642,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	 * Caller needs to wait for all launched workers when we return.  Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!reset_snapshot)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
 }
@@ -1646,7 +1667,7 @@ _bt_end_parallel(BTLeader *btleader)
 		InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]);
 
 	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(btleader->snapshot))
+	if (btleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(btleader->snapshot))
 		UnregisterSnapshot(btleader->snapshot);
 	DestroyParallelContext(btleader->pcxt);
 	ExitParallelMode();
@@ -1896,6 +1917,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	SortCoordinate coordinate;
 	BTBuildState buildstate;
 	TableScanDesc scan;
+	ParallelTableScanDesc pscan;
 	double		reltuples;
 	IndexInfo  *indexInfo;
 
@@ -1950,11 +1972,15 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	/* Join parallel scan */
 	indexInfo = BuildIndexInfo(btspool->index);
 	indexInfo->ii_Concurrent = btshared->isconcurrent;
-	scan = table_beginscan_parallel(btspool->heap,
-									ParallelTableScanFromBTShared(btshared));
+	pscan = ParallelTableScanFromBTShared(btshared);
+	scan = table_beginscan_parallel(btspool->heap, pscan);
 	reltuples = table_index_build_scan(btspool->heap, btspool->index, indexInfo,
 									   true, progress, _bt_build_callback,
 									   &buildstate, scan);
+	InvalidateCatalogSnapshot();
+	if (pscan->phs_reset_snapshot)
+		PopActiveSnapshot();
+	Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin));
 
 	/* Execute this worker's part of the sort */
 	if (progress)
@@ -1990,4 +2016,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2,
 	tuplesort_end(btspool->sortstate);
 	if (btspool2)
 		tuplesort_end(btspool2->sortstate);
+	Assert(!pscan->phs_reset_snapshot || !TransactionIdIsValid(MyProc->xmin));
+	if (pscan->phs_reset_snapshot)
+		PushActiveSnapshot(GetTransactionSnapshot());
 }
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index a56c5eceb14..277c79dd554 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -132,10 +132,10 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 {
 	Size		sz = 0;
 
-	if (IsMVCCSnapshot(snapshot))
+	if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot))
 		sz = add_size(sz, EstimateSnapshotSpace(snapshot));
 	else
-		Assert(snapshot == SnapshotAny);
+		Assert(snapshot == SnapshotAny || snapshot == InvalidSnapshot);
 
 	sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));
 
@@ -144,21 +144,36 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 
 void
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot)
+							  Snapshot snapshot, bool reset_snapshot)
 {
 	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
 
 	pscan->phs_snapshot_off = snapshot_off;
 
-	if (IsMVCCSnapshot(snapshot))
+	/*
+	 * Initialize parallel scan description. For normal scans with a regular
+	 * MVCC snapshot, serialize the snapshot info. For scans that use periodic
+	 * snapshot resets, mark the scan accordingly.
+	 */
+	if (reset_snapshot)
+	{
+		Assert(snapshot == InvalidSnapshot);
+		pscan->phs_snapshot_any = false;
+		pscan->phs_reset_snapshot = true;
+		INJECTION_POINT("table_parallelscan_initialize");
+	}
+	else if (IsMVCCSnapshot(snapshot))
 	{
 		SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
 		pscan->phs_snapshot_any = false;
+		pscan->phs_reset_snapshot = false;
 	}
 	else
 	{
 		Assert(snapshot == SnapshotAny);
+		Assert(!reset_snapshot);
 		pscan->phs_snapshot_any = true;
+		pscan->phs_reset_snapshot = false;
 	}
 }
 
@@ -171,7 +186,19 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
 
 	Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator));
 
-	if (!pscan->phs_snapshot_any)
+	/*
+	 * For scans that
+	 * use periodic snapshot resets, mark the scan accordingly and use the active
+	 * snapshot as the initial state.
+	 */
+	if (pscan->phs_reset_snapshot)
+	{
+		Assert(ActiveSnapshotSet());
+		flags |= SO_RESET_SNAPSHOT;
+		/* Start with current active snapshot. */
+		snapshot = GetActiveSnapshot();
+	}
+	else if (!pscan->phs_snapshot_any)
 	{
 		/* Snapshot was serialized -- restore it */
 		snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 4ab5df92133..ec3c80fef27 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -76,6 +76,7 @@
 #define PARALLEL_KEY_RELMAPPER_STATE		UINT64CONST(0xFFFFFFFFFFFF000D)
 #define PARALLEL_KEY_UNCOMMITTEDENUMS		UINT64CONST(0xFFFFFFFFFFFF000E)
 #define PARALLEL_KEY_CLIENTCONNINFO			UINT64CONST(0xFFFFFFFFFFFF000F)
+#define PARALLEL_KEY_SNAPSHOT_RESTORED		UINT64CONST(0xFFFFFFFFFFFF0010)
 
 /* Fixed-size parallel state. */
 typedef struct FixedParallelState
@@ -301,6 +302,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
 										pcxt->nworkers));
 		shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+		shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(bool),
+							   pcxt->nworkers));
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 		/* Estimate how much we'll need for the entrypoint info. */
 		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
 							   strlen(pcxt->function_name) + 2);
@@ -372,6 +377,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		char	   *entrypointstate;
 		char	   *uncommittedenumsspace;
 		char	   *clientconninfospace;
+		bool	   *snapshot_set_flag_space;
 		Size		lnamelen;
 
 		/* Serialize shared libraries we have loaded. */
@@ -487,6 +493,19 @@ InitializeParallelDSM(ParallelContext *pcxt)
 		strcpy(entrypointstate, pcxt->library_name);
 		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
+
+		/*
+		 * Establish dynamic shared memory to pass information about importing
+		 * of snapshot.
+		 */
+		snapshot_set_flag_space =
+				shm_toc_allocate(pcxt->toc, mul_size(sizeof(bool), pcxt->nworkers));
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			pcxt->worker[i].snapshot_restored = snapshot_set_flag_space + i * sizeof(bool);
+			*pcxt->worker[i].snapshot_restored = false;
+		}
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, snapshot_set_flag_space);
 	}
 
 	/* Update nworkers_to_launch, in case we changed nworkers above. */
@@ -542,6 +561,17 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
 		}
 	}
+
+	/* Set snapshot restored flag to false. */
+	if (pcxt->nworkers > 0)
+	{
+		bool	   *snapshot_restored_space;
+		int			i;
+		snapshot_restored_space =
+				shm_toc_lookup(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false);
+		for (i = 0; i < pcxt->nworkers; ++i)
+			snapshot_restored_space[i] = false;
+	}
 }
 
 /*
@@ -657,6 +687,10 @@ LaunchParallelWorkers(ParallelContext *pcxt)
  * Wait for all workers to attach to their error queues, and throw an error if
  * any worker fails to do this.
  *
+ * wait_for_snapshot: track whether each parallel worker has successfully restored
+ * its snapshot. This is needed when using periodic snapshot resets to ensure all
+ * workers have a valid initial snapshot before proceeding with the scan.
+ *
  * Callers can assume that if this function returns successfully, then the
  * number of workers given by pcxt->nworkers_launched have initialized and
  * attached to their error queues.  Whether or not these workers are guaranteed
@@ -686,7 +720,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
  * call this function at all.
  */
 void
-WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot)
 {
 	int			i;
 
@@ -730,9 +764,12 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt)
 				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
 				if (shm_mq_get_sender(mq) != NULL)
 				{
-					/* Yes, so it is known to be attached. */
-					pcxt->known_attached_workers[i] = true;
-					++pcxt->nknown_attached_workers;
+					if (!wait_for_snapshot || *(pcxt->worker[i].snapshot_restored))
+					{
+						/* Yes, so it is known to be attached. */
+						pcxt->known_attached_workers[i] = true;
+						++pcxt->nknown_attached_workers;
+					}
 				}
 			}
 			else if (status == BGWH_STOPPED)
@@ -1291,6 +1328,7 @@ ParallelWorkerMain(Datum main_arg)
 	shm_toc    *toc;
 	FixedParallelState *fps;
 	char	   *error_queue_space;
+	bool	   *snapshot_restored_space;
 	shm_mq	   *mq;
 	shm_mq_handle *mqh;
 	char	   *libraryspace;
@@ -1495,6 +1533,10 @@ ParallelWorkerMain(Datum main_arg)
 							   fps->parallel_leader_pgproc);
 	PushActiveSnapshot(asnapshot);
 
+	/* Snapshot is restored, set flag to make leader know about it. */
+	snapshot_restored_space = shm_toc_lookup(toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false);
+	snapshot_restored_space[ParallelWorkerNumber] = true;
+
 	/*
 	 * We've changed which tuples we can see, and must therefore invalidate
 	 * system caches.
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 210fc88433f..dacff9605ad 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -1531,7 +1531,7 @@ index_concurrently_build(Oid heapRelationId,
 
 	/* Invalidate catalog snapshot just for assert */
 	InvalidateCatalogSnapshot();
-	Assert((indexInfo->ii_ParallelWorkers || indexInfo->ii_Unique) || !TransactionIdIsValid(MyProc->xmin));
+	Assert(indexInfo->ii_Unique || !TransactionIdIsValid(MyProc->xmin));
 
 	/* Roll back any GUC changes executed by index functions */
 	AtEOXact_GUC(false, save_nestlevel);
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 6f9e991eeae..bc639964ada 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -367,7 +367,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
-								  estate->es_snapshot);
+								  estate->es_snapshot,
+								  false);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 3d018c3a1e8..4cd536e988c 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -283,14 +283,6 @@ GetTransactionSnapshot(void)
 Snapshot
 GetLatestSnapshot(void)
 {
-	/*
-	 * We might be able to relax this, but nothing that could otherwise work
-	 * needs it.
-	 */
-	if (IsInParallelMode())
-		elog(ERROR,
-			 "cannot update SecondarySnapshot during a parallel operation");
-
 	/*
 	 * So far there are no cases requiring support for GetLatestSnapshot()
 	 * during logical decoding, but it wouldn't be hard to add if required.
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 8811618acb7..f5cae39c85f 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -26,6 +26,7 @@ typedef struct ParallelWorkerInfo
 {
 	BackgroundWorkerHandle *bgwhandle;
 	shm_mq_handle *error_mqh;
+	bool		  *snapshot_restored;
 } ParallelWorkerInfo;
 
 typedef struct ParallelContext
@@ -65,7 +66,7 @@ extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
-extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot);
 extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
 extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index dc6e0184284..8529b808aed 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -82,6 +82,7 @@ typedef struct ParallelTableScanDescData
 	RelFileLocator phs_locator; /* physical relation to scan */
 	bool		phs_syncscan;	/* report location to syncscan logic? */
 	bool		phs_snapshot_any;	/* SnapshotAny, not phs_snapshot_data? */
+	bool		phs_reset_snapshot; /* use SO_RESET_SNAPSHOT? */
 	Size		phs_snapshot_off;	/* data for snapshot */
 } ParallelTableScanDescData;
 typedef struct ParallelTableScanDescData *ParallelTableScanDesc;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 5393b30c57e..313394d92c6 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -1181,7 +1181,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
  */
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot);
+										  Snapshot snapshot,
+										  bool reset_snapshot);
 
 /*
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -1799,9 +1800,9 @@ table_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
  * This only really makes sense for heap AM, it might need to be generalized
  * for other AMs later.
  *
- * In case of non-unique index and non-parallel concurrent build
- * SO_RESET_SNAPSHOT is applied for the scan. That leads for changing snapshots
- * on the fly to allow xmin horizon propagate.
+ * In case of non-unique concurrent  index build SO_RESET_SNAPSHOT is applied
+ * for the scan. That leads for changing snapshots on the fly to allow xmin
+ * horizon propagate.
  */
 static inline double
 table_index_build_scan(Relation table_rel,
diff --git a/src/test/modules/injection_points/expected/cic_reset_snapshots.out b/src/test/modules/injection_points/expected/cic_reset_snapshots.out
index 948d1232aa0..595a4000ce0 100644
--- a/src/test/modules/injection_points/expected/cic_reset_snapshots.out
+++ b/src/test/modules/injection_points/expected/cic_reset_snapshots.out
@@ -17,6 +17,12 @@ SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice'
  
 (1 row)
 
+SELECT injection_points_attach('table_parallelscan_initialize', 'notice');
+ injection_points_attach 
+-------------------------
+ 
+(1 row)
+
 CREATE SCHEMA cic_reset_snap;
 CREATE TABLE cic_reset_snap.tbl(i int primary key, j int);
 INSERT INTO cic_reset_snap.tbl SELECT i, i * I FROM generate_series(1, 200) s(i);
@@ -72,30 +78,45 @@ NOTICE:  notice triggered for injection point heap_reset_scan_snapshot_effective
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 -- The same in parallel mode
 ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2);
+-- Detach to keep test stable, since parallel worker may complete scan before leader
+SELECT injection_points_detach('heap_reset_scan_snapshot_effective');
+ injection_points_detach 
+-------------------------
+ 
+(1 row)
+
 CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i);
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i);
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(MOD(i, 2), j) WHERE MOD(i, 2) = 0;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable(i);
 NOTICE:  notice triggered for injection point table_beginscan_strat_reset_snapshots
-NOTICE:  notice triggered for injection point heap_reset_scan_snapshot_effective
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
 NOTICE:  notice triggered for injection point table_beginscan_strat_reset_snapshots
-NOTICE:  notice triggered for injection point heap_reset_scan_snapshot_effective
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable_no_param();
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i DESC NULLS LAST);
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl USING BRIN(i);
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
+NOTICE:  notice triggered for injection point table_parallelscan_initialize
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 DROP SCHEMA cic_reset_snap CASCADE;
 NOTICE:  drop cascades to 3 other objects
diff --git a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
index 5072535b355..2941aa7ae38 100644
--- a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
+++ b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql
@@ -3,7 +3,7 @@ CREATE EXTENSION injection_points;
 SELECT injection_points_set_local();
 SELECT injection_points_attach('heap_reset_scan_snapshot_effective', 'notice');
 SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice');
-
+SELECT injection_points_attach('table_parallelscan_initialize', 'notice');
 
 CREATE SCHEMA cic_reset_snap;
 CREATE TABLE cic_reset_snap.tbl(i int primary key, j int);
@@ -53,6 +53,9 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 -- The same in parallel mode
 ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2);
 
+-- Detach to keep test stable, since parallel worker may complete scan before leader
+SELECT injection_points_detach('heap_reset_scan_snapshot_effective');
+
 CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i);
 REINDEX INDEX CONCURRENTLY cic_reset_snap.idx;
 DROP INDEX CONCURRENTLY cic_reset_snap.idx;
@@ -83,4 +86,4 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx;
 
 DROP SCHEMA cic_reset_snap CASCADE;
 
-DROP EXTENSION injection_points;
+DROP EXTENSION injection_points;
\ No newline at end of file
-- 
2.43.0

