From 531aeac7310864ac4f59573bd71254e763adab70 Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <arseniy.mukhin.dev@gmail.com>
Date: Sun, 10 Aug 2025 16:25:08 +0300
Subject: [PATCH v10 4/4] amcheck: brin_index_check() - heap all indexed

This commit extends functionality of brin_index_check() with
heap_all_indexed check: we validate every index range tuple
against every heap tuple within the range using consistentFn.
Also, we check here that fields 'has_nulls', 'all_nulls' and
'empty_range' are consistent with the range heap data. It's the most
expensive part of the brin_index_check(), so it's optional.
---
 contrib/amcheck/amcheck--1.5--1.6.sql   |   6 +-
 contrib/amcheck/expected/check_brin.out |  60 ++-
 contrib/amcheck/sql/check_brin.sql      |  54 ++-
 contrib/amcheck/t/007_verify_brin.pl    |  51 ++-
 contrib/amcheck/verify_brin.c           | 501 ++++++++++++++++++++++++
 doc/src/sgml/amcheck.sgml               |  39 +-
 6 files changed, 685 insertions(+), 26 deletions(-)

diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql
index 0354451c472..55276527e68 100644
--- a/contrib/amcheck/amcheck--1.5--1.6.sql
+++ b/contrib/amcheck/amcheck--1.5--1.6.sql
@@ -8,11 +8,13 @@
 -- brin_index_check()
 --
 CREATE FUNCTION brin_index_check(index regclass,
-                                 regularpagescheck boolean default false
+                                 regularpagescheck boolean default false,
+                                 heapallindexed boolean default false,
+                                 variadic text[] default '{}'
 )
     RETURNS VOID
 AS 'MODULE_PATHNAME', 'brin_index_check'
 LANGUAGE C STRICT PARALLEL RESTRICTED;
 
 -- We don't want this to be available to public
-REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean) FROM PUBLIC;
\ No newline at end of file
+REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean, text[]) FROM PUBLIC;
\ No newline at end of file
diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out
index 6890fff46bd..909b41cb7a9 100644
--- a/contrib/amcheck/expected/check_brin.out
+++ b/contrib/amcheck/expected/check_brin.out
@@ -5,7 +5,7 @@ $$ LANGUAGE sql;
 -- empty table index should be valid
 CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
 CREATE INDEX brintest_idx ON brintest USING brin (a);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
@@ -19,7 +19,7 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages
 INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
 -- create some empty ranges
 DELETE FROM brintest WHERE a > 20000 AND a < 40000;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
@@ -28,7 +28,7 @@ SELECT brin_index_check('brintest_idx', true);
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
@@ -42,7 +42,7 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH
 INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
 -- create some empty ranges
 DELETE FROM brintest WHERE a > 20000 AND a < 40000;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
@@ -51,7 +51,7 @@ SELECT brin_index_check('brintest_idx', true);
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
@@ -65,7 +65,7 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_
 INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
 -- create some empty ranges
 DELETE FROM brintest WHERE a > 20000 AND a < 40000;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
@@ -74,7 +74,7 @@ SELECT brin_index_check('brintest_idx', true);
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
@@ -90,7 +90,7 @@ SELECT box(point(random() * 1000, random() * 1000), point(random() * 1000, rando
 FROM generate_series(1, 10000);
 -- create some empty ranges
 DELETE FROM brintest WHERE id > 2000 AND id < 4000;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true, '@>');
  brin_index_check 
 ------------------
  
@@ -99,7 +99,7 @@ SELECT brin_index_check('brintest_idx', true);
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (a box_inclusion_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true, '@>');
  brin_index_check 
 ------------------
  
@@ -113,7 +113,7 @@ CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_min
 INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x;
 -- create some empty ranges
 DELETE FROM brintest WHERE id > 1500 AND id < 2500;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
@@ -122,12 +122,50 @@ SELECT brin_index_check('brintest_idx', true);
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
  brin_index_check 
 ------------------
  
 (1 row)
 
+-- cleanup
+DROP TABLE brintest;
+-- multiple attributes test with custom operators
+CREATE TABLE brintest (id bigserial, a text, b box) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a, b) SELECT
+                                random_string((x % 100)),
+                                box(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 3000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 1500 AND id < 2500;
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>');
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>');
+ brin_index_check 
+------------------
+ 
+(1 row)
+
+-- error if it's impossible to use default operator for all index attributes
+SELECT brin_index_check('brintest_idx', true, true);
+ERROR:  Operator = is not a member of operator family "box_inclusion_ops"
+-- error if number of operators in input doesn't match index attributes number
+SELECT brin_index_check('brintest_idx', true, true, '=');
+ERROR:  Number of operator names in input (1) doesn't match index attributes number (3)
+-- error if operator name is NULL
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', NULL);
+ERROR:  Operator name must not be NULL
+-- error if there is no operator for attribute type
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@@');
+ERROR:  There is no operator @@ for type "box"
 -- cleanup
 DROP TABLE brintest;
 -- cleanup
diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql
index 1c97b370cac..66dd1647d3b 100644
--- a/contrib/amcheck/sql/check_brin.sql
+++ b/contrib/amcheck/sql/check_brin.sql
@@ -7,7 +7,7 @@ $$ LANGUAGE sql;
 -- empty table index should be valid
 CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
 CREATE INDEX brintest_idx ON brintest USING brin (a);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
 -- cleanup
 DROP TABLE brintest;
 
@@ -17,12 +17,12 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages
 INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
 -- create some empty ranges
 DELETE FROM brintest WHERE a > 20000 AND a < 40000;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
 
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
 -- cleanup
 DROP TABLE brintest;
 
@@ -34,12 +34,12 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH
 INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
 -- create some empty ranges
 DELETE FROM brintest WHERE a > 20000 AND a < 40000;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
 
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
 -- cleanup
 DROP TABLE brintest;
 
@@ -51,12 +51,12 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_
 INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
 -- create some empty ranges
 DELETE FROM brintest WHERE a > 20000 AND a < 40000;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
 
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
 -- cleanup
 DROP TABLE brintest;
 
@@ -70,12 +70,12 @@ FROM generate_series(1, 10000);
 -- create some empty ranges
 DELETE FROM brintest WHERE id > 2000 AND id < 4000;
 
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true, '@>');
 
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (a box_inclusion_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true, '@>');
 -- cleanup
 DROP TABLE brintest;
 
@@ -86,12 +86,44 @@ CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_min
 INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x;
 -- create some empty ranges
 DELETE FROM brintest WHERE id > 1500 AND id < 2500;
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
 
 -- rebuild index
 DROP INDEX brintest_idx;
 CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops) WITH (pages_per_range = 2);
-SELECT brin_index_check('brintest_idx', true);
+SELECT brin_index_check('brintest_idx', true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+-- multiple attributes test with custom operators
+CREATE TABLE brintest (id bigserial, a text, b box) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a, b) SELECT
+                                random_string((x % 100)),
+                                box(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 3000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 1500 AND id < 2500;
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>');
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>');
+
+-- error if it's impossible to use default operator for all index attributes
+SELECT brin_index_check('brintest_idx', true, true);
+
+-- error if number of operators in input doesn't match index attributes number
+SELECT brin_index_check('brintest_idx', true, true, '=');
+
+-- error if operator name is NULL
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', NULL);
+
+-- error if there is no operator for attribute type
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@@');
+
 -- cleanup
 DROP TABLE brintest;
 
diff --git a/contrib/amcheck/t/007_verify_brin.pl b/contrib/amcheck/t/007_verify_brin.pl
index c4073f9bdcc..6fe0c78f12e 100644
--- a/contrib/amcheck/t/007_verify_brin.pl
+++ b/contrib/amcheck/t/007_verify_brin.pl
@@ -210,6 +210,55 @@ my @tests = (
             return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'););
         },
         expected   => wrap("revmap doesn't point to index tuple. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)")
+    },
+    {
+        # range is marked as empty_range, but heap has some data for the range
+
+        find     => pack('LCC', 0, 0x88, 0x03),
+        replace  => pack('LCC', 0, 0xA8, 0x01),
+        blkno      => 2, # regular page
+        table_data => sub {
+            my ($test_struct) = @_;
+            return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null););
+        },
+        expected   => wrap('range is marked as empty but contains qualified live tuples. Range blkno: 0, heap tid (0,1)')
+    },
+    {
+        # range hasnulls & allnulls are false, but heap contains null values for the range
+
+        find     => pack('LCC', 0, 0x88, 0x02),
+        replace  => pack('LCC', 0, 0x88, 0x00),
+        blkno      => 2, # regular page
+        table_data => sub {
+            my ($test_struct) = @_;
+            return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa'););
+        },
+        expected   => wrap('range hasnulls and allnulls are false, but contains a null value. Range blkno: 0, heap tid (0,1)')
+    },
+    {
+        # range allnulls is true, but heap contains non-null values for the range
+
+        find     => pack('LCC', 0, 0x88, 0x02),
+        replace  => pack('LCC', 0, 0x88, 0x01),
+        blkno      => 2, # regular page
+        table_data => sub {
+            my ($test_struct) = @_;
+            return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa'););
+        },
+        expected   => wrap('range allnulls is true, but contains nonnull value. Range blkno: 0, heap tid (0,2)')
+    },
+    {
+        # consistent function return FALSE for the valid heap value
+        # replace "ccccc" with "bbbbb" so that min_max index was too narrow
+
+        find       => 'ccccc',
+        replace    => 'bbbbb',
+        blkno      => 2, # regular page
+        table_data => sub {
+            my ($test_struct) = @_;
+            return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'), ('ccccc'););
+        },
+        expected   => wrap('heap tuple inconsistent with index. Range blkno: 0, heap tid (0,2)')
     }
 );
 
@@ -251,7 +300,7 @@ foreach my $test_struct (@tests) {
 $node->start;
 
 foreach my $test_struct (@tests) {
-    my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true)));
+    my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true, true)));
     like($stderr, $test_struct->{expected});
 }
 
diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c
index 183d189685c..aae7ec3c0d7 100644
--- a/contrib/amcheck/verify_brin.c
+++ b/contrib/amcheck/verify_brin.c
@@ -39,6 +39,8 @@ typedef struct BrinCheckState
 	/* Check arguments */
 
 	bool		regularpagescheck;
+	bool		heapallindexed;
+	ArrayType  *consistent_oper_names;
 
 	/* BRIN check common fields */
 
@@ -67,6 +69,29 @@ typedef struct BrinCheckState
 	Page		regpage;
 	OffsetNumber regpageoffset;
 
+	/* Heap all indexed check fields */
+
+	BrinRevmap *revmap;
+	Buffer		buf;
+	FmgrInfo   *consistentFn;
+	/* Scan keys for regular values */
+	ScanKey    *nonnull_sk;
+	/* Scan keys for null values */
+	ScanKey    *isnull_sk;
+	double		range_cnt;
+	/* first block of the next range */
+	BlockNumber nextrangeBlk;
+
+	/*
+	 * checkable_range shows if current range could be checked and dtup
+	 * contains valid index tuple for the range. It could be false if the
+	 * current range is not summarized, or it's placeholder, or it's just a
+	 * beginning of the check
+	 */
+	bool		checkable_range;
+	BrinMemTuple *dtup;
+	MemoryContext rangeCtx;
+	MemoryContext heaptupleCtx;
 }			BrinCheckState;
 
 static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly);
@@ -87,6 +112,23 @@ static bool revmap_points_to_index_tuple(BrinCheckState * state);
 
 static ItemId PageGetItemIdCareful(BrinCheckState * state);
 
+static void check_heap_all_indexed(BrinCheckState * state);
+
+static void prepare_nonnull_scan_keys(BrinCheckState * state);
+
+static void brin_check_callback(Relation index,
+								ItemPointer tid,
+								Datum *values,
+								bool *isnull,
+								bool tupleIsAlive,
+								void *brstate);
+
+static void check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid);
+
+static ScanKey prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno, String *opname);
+
+static ScanKey prepare_isnull_scan_key(AttrNumber attno);
+
 static void brin_check_ereport(BrinCheckState * state, const char *fmt);
 
 static void revmap_item_ereport(BrinCheckState * state, const char *fmt);
@@ -95,6 +137,7 @@ static void index_tuple_ereport(BrinCheckState * state, const char *fmt);
 
 static void index_tuple_only_ereport(BrinCheckState * state, const char *fmt);
 
+static void heap_all_indexed_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message);
 
 Datum
 brin_index_check(PG_FUNCTION_ARGS)
@@ -103,6 +146,8 @@ brin_index_check(PG_FUNCTION_ARGS)
 	BrinCheckState *state = palloc0(sizeof(BrinCheckState));
 
 	state->regularpagescheck = PG_GETARG_BOOL(1);
+	state->heapallindexed = PG_GETARG_BOOL(2);
+	state->consistent_oper_names = PG_GETARG_ARRAYTYPE_P(3);
 
 	amcheck_lock_relation_and_check(indrelid,
 									BRIN_AM_OID,
@@ -127,9 +172,31 @@ brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonl
 	state->bdesc = brin_build_desc(idxrel);
 	state->natts = state->bdesc->bd_tupdesc->natts;
 
+	/* Do some preparations and checks for heapallindexed */
+	if (state->heapallindexed)
+	{
+		/*
+		 * Check if we are OK with indcheckxmin, and unregister snapshot as we
+		 * don't need it further
+		 */
+		Snapshot	snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+		check_indcheckxmin(state->idxrel, snapshot);
+		UnregisterSnapshot(snapshot);
+
+		/*
+		 * If there are some problems with scan keys generation or operator
+		 * name array is invalid we want to fail fast. So do it here.
+		 */
+		prepare_nonnull_scan_keys(state);
+	}
 
 	check_brin_index_structure(state);
 
+	if (state->heapallindexed)
+	{
+		check_heap_all_indexed(state);
+	}
 
 	brin_free_desc(state->bdesc);
 }
@@ -797,6 +864,424 @@ PageGetItemIdCareful(BrinCheckState * state)
 	return itemid;
 }
 
+/*
+ * Check that every heap tuple are consistent with the index.
+ *
+ * Here we generate ScanKey for every heap tuple and test it against
+ * appropriate range using consistentFn (for ScanKey generation logic look 'prepare_nonnull_scan_keys')
+ *
+ * Also, we check that fields 'empty_range', 'all_nulls' and 'has_nulls'
+ * are not too "narrow" for each range, which means:
+ * 1) has_nulls = false, but we see null value (only for oi_regular_nulls is true)
+ * 2) all_nulls = true, but we see nonnull value.
+ * 3) empty_range = true, but we see tuple within the range.
+ *
+ * We use allowSync = false, because this way
+ * we process full ranges one by one from the first range.
+ * It's not necessary, but makes the code simpler and this way
+ * we need to fetch every index tuple only once.
+ */
+static void
+check_heap_all_indexed(BrinCheckState * state)
+{
+	Relation	idxrel = state->idxrel;
+	Relation	heaprel = state->heaprel;
+	double		reltuples;
+	IndexInfo  *indexInfo;
+
+	/* heap all indexed check fields initialization */
+
+	state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange);
+	state->dtup = brin_new_memtuple(state->bdesc);
+	state->checkable_range = false;
+	state->consistentFn = palloc0_array(FmgrInfo, state->natts);
+	state->range_cnt = 0;
+	/* next range is the first range in the beginning */
+	state->nextrangeBlk = 0;
+	state->isnull_sk = palloc0_array(ScanKey, state->natts);
+	state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext,
+											"brin check range context",
+											ALLOCSET_DEFAULT_SIZES);
+	state->heaptupleCtx = AllocSetContextCreate(CurrentMemoryContext,
+												"brin check tuple context",
+												ALLOCSET_DEFAULT_SIZES);
+
+	/*
+	 * Prepare "is_null" scan keys and consistent fn for each attribute.
+	 * "non-null" scan keys are already generated.
+	 */
+	for (AttrNumber attno = 1; attno <= state->natts; attno++)
+	{
+		FmgrInfo   *tmp;
+
+		tmp = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_CONSISTENT);
+		fmgr_info_copy(&state->consistentFn[attno - 1], tmp, CurrentMemoryContext);
+
+		state->isnull_sk[attno - 1] = prepare_isnull_scan_key(attno);
+	}
+
+	indexInfo = BuildIndexInfo(idxrel);
+
+	/*
+	 * Use snapshot to check only those tuples that are guaranteed to be
+	 * indexed already. Using SnapshotAny would make it more difficult to say
+	 * if there is a corruption or checked tuple just haven't been indexed
+	 * yet. Also, we want to support CIC indexes.
+	 */
+	indexInfo->ii_Concurrent = true;
+	reltuples = table_index_build_scan(heaprel, idxrel, indexInfo, false, true,
+									   brin_check_callback, (void *) state, NULL);
+
+	elog(DEBUG3, "ranges were checked: %f", state->range_cnt);
+	elog(DEBUG3, "scan total tuples: %f", reltuples);
+
+	if (state->buf != InvalidBuffer)
+		ReleaseBuffer(state->buf);
+
+	brinRevmapTerminate(state->revmap);
+	MemoryContextDelete(state->rangeCtx);
+	MemoryContextDelete(state->heaptupleCtx);
+}
+
+/*
+ * Generate scan keys for every index attribute.
+ *
+ * ConsistentFn requires ScanKey, so we need to generate ScanKey for every
+ * attribute somehow. We want ScanKey that would result in TRUE for every heap
+ * tuple within the range when we use its indexed value as sk_argument.
+ * To generate such a ScanKey we need to define the right operand type and the strategy number.
+ * Right operand type is a type of data that index is built on, so it's 'opcintype'.
+ * There is no strategy number that we can always use,
+ * because every opclass defines its own set of operators it supports and strategy number
+ * for the same operator can differ from opclass to opclass.
+ * So to get strategy number we look up an operator that gives us desired behavior
+ * and which both operand types are 'opcintype' and then retrieve the strategy number for it.
+ * Most of the time we can use '='. We let user define operator name in case opclass doesn't
+ * support '=' operator. Also, if such operator doesn't exist, we can't proceed with the check.
+ *
+ * If operator name array is empty use "=" operator for every attribute.
+ */
+static void
+prepare_nonnull_scan_keys(BrinCheckState * state)
+{
+	Oid			element_type = ARR_ELEMTYPE(state->consistent_oper_names);
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	Datum	   *values;
+	bool	   *elem_nulls;
+	int			num_elems;
+
+	get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
+	deconstruct_array(state->consistent_oper_names, element_type, typlen, typbyval, typalign,
+					  &values, &elem_nulls, &num_elems);
+
+
+	/*
+	 * If we have some input, check that number of operators in the input is
+	 * relevant to the index
+	 */
+	if (num_elems > 0 && num_elems != state->natts)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("Number of operator names in input (%u) "
+						"doesn't match index attributes number (%u)",
+						num_elems, state->natts)));
+	}
+
+
+	/* Generate scan key for every index attribute */
+	state->nonnull_sk = palloc0_array(ScanKey, state->natts);
+
+	for (AttrNumber attno = 1; attno <= state->natts; attno++)
+	{
+		String	   *operatorName;
+
+		if (num_elems > 0)
+		{
+
+			if (elem_nulls[attno - 1])
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("Operator name must not be NULL")));
+			}
+
+			operatorName = makeString(TextDatumGetCString(values[attno - 1]));
+		}
+		else
+		{
+
+			/* Use '=' as default operator */
+			operatorName = makeString("=");
+		}
+
+		state->nonnull_sk[attno - 1] = prepare_nonnull_scan_key(state, attno, operatorName);
+		pfree(operatorName);
+	}
+}
+
+/*
+ * Prepare ScanKey for index attribute.
+ *
+ * Generated once, and will be reused for all heap tuples.
+ * Argument field will be filled for every heap tuple before
+ * consistent function invocation, so leave it NULL for a while.
+ */
+static ScanKey
+prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno, String *opname)
+{
+	ScanKey		scanKey;
+	Oid			opOid;
+	Oid			opFamilyOid;
+	bool		defined;
+	StrategyNumber strategy;
+	RegProcedure opRegProc;
+	List	   *operNameList;
+	int			attindex = attno - 1;
+	Form_pg_attribute attr = TupleDescAttr(state->bdesc->bd_tupdesc, attindex);
+	Oid			type = state->idxrel->rd_opcintype[attindex];
+
+	opFamilyOid = state->idxrel->rd_opfamily[attindex];
+	operNameList = list_make1(opname);
+	opOid = OperatorLookup(operNameList, type, type, &defined);
+
+	if (opOid == InvalidOid)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_FUNCTION),
+				 errmsg("There is no operator %s for type \"%s\"",
+						opname->sval, format_type_be(type))));
+	}
+
+	strategy = get_op_opfamily_strategy(opOid, opFamilyOid);
+
+	if (strategy == 0)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("Operator %s is not a member of operator family \"%s\"",
+						opname->sval,
+						get_opfamily_name(opFamilyOid, false))));
+	}
+
+	opRegProc = get_opcode(opOid);
+	scanKey = palloc0(sizeof(ScanKeyData));
+	ScanKeyEntryInitialize(
+						   scanKey,
+						   0,
+						   attno,
+						   strategy,
+						   type,
+						   attr->attcollation,
+						   opRegProc,
+						   (Datum) NULL
+		);
+	pfree(operNameList);
+
+	return scanKey;
+}
+
+static ScanKey
+prepare_isnull_scan_key(AttrNumber attno)
+{
+	ScanKey		scanKey;
+
+	scanKey = palloc0(sizeof(ScanKeyData));
+	ScanKeyEntryInitialize(scanKey,
+						   SK_ISNULL | SK_SEARCHNULL,
+						   attno,
+						   InvalidStrategy,
+						   InvalidOid,
+						   InvalidOid,
+						   InvalidOid,
+						   (Datum) 0);
+	return scanKey;
+}
+
+/*
+ * We walk from the first range (blkno = 0) to the last as the scan proceed.
+ * For every heap tuple we check if we are done with the current range, and we need to move further
+ * to the current heap tuple's range. While moving to the next range we check that it's not empty (because
+ * we have at least one tuple for this range).
+ * Every heap tuple are checked to be consistent with the range it belongs to.
+ * In case of unsummarized ranges and placeholders we skip all checks.
+ *
+ * While moving, we may jump over some ranges,
+ * but it's okay because we would not be able to check them anyway.
+ * We also can't say whether skipped ranges should be marked as empty or not,
+ * since it's possible that there were some tuples before that are now deleted.
+ *
+ */
+static void
+brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *brstate)
+{
+	BrinCheckState *state;
+	BlockNumber heapblk;
+
+	state = (BrinCheckState *) brstate;
+	heapblk = ItemPointerGetBlockNumber(tid);
+
+	/* If we went beyond the current range let's fetch new range */
+	if (heapblk >= state->nextrangeBlk)
+	{
+		BrinTuple  *tup;
+		BrinTuple  *tupcopy = NULL;
+		MemoryContext oldCtx;
+		OffsetNumber off;
+		Size		size;
+		Size		btupsz = 0;
+
+		MemoryContextReset(state->rangeCtx);
+		oldCtx = MemoryContextSwitchTo(state->rangeCtx);
+
+		state->range_cnt++;
+
+		/* Move to the range that contains current heap tuple */
+		tup = brinGetTupleForHeapBlock(state->revmap, heapblk, &state->buf,
+									   &off, &size, BUFFER_LOCK_SHARE);
+
+		if (tup)
+		{
+			tupcopy = brin_copy_tuple(tup, size, tupcopy, &btupsz);
+			LockBuffer(state->buf, BUFFER_LOCK_UNLOCK);
+			state->dtup = brin_deform_tuple(state->bdesc, tupcopy, state->dtup);
+
+			/* We can't check placeholder ranges */
+			state->checkable_range = !state->dtup->bt_placeholder;
+		}
+		else
+		{
+			/* We can't check unsummarized ranges. */
+			state->checkable_range = false;
+		}
+
+		/*
+		 * Update nextrangeBlk so we know when we are done with the current
+		 * range
+		 */
+		state->nextrangeBlk = (heapblk / state->pagesPerRange + 1) * state->pagesPerRange;
+
+		MemoryContextSwitchTo(oldCtx);
+
+		/* Range must not be empty */
+		if (state->checkable_range && state->dtup->bt_empty_range)
+		{
+			heap_all_indexed_ereport(state, tid, "range is marked as empty but contains qualified live tuples");
+		}
+
+	}
+
+	/* Check tuple is consistent with the index */
+	if (state->checkable_range)
+	{
+		check_heap_tuple(state, values, isnull, tid);
+	}
+
+}
+
+/*
+ * We check hasnulls flags for null values and oi_regular_nulls = true,
+ * check allnulls is false for all nonnull values not matter oi_regular_nulls is set or not,
+ * For all other cases we call consistentFn with appropriate scanKey:
+ * - for oi_regular_nulls = false and null values we use 'isNull' scanKey,
+ * - for nonnull values we use 'nonnull' scanKey
+ */
+static void
+check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid)
+{
+	int			attindex;
+	BrinMemTuple *dtup = state->dtup;
+	BrinDesc   *bdesc = state->bdesc;
+	MemoryContext oldCtx;
+
+	Assert(state->checkable_range);
+
+	MemoryContextReset(state->heaptupleCtx);
+	oldCtx = MemoryContextSwitchTo(state->heaptupleCtx);
+
+	/* check every index attribute */
+	for (attindex = 0; attindex < state->natts; attindex++)
+	{
+		BrinValues *bval;
+		Datum		consistentFnResult;
+		bool		consistent;
+		ScanKey		scanKey;
+		bool		oi_regular_nulls = bdesc->bd_info[attindex]->oi_regular_nulls;
+
+		bval = &dtup->bt_columns[attindex];
+
+		if (nulls[attindex])
+		{
+			/*
+			 * Use hasnulls flag for oi_regular_nulls is true. Otherwise,
+			 * delegate check to consistentFn
+			 */
+			if (oi_regular_nulls)
+			{
+				/* We have null value, so hasnulls or allnulls must be true */
+				if (!(bval->bv_hasnulls || bval->bv_allnulls))
+				{
+					heap_all_indexed_ereport(state, tid,
+											 "range hasnulls and allnulls are false, but contains a null value");
+				}
+				continue;
+			}
+
+			/*
+			 * In case of null and oi_regular_nulls = false we use isNull
+			 * scanKey for invocation of consistentFn
+			 */
+			scanKey = state->isnull_sk[attindex];
+		}
+		else
+		{
+			/* We have a nonnull value, so allnulls should be false */
+			if (bval->bv_allnulls)
+			{
+				heap_all_indexed_ereport(state, tid, "range allnulls is true, but contains nonnull value");
+			}
+
+			/* use nonnull scan key */
+			scanKey = state->nonnull_sk[attindex];
+			scanKey->sk_argument = values[attindex];
+		}
+
+		/* If oi_regular_nulls = true we should never get there with null */
+		Assert(!oi_regular_nulls || !nulls[attindex]);
+
+		if (state->consistentFn[attindex].fn_nargs >= 4)
+		{
+			consistentFnResult = FunctionCall4Coll(&state->consistentFn[attindex],
+												   state->idxrel->rd_indcollation[attindex],
+												   PointerGetDatum(state->bdesc),
+												   PointerGetDatum(bval),
+												   PointerGetDatum(&scanKey),
+												   Int32GetDatum(1)
+				);
+		}
+		else
+		{
+			consistentFnResult = FunctionCall3Coll(&state->consistentFn[attindex],
+												   state->idxrel->rd_indcollation[attindex],
+												   PointerGetDatum(state->bdesc),
+												   PointerGetDatum(bval),
+												   PointerGetDatum(scanKey)
+				);
+		}
+
+		consistent = DatumGetBool(consistentFnResult);
+
+		if (!consistent)
+		{
+			heap_all_indexed_ereport(state, tid, "heap tuple inconsistent with index");
+		}
+
+	}
+
+	MemoryContextSwitchTo(oldCtx);
+}
 
 /* Report without any additional info */
 static void
@@ -862,3 +1347,19 @@ revmap_item_ereport(BrinCheckState * state, const char *fmt)
 					state->revmapBlk,
 					state->revmapidx)));
 }
+
+/* Report with range blkno, heap tuple info */
+static void
+heap_all_indexed_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message)
+{
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_INDEX_CORRUPTED),
+			 errmsg("Index %s is not consistent with the heap - %s. Range blkno: %u, heap tid (%u,%u)",
+					RelationGetRelationName(state->idxrel),
+					message,
+					state->dtup->bt_blkno,
+					ItemPointerGetBlockNumber(tid),
+					ItemPointerGetOffsetNumber(tid))));
+}
diff --git a/doc/src/sgml/amcheck.sgml b/doc/src/sgml/amcheck.sgml
index 2f76af907ec..8ed5fe6ebfd 100644
--- a/doc/src/sgml/amcheck.sgml
+++ b/doc/src/sgml/amcheck.sgml
@@ -233,7 +233,7 @@ SET client_min_messages = DEBUG1;
   <variablelist>
    <varlistentry>
     <term>
-     <function>brin_index_check(index regclass, regularpagescheck boolean) returns void</function>
+     <function>brin_index_check(index regclass, regularpagescheck boolean, heapallindexed boolean, variadic text[]) returns void</function>
      <indexterm>
       <primary>brin_index_check</primary>
      </indexterm>
@@ -261,6 +261,43 @@ SET client_min_messages = DEBUG1;
         </para>
        </listitem>
       </varlistentry>
+      <varlistentry>
+       <term><literal>heapallindexed</literal></term>
+       <listitem>
+        <para>
+         If true, the check verifies that every heap tuple is consistent with the
+         index. This check phase needs an operator for which an expression
+         <literal>LHS OPERATOR RHS</literal> evaluates to <literal>true</literal>
+         when we use the same value of the indexed type for both <literal>LHS</literal> and <literal>RHS</literal>.
+         For example, if the indexed column's type is <type>bigint</type>,
+         equality operator can be used because expression
+         <literal>x = x</literal> result in <literal>true</literal>
+         for every value of <type>bigint</type>
+         (e.g. <literal>1 = 1</literal> is <literal>true</literal>, <literal>2 = 2</literal> is <literal>true</literal>, and so on).
+         Operator also should be part of the operator family of the indexed column.
+         Most of the time, the equality operator can be used.
+         If all indexed column operator classes support equality operator,
+         the function call looks like this:
+<programlisting>
+   SELECT brin_index_check('index_name', true, true);
+</programlisting>
+         If any indexed column operator class doesn't support equality operator then
+         a suitable operator for every such column should be found and
+         operators for all indexed columns should be listed in the function call.
+         For instance, we have two indexed columns
+         (<parameter>a</parameter> <type>int8_minmax_ops</type>, <parameter>b</parameter> <type>box_inclusion_ops</type>).
+         <type>box_inclusion_ops</type> operator class does not support equality operator.
+         The appropriate operator would be <literal>@></literal>.
+         Then the function call looks like this:
+<programlisting>
+   SELECT brin_index_check('index_name', true, true, '=', '@>');
+</programlisting>
+        </para>
+        <para>
+         Defaults to false.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
     </listitem>
    </varlistentry>
-- 
2.43.0

