public inbox for [email protected]
help / color / mirror / Atom feed
From: Arseniy Mukhin <[email protected]>
To: PostgreSQL Hackers <[email protected]>
Subject: Re: amcheck support for BRIN indexes
Date: Wed, 22 Apr 2026 11:14:52 +0300
Message-ID: <CAE7r3MKYYfXJtdHoVox5O7A9poY8zZs8oBYFOPA6yV2iF-4aEA@mail.gmail.com> (raw)
In-Reply-To: <CAE7r3MKUOGJ0v5-b5fYaF6sxKZvr0J-YXHTJf8u8GUr1tTcvNg@mail.gmail.com>
References: <[email protected]>
<[email protected]>
<CAE7r3MKUOGJ0v5-b5fYaF6sxKZvr0J-YXHTJf8u8GUr1tTcvNg@mail.gmail.com>
Hi,
On Wed, Aug 13, 2025 at 12:19 AM Arseniy Mukhin
<[email protected]> wrote:
>
> I realized that solving the problem with opclasses without equality
> operator by letting user to define operator list has several
> drawbacks:
>
> It's not very convenient to call automatically? Because the calls are
> different from index to index. You can't just call
> brin_index_check('index', true, true) on everything. Maybe I'm wrong,
> but it seems like amcheck is a tool that is often used to periodically
> check the health of Postgres clusters (and there can be many of them),
> so users probably don't want to get into the details of each index.
>
> Also, it seems like we don't want the user to define the operator to
> check. We want them to pass in the "correct" operator if there is no
> equality operator. So there's no choice, we just want users to figure
> out what the correct operator is and pass it in. But we already know
> what the "correct" operator is. Maybe we should just implement an
> opclass <-> "correct" operator mapping on the database side? We also
> need opclass developers to be able to add such a mapping if they want
> their opclass to be supported by amcheck. Then during the check we can
> look up into the mapping and use the operators. I was thinking about a
> new catalog table or maybe adding it to BrinOpcInfo? Probably there is
> a better way to do it? If the mapping doesn't have an operator for
> opclass - no problem, we can skip the consistentFn call for such
> columns and maybe log a message about it. This way we don't have all
> these problems with operator list argument and with false positives
> when a user fails to realize what the "correct" operator is.
>
Is it something we need to think about? If we want to do something
about it, it looks like adding amcheck StrategyNumber to BrinOpcInfo
is the easiest option.
PFA the rebased version with several tiny fixes.
Best regards,
Arseniy Mukhin
Attachments:
[text/x-patch] v12-0003-amcheck-brin_index_check.patch (72.4K, 2-v12-0003-amcheck-brin_index_check.patch)
download | inline diff:
From 776e3e1c887b139936c5c8497b0a0259b1d25d55 Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <[email protected]>
Date: Mon, 16 Jun 2025 18:11:27 +0300
Subject: [PATCH v12 3/3] amcheck: brin_index_check()
Adds a new function brin_index_check() for validating BRIN indexes.
It includes the following checks:
- meta page checks
- revmap pointers are valid and point to index tuples with the expected range blkno
- index tuples have expected format
- some special checks for empty_ranges
- every index tuple has corresponding revmap item that points to it (optional)
- heap all indexed check, where we validate that every heap tuple is consistent with the index (optional)
---
contrib/amcheck/Makefile | 5 +-
contrib/amcheck/amcheck--1.5--1.6.sql | 20 +
contrib/amcheck/amcheck.control | 2 +-
contrib/amcheck/expected/check_brin.out | 172 +++
contrib/amcheck/meson.build | 4 +
contrib/amcheck/sql/check_brin.sql | 132 +++
contrib/amcheck/t/007_verify_brin.pl | 350 ++++++
contrib/amcheck/verify_brin.c | 1359 +++++++++++++++++++++++
doc/src/sgml/amcheck.sgml | 70 ++
src/tools/pgindent/typedefs.list | 1 +
10 files changed, 2112 insertions(+), 3 deletions(-)
create mode 100644 contrib/amcheck/amcheck--1.5--1.6.sql
create mode 100644 contrib/amcheck/expected/check_brin.out
create mode 100644 contrib/amcheck/sql/check_brin.sql
create mode 100644 contrib/amcheck/t/007_verify_brin.pl
create mode 100644 contrib/amcheck/verify_brin.c
diff --git a/contrib/amcheck/Makefile b/contrib/amcheck/Makefile
index 1b7a63cbaa4..bdfb274c89c 100644
--- a/contrib/amcheck/Makefile
+++ b/contrib/amcheck/Makefile
@@ -6,11 +6,12 @@ OBJS = \
verify_common.o \
verify_gin.o \
verify_heapam.o \
- verify_nbtree.o
+ verify_nbtree.o \
+ verify_brin.o
EXTENSION = amcheck
DATA = amcheck--1.2--1.3.sql amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql \
- amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql
+ amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql amcheck--1.5--1.6.sql
PGFILEDESC = "amcheck - function for verifying relation integrity"
REGRESS = check check_btree check_gin check_heap
diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql
new file mode 100644
index 00000000000..55276527e68
--- /dev/null
+++ b/contrib/amcheck/amcheck--1.5--1.6.sql
@@ -0,0 +1,20 @@
+/* contrib/amcheck/amcheck--1.5--1.6.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "ALTER EXTENSION amcheck UPDATE TO '1.6'" to load this file. \quit
+
+
+--
+-- brin_index_check()
+--
+CREATE FUNCTION brin_index_check(index regclass,
+ regularpagescheck boolean default false,
+ heapallindexed boolean default false,
+ variadic text[] default '{}'
+)
+ RETURNS VOID
+AS 'MODULE_PATHNAME', 'brin_index_check'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+-- We don't want this to be available to public
+REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean, text[]) FROM PUBLIC;
\ No newline at end of file
diff --git a/contrib/amcheck/amcheck.control b/contrib/amcheck/amcheck.control
index c8ba6d7c9bc..2f329ef2cf4 100644
--- a/contrib/amcheck/amcheck.control
+++ b/contrib/amcheck/amcheck.control
@@ -1,5 +1,5 @@
# amcheck extension
comment = 'functions for verifying relation integrity'
-default_version = '1.5'
+default_version = '1.6'
module_pathname = '$libdir/amcheck'
relocatable = true
diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out
new file mode 100644
index 00000000000..a07e8adb72a
--- /dev/null
+++ b/contrib/amcheck/expected/check_brin.out
@@ -0,0 +1,172 @@
+-- helper func
+CREATE OR REPLACE FUNCTION random_string(int) RETURNS text AS $$
+SELECT string_agg(substring('0123456789abcdefghijklmnopqrstuvwxyz', ceil(random() * 36)::integer, 1), '') FROM generate_series(1, $1);
+$$ LANGUAGE sql;
+-- empty table index should be valid
+CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (a);
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- min_max opclass
+CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- multi_min_max opclass
+CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- bloom opclass
+CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- inclusion opclass
+CREATE TABLE brintest (id serial PRIMARY KEY, a box);
+CREATE INDEX brintest_idx ON brintest USING brin (a box_inclusion_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a)
+SELECT box(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 10000);
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+SELECT brin_index_check('brintest_idx', true, true, '@>');
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (a box_inclusion_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true, '@>');
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- multiple attributes
+CREATE TABLE brintest (id bigserial, a text) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 1500 AND id < 2500;
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- multiple attributes test with custom operators
+CREATE TABLE brintest (id bigserial, a text, b box) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a, b) SELECT
+ random_string((x % 100)),
+ box(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 3000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 1500 AND id < 2500;
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>');
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>');
+ brin_index_check
+------------------
+
+(1 row)
+
+-- error if it's impossible to use default operator for all index attributes
+SELECT brin_index_check('brintest_idx', true, true);
+ERROR: operator = is not a member of operator family "box_inclusion_ops"
+-- error if number of operators in input doesn't match index attributes number
+SELECT brin_index_check('brintest_idx', true, true, '=');
+ERROR: number of operator names in input (1) doesn't match index attributes number (3)
+-- error if operator name is NULL
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', NULL);
+ERROR: operator name must not be NULL
+-- error if there is no operator for attribute type
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@@');
+ERROR: there is no operator @@ for type "box"
+-- cleanup
+DROP TABLE brintest;
+-- cleanup
+DROP FUNCTION random_string;
diff --git a/contrib/amcheck/meson.build b/contrib/amcheck/meson.build
index d5137ef691d..14245e4dd69 100644
--- a/contrib/amcheck/meson.build
+++ b/contrib/amcheck/meson.build
@@ -5,6 +5,7 @@ amcheck_sources = files(
'verify_gin.c',
'verify_heapam.c',
'verify_nbtree.c',
+ 'verify_brin.c'
)
if host_system == 'windows'
@@ -27,6 +28,7 @@ install_data(
'amcheck--1.2--1.3.sql',
'amcheck--1.3--1.4.sql',
'amcheck--1.4--1.5.sql',
+ 'amcheck--1.5--1.6.sql',
kwargs: contrib_data_args,
)
@@ -40,6 +42,7 @@ tests += {
'check_btree',
'check_gin',
'check_heap',
+ 'check_brin'
],
},
'tap': {
@@ -50,6 +53,7 @@ tests += {
't/004_verify_nbtree_unique.pl',
't/005_pitr.pl',
't/006_verify_gin.pl',
+ 't/007_verify_brin.pl',
],
},
}
diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql
new file mode 100644
index 00000000000..66dd1647d3b
--- /dev/null
+++ b/contrib/amcheck/sql/check_brin.sql
@@ -0,0 +1,132 @@
+-- helper func
+CREATE OR REPLACE FUNCTION random_string(int) RETURNS text AS $$
+SELECT string_agg(substring('0123456789abcdefghijklmnopqrstuvwxyz', ceil(random() * 36)::integer, 1), '') FROM generate_series(1, $1);
+$$ LANGUAGE sql;
+
+
+-- empty table index should be valid
+CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (a);
+SELECT brin_index_check('brintest_idx', true, true);
+-- cleanup
+DROP TABLE brintest;
+
+-- min_max opclass
+CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_idx', true, true);
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+
+-- multi_min_max opclass
+CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_idx', true, true);
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+
+-- bloom opclass
+CREATE TABLE brintest (a bigint) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_idx', true, true);
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+-- inclusion opclass
+CREATE TABLE brintest (id serial PRIMARY KEY, a box);
+CREATE INDEX brintest_idx ON brintest USING brin (a box_inclusion_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a)
+SELECT box(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 10000);
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+
+SELECT brin_index_check('brintest_idx', true, true, '@>');
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (a box_inclusion_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true, '@>');
+-- cleanup
+DROP TABLE brintest;
+
+
+-- multiple attributes
+CREATE TABLE brintest (id bigserial, a text) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 1500 AND id < 2500;
+SELECT brin_index_check('brintest_idx', true, true);
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+-- multiple attributes test with custom operators
+CREATE TABLE brintest (id bigserial, a text, b box) WITH (fillfactor = 10);
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2);
+INSERT INTO brintest (a, b) SELECT
+ random_string((x % 100)),
+ box(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 3000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 1500 AND id < 2500;
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>');
+
+-- rebuild index
+DROP INDEX brintest_idx;
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2);
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>');
+
+-- error if it's impossible to use default operator for all index attributes
+SELECT brin_index_check('brintest_idx', true, true);
+
+-- error if number of operators in input doesn't match index attributes number
+SELECT brin_index_check('brintest_idx', true, true, '=');
+
+-- error if operator name is NULL
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', NULL);
+
+-- error if there is no operator for attribute type
+SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@@');
+
+-- cleanup
+DROP TABLE brintest;
+
+
+-- cleanup
+DROP FUNCTION random_string;
\ No newline at end of file
diff --git a/contrib/amcheck/t/007_verify_brin.pl b/contrib/amcheck/t/007_verify_brin.pl
new file mode 100644
index 00000000000..6fe0c78f12e
--- /dev/null
+++ b/contrib/amcheck/t/007_verify_brin.pl
@@ -0,0 +1,350 @@
+# Copyright (c) 2021-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+my $node;
+my $blksize;
+my $meta_page_blkno = 0;
+
+#
+# Test set-up
+#
+$node = PostgreSQL::Test::Cluster->new('test');
+$node->init(no_data_checksums => 1);
+$node->append_conf('postgresql.conf', 'autovacuum=off');
+$node->start;
+$blksize = int($node->safe_psql('postgres', 'SHOW block_size;'));
+$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
+
+# Tests
+
+# Test flow:
+# - create all necessary relations and indexes for all test cases
+# - stop the node
+# - insert corruptions for all test cases
+# - start the node
+# - assertions
+#
+# This way we avoid waiting for the node to restart for each test, which speeds up the tests.
+
+my @tests = (
+ {
+ # invalid meta page type
+
+ find => pack('S', 0xF091),
+ replace => pack('S', 0xAAAA),
+ blkno => $meta_page_blkno,
+ expected => wrap('metapage is corrupted')
+ },
+ {
+ # invalid meta page magic word
+
+ find => pack('L', 0xA8109CFA),
+ replace => pack('L', 0xBB109CFB),
+ blkno => $meta_page_blkno,
+ expected => wrap('metapage is corrupted'),
+ },
+ {
+ # invalid meta page index version
+
+ find => pack('L*', 0xA8109CFA, 1),
+ replace => pack('L*', 0xA8109CFA, 2),
+ blkno => $meta_page_blkno,
+ expected => wrap('metapage is corrupted')
+ },
+ {
+ # pages_per_range below lower limit
+
+ find => pack('L*', 0xA8109CFA, 1, 128),
+ replace => pack('L*', 0xA8109CFA, 1, 0),
+ blkno => $meta_page_blkno,
+ expected => wrap('metapage is corrupted')
+ },
+ {
+ # pages_per_range above upper limit
+
+ find => pack('L*', 0xA8109CFA, 1, 128),
+ replace => pack('L*', 0xA8109CFA, 1, 131073),
+ blkno => $meta_page_blkno,
+ expected => wrap('metapage is corrupted')
+ },
+ {
+ # last_revmap_page below lower limit
+
+ find => pack('L*', 0xA8109CFA, 1, 128, 1),
+ replace => pack('L*', 0xA8109CFA, 1, 128, 0),
+ blkno => $meta_page_blkno,
+ expected => wrap('metapage is corrupted'),
+ },
+ {
+
+ # last_revmap_page beyond index relation size
+
+ find => pack('L*', 0xA8109CFA, 1, 128, 1),
+ replace => pack('L*', 0xA8109CFA, 1, 128, 100),
+ blkno => $meta_page_blkno,
+ expected => wrap('metapage is corrupted'),
+ },
+ {
+ # invalid revmap page type
+
+ find => pack('S', 0xF092),
+ replace => pack('S', 0xAAAA),
+ blkno => 1, # revmap page
+ expected => wrap('revmap page is expected at block 1, last revmap page 1'),
+ },
+ {
+ # revmap item points beyond index relation size
+ # replace (2,1) with (100,1)
+
+ find => pack('S*', 0, 2, 1),
+ replace => pack('S*', 0, 100, 1),
+ blkno => 1, # revmap page
+ expected => wrap('revmap item points to a non existing block 100, '
+ . 'index max block 2. Range blkno: 0, revmap item: (1,0)')
+ },
+ {
+ # invalid regular page type
+
+ find => pack('S', 0xF093),
+ replace => pack('S', 0xAAAA),
+ blkno => 2, # regular page
+ expected => wrap('revmap item points to the page which is not regular (blkno: 2). '
+ . 'Range blkno: 0, revmap item: (1,0)')
+ },
+ {
+ # revmap item points beyond regular page max offset
+ # replace (2,1) with (2,2)
+
+ find => pack('S*', 0, 2, 1),
+ replace => pack('S*', 0, 2, 2),
+ blkno => 1, # revmap page
+ expected => wrap('revmap item offset number 2 is greater than regular page 2 max offset 1. '
+ . 'Range blkno: 0, revmap item: (1,0)')
+ },
+ {
+ # invalid index tuple range blkno
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 1, 0xA8, 0x01),
+ blkno => 2, # regular page
+ expected => wrap('index tuple has invalid blkno 1. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)')
+ },
+ {
+ # range beyond the table size and is not empty
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 0, 0x88, 0x01),
+ blkno => 2, # regular page
+ expected => wrap('the range is beyond the table size, but is not marked as empty, table size: 0 blocks. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)')
+ },
+ {
+ # corrupt index tuple data offset
+ # here 0x00, 0x00, 0x00 is padding and '.' is varlena len byte
+
+ find => pack('LCCCC', 0, 0x08, 0x00, 0x00, 0x00) . '(.)' . 'aaaaa',
+ replace => pack('LCCCC', 0, 0x1F, 0x00, 0x00, 0x00) . '$1' . 'aaaaa',
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'););
+ },
+ expected => qr/index tuple header length 31 is greater than tuple len ..\. \QRange blkno: 0, revmap item: (1,0), index tuple: (2,1)\E/
+ },
+ {
+ # empty range index tuple doesn't have null bitmap
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 0, 0x28, 0x01),
+ blkno => 2, # regular page
+ expected => wrap('empty range index tuple doesn\'t have null bitmap. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)')
+ },
+ {
+ # empty range index tuple all_nulls -> false
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 0, 0xA8, 0x00),
+ blkno => 2, # regular page
+ expected => wrap('empty range index tuple attribute 0 with allnulls is false. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)')
+ },
+ {
+ # empty range index tuple has_nulls -> true
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 0, 0xA8, 0x03),
+ blkno => 2, # regular page
+ expected => wrap('empty range index tuple attribute 0 with hasnulls is true. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)')
+ },
+ {
+ # invalid index tuple data
+ # replace varlena len with FF - should work with any endianness
+
+ find => pack('LCCCC', 0, 0x08, 0x00, 0x00, 0x00) . '.' . 'aaaaa',
+ replace => pack('LCCCCC', 0, 0x08, 0x00, 0x00, 0x00, 0xFF) . 'aaaaa',
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'););
+ },
+ expected => qr/attribute 0 stored value 0 with length -1 ends at offset 127 beyond total tuple length ..\.\Q Range blkno: 0, revmap item: (1,0), index tuple: (2,1)\E/
+ },
+ {
+ # orphan index tuple
+ # replace valid revmap item with (0,0)
+
+ find => pack('S*', 0, 2, 1),
+ replace => pack('S*', 0, 0, 0),
+ blkno => 1, # revmap page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'););
+ },
+ expected => wrap("revmap doesn't point to index tuple. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)")
+ },
+ {
+ # range is marked as empty_range, but heap has some data for the range
+
+ find => pack('LCC', 0, 0x88, 0x03),
+ replace => pack('LCC', 0, 0xA8, 0x01),
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null););
+ },
+ expected => wrap('range is marked as empty but contains qualified live tuples. Range blkno: 0, heap tid (0,1)')
+ },
+ {
+ # range hasnulls & allnulls are false, but heap contains null values for the range
+
+ find => pack('LCC', 0, 0x88, 0x02),
+ replace => pack('LCC', 0, 0x88, 0x00),
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa'););
+ },
+ expected => wrap('range hasnulls and allnulls are false, but contains a null value. Range blkno: 0, heap tid (0,1)')
+ },
+ {
+ # range allnulls is true, but heap contains non-null values for the range
+
+ find => pack('LCC', 0, 0x88, 0x02),
+ replace => pack('LCC', 0, 0x88, 0x01),
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa'););
+ },
+ expected => wrap('range allnulls is true, but contains nonnull value. Range blkno: 0, heap tid (0,2)')
+ },
+ {
+ # consistent function return FALSE for the valid heap value
+ # replace "ccccc" with "bbbbb" so that min_max index was too narrow
+
+ find => 'ccccc',
+ replace => 'bbbbb',
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'), ('ccccc'););
+ },
+ expected => wrap('heap tuple inconsistent with index. Range blkno: 0, heap tid (0,2)')
+ }
+);
+
+
+# init test data
+my $i = 1;
+foreach my $test_struct (@tests) {
+
+ $test_struct->{table_name} = 't' . $i++;
+ $test_struct->{index_name} = $test_struct->{table_name} . '_brin_idx';
+
+ my $test_data_sql = '';
+ if (exists $test_struct->{table_data}) {
+ $test_data_sql = $test_struct->{table_data}->($test_struct);
+ }
+
+ $node->safe_psql('postgres', qq(
+ CREATE TABLE $test_struct->{table_name} (a TEXT);
+ $test_data_sql
+ CREATE INDEX $test_struct->{index_name} ON $test_struct->{table_name} USING BRIN (a);
+ ));
+
+ $test_struct->{relpath} = relation_filepath($test_struct->{index_name});
+}
+
+# corrupt index
+$node->stop;
+
+foreach my $test_struct (@tests) {
+ string_replace_block(
+ $test_struct->{relpath},
+ $test_struct->{find},
+ $test_struct->{replace},
+ $test_struct->{blkno}
+ );
+}
+
+# assertions
+$node->start;
+
+foreach my $test_struct (@tests) {
+ my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true, true)));
+ like($stderr, $test_struct->{expected});
+}
+
+
+# Helpers
+
+# Returns the on-disk path of the named relation: the node's data directory joined with pg_relation_filepath() output.
+sub relation_filepath {
+ my ($relname) = @_;
+
+ my $pgdata = $node->data_dir;
+ my $rel = $node->safe_psql('postgres',
+ qq(SELECT pg_relation_filepath('$relname')));
+ die "path not found for relation $relname" unless defined $rel;
+ return "$pgdata/$rel";
+}
+
+sub string_replace_block { # Rewrites block $blkno of $filename in place, substituting every match of regex $find with $replace.
+ my ($filename, $find, $replace, $blkno) = @_;
+
+ my $fh;
+ open($fh, '+<', $filename) or BAIL_OUT("open failed: $!"); # '+<' opens for read and write without truncating
+ binmode $fh;
+
+ my $offset = $blkno * $blksize; # byte position of the target block
+ my $buffer;
+
+ sysseek($fh, $offset, 0) or BAIL_OUT("seek failed: $!");
+ sysread($fh, $buffer, $blksize) or BAIL_OUT("read failed: $!");
+
+ $buffer =~ s/$find/'"' . $replace . '"'/gee; # /ee: $replace is evaluated as a double-quoted string, so a capture reference like $1 inside it is expanded
+
+ sysseek($fh, $offset, 0) or BAIL_OUT("seek failed: $!"); # rewind so the modified block overwrites the original
+ syswrite($fh, $buffer) or BAIL_OUT("write failed: $!");
+
+ close($fh) or BAIL_OUT("close failed: $!");
+
+ return;
+}
+
+sub wrap # Builds a regex that matches the given message text literally.
+{
+ my ($input) = @_; # list assignment: take the first argument (a scalar assignment would yield the argument count)
+ return qr/\Q$input\E/
+}
+
+done_testing();
\ No newline at end of file
diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c
new file mode 100644
index 00000000000..aa026bfb8dc
--- /dev/null
+++ b/contrib/amcheck/verify_brin.c
@@ -0,0 +1,1359 @@
+/*-------------------------------------------------------------------------
+ *
+ * verify_brin.c
+ * Functions to check postgresql brin indexes for corruption
+ *
+ * Copyright (c) 2016-2026, PostgreSQL Global Development Group
+ *
+ * contrib/amcheck/verify_brin.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/brin.h"
+#include "access/brin_page.h"
+#include "access/brin_revmap.h"
+#include "access/htup_details.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/transam.h"
+#include "catalog/index.h"
+#include "catalog/pg_am_d.h"
+#include "catalog/pg_operator.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
+#include "storage/smgr.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "verify_common.h"
+
+PG_FUNCTION_INFO_V1(brin_index_check);
+
+typedef struct BrinCheckState
+{
+
+ /* Check arguments */
+
+ bool regularpagescheck; /* also run check_regular_pages() */
+ bool heapallindexed; /* also run check_heap_all_indexed() */
+ ArrayType *consistent_oper_names; /* operator name per attribute; empty means "=" */
+
+ /* BRIN check common fields */
+
+ Relation idxrel;
+ Relation heaprel;
+ BrinDesc *bdesc; /* built by brin_build_desc() in brin_check() */
+ int natts; /* number of attributes in bdesc->bd_tupdesc */
+ BlockNumber pagesPerRange; /* taken from the metapage */
+
+ /* Index structure check fields */
+
+ BufferAccessStrategy checkstrategy;
+ BlockNumber idxnblocks; /* cached index size in blocks, refreshed on demand */
+ BlockNumber heapnblocks; /* table size in blocks, read in check_revmap() */
+ BlockNumber lastRevmapPage; /* taken from the metapage */
+ /* Current range blkno */
+ BlockNumber rangeBlkno;
+ /* Current revmap item */
+ BlockNumber revmapBlk;
+ Buffer revmapbuf;
+ Page revmappage;
+ uint32 revmapidx;
+ /* Current index tuple */
+ BlockNumber regpageBlk;
+ Buffer regpagebuf;
+ Page regpage;
+ OffsetNumber regpageoffset;
+
+ /* Heap all indexed check fields */
+
+ BrinRevmap *revmap;
+ Buffer buf; /* buffer passed to brinGetTupleForHeapBlock() */
+ FmgrInfo *consistentFn; /* consistent support function, one per attribute */
+ /* Scan keys for regular values */
+ ScanKey *nonnull_sk;
+ /* Scan keys for null values */
+ ScanKey *isnull_sk;
+ double range_cnt; /* number of ranges fetched during the heap scan */
+ /* first block of the next range */
+ BlockNumber nextrangeBlk;
+
+ /*
+ * checkable_range tells whether the current range can be checked, i.e.
+ * whether dtup holds a valid index tuple for it. It is false when the
+ * current range is not summarized, when its tuple is a placeholder, and
+ * at the very beginning of the check.
+ */
+ bool checkable_range;
+ BrinMemTuple *dtup;
+ MemoryContext rangeCtx; /* reset each time a new range is fetched */
+ MemoryContext heaptupleCtx;
+} BrinCheckState;
+
+static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly);
+
+static void check_brin_index_structure(BrinCheckState *state);
+
+static void check_meta(BrinCheckState *state);
+
+static void check_revmap(BrinCheckState *state);
+
+static void check_revmap_item(BrinCheckState *state);
+
+static void check_index_tuple(BrinCheckState *state, BrinTuple *tuple, ItemId lp);
+
+static void check_regular_pages(BrinCheckState *state);
+
+static bool revmap_points_to_index_tuple(BrinCheckState *state);
+
+static ItemId PageGetItemIdCareful(BrinCheckState *state);
+
+static void check_heap_all_indexed(BrinCheckState *state);
+
+static void prepare_nonnull_scan_keys(BrinCheckState *state);
+
+static void brin_check_callback(Relation index,
+ ItemPointer tid,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *brstate);
+
+static void check_heap_tuple(BrinCheckState *state, const Datum *values, const bool *nulls, ItemPointer tid);
+
+static ScanKey prepare_nonnull_scan_key(const BrinCheckState *state, AttrNumber attno, String *opname);
+
+static ScanKey prepare_isnull_scan_key(AttrNumber attno);
+
+static void brin_check_ereport(BrinCheckState *state, const char *fmt);
+
+static void revmap_item_ereport(BrinCheckState *state, const char *fmt);
+
+static void index_tuple_ereport(BrinCheckState *state, const char *fmt);
+
+static void index_tuple_only_ereport(BrinCheckState *state, const char *fmt);
+
+static void heap_all_indexed_ereport(const BrinCheckState *state, const ItemPointerData *tid, const char *message);
+
+Datum
+brin_index_check(PG_FUNCTION_ARGS)
+{
+ BrinCheckState *check = (BrinCheckState *) palloc0(sizeof(BrinCheckState));
+ Oid indexoid;
+
+ /* Copy the SQL-level arguments into the check state */
+ indexoid = PG_GETARG_OID(0);
+ check->regularpagescheck = PG_GETARG_BOOL(1);
+ check->heapallindexed = PG_GETARG_BOOL(2);
+ check->consistent_oper_names = PG_GETARG_ARRAYTYPE_P(3);
+
+ /* Hand over to the common amcheck helper, which invokes brin_check() */
+ amcheck_lock_relation_and_check(indexoid, BRIN_AM_OID, brin_check,
+ ShareUpdateExclusiveLock, check);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Main check function: runs the structure check and, if requested, the heapallindexed check ('readonly' is not used here)
+ */
+static void
+brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly)
+{
+ BrinCheckState *state = (BrinCheckState *) callback_state;
+
+ /* Initialize check common fields */
+ state->idxrel = idxrel;
+ state->heaprel = heaprel;
+ state->bdesc = brin_build_desc(idxrel);
+ state->natts = state->bdesc->bd_tupdesc->natts;
+
+ /* Do some preparations and checks for heapallindexed */
+ if (state->heapallindexed)
+ {
+ /*
+ * Check if we are OK with indcheckxmin, and unregister the snapshot
+ * as we don't need it any further
+ */
+ Snapshot snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+ check_indcheckxmin(state->idxrel, snapshot);
+ UnregisterSnapshot(snapshot);
+
+ /*
+ * If scan key generation fails or the operator name array is
+ * invalid, we want to fail fast. So do it here.
+ */
+ prepare_nonnull_scan_keys(state);
+ }
+ /* The structure check always runs, and runs before any heap scan */
+ check_brin_index_structure(state);
+
+ if (state->heapallindexed)
+ {
+ check_heap_all_indexed(state);
+ }
+ /* Release the descriptor built by brin_build_desc() above */
+ brin_free_desc(state->bdesc);
+}
+
+/*
+ * Verify the physical structure of the index: metapage, revmap and regular pages.
+ *
+ * Assumptions the checks rely on:
+ * - we hold ShareUpdateExclusiveLock, so the revmap cannot be extended (i.e. no evacuation) during the check,
+ * regular pages stay regular, and ranges can be neither summarized nor desummarized.
+ * Concurrent updates may still allocate new regular pages
+ * and move index tuples around.
+ * - a valid revmap pointer must lead to a valid index tuple.
+ * - there are no orphan index tuples (for every index tuple there must be a revmap item pointing to it)
+ * - placeholder tuples may be present (as a result of a crash)
+ * - new pages may be found where regular ones are expected (as a result of a crash)
+ * - pages with the evacuation bit set may be present (as a result of a crash)
+ *
+ */
+static void
+check_brin_index_structure(BrinCheckState *state)
+{
+ /* Buffer access strategy used by the structure checks below */
+ state->checkstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+ /* Metapage first: it supplies lastRevmapPage and pagesPerRange */
+ check_meta(state);
+
+ /* Then the revmap, blocks [1, lastRevmapPage] */
+ check_revmap(state);
+
+ /* Regular pages, blocks [lastRevmapPage + 1, idxnblocks), are optional */
+ if (!state->regularpagescheck)
+ return;
+
+ check_regular_pages(state);
+}
+
+/* Validate the metapage and remember lastRevmapPage, pagesPerRange and the index size for the later checks */
+static void
+check_meta(BrinCheckState *state)
+{
+ Buffer metabuf;
+ Page metapage;
+ BrinMetaPageData *metadata;
+
+ /* Read and share-lock the metapage */
+ metabuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, BRIN_METAPAGE_BLKNO, RBM_NORMAL,
+ state->checkstrategy);
+ LockBuffer(metabuf, BUFFER_LOCK_SHARE);
+ metapage = BufferGetPage(metabuf);
+ metadata = (BrinMetaPageData *) PageGetContents(metapage);
+ state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel);
+ /* Page type, magic, version, pagesPerRange bounds, and lastRevmapPage strictly between the metapage and idxnblocks */
+ if (!BRIN_IS_META_PAGE(metapage) ||
+ metadata->brinMagic != BRIN_META_MAGIC ||
+ metadata->brinVersion != BRIN_CURRENT_VERSION ||
+ metadata->pagesPerRange < 1 || metadata->pagesPerRange > BRIN_MAX_PAGES_PER_RANGE ||
+ metadata->lastRevmapPage <= BRIN_METAPAGE_BLKNO || metadata->lastRevmapPage >= state->idxnblocks)
+ {
+ brin_check_ereport(state, "metapage is corrupted");
+ }
+ /* Cache the values the other checks need, then drop the lock and the pin */
+ state->lastRevmapPage = metadata->lastRevmapPage;
+ state->pagesPerRange = metadata->pagesPerRange;
+ UnlockReleaseBuffer(metabuf);
+}
+
+/*
+ * This is the main part of the BRIN index structure check.
+ * We walk the revmap page by page from the beginning and check every revmap item and
+ * every index tuple that the revmap points to.
+ */
+static void
+check_revmap(BrinCheckState *state)
+{
+ Relation idxrel = state->idxrel;
+ BlockNumber lastRevmapPage = state->lastRevmapPage;
+ ReadStream *stream;
+ int stream_flags;
+ ReadStreamBlockNumberCB stream_cb;
+ BlockRangeReadStreamPrivate stream_data;
+
+ state->rangeBlkno = 0;
+ state->regpagebuf = InvalidBuffer;
+ state->heapnblocks = RelationGetNumberOfBlocks(state->heaprel);
+
+ /*
+ * Prepare stream data for the revmap walk. It is safe to use batch mode
+ * as block_range_read_stream_cb takes no locks.
+ */
+ stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_USE_BATCHING;
+ /* First revmap page is right after meta page */
+ stream_data.current_blocknum = BRIN_METAPAGE_BLKNO + 1;
+ stream_data.last_exclusive = lastRevmapPage + 1;
+
+ stream_cb = block_range_read_stream_cb;
+ stream = read_stream_begin_relation(stream_flags,
+ GetAccessStrategy(BAS_BULKREAD),
+ idxrel,
+ MAIN_FORKNUM,
+ stream_cb,
+ &stream_data,
+ 0);
+
+ /* Walk each revmap page */
+ while ((state->revmapbuf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ {
+ state->revmapBlk = BufferGetBlockNumber(state->revmapbuf);
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+ state->revmappage = BufferGetPage(state->revmapbuf);
+
+ /*
+ * Pages with block numbers in [1, lastRevmapPage] should be revmap
+ * pages
+ */
+ if (!BRIN_IS_REVMAP_PAGE(state->revmappage))
+ {
+ brin_check_ereport(state, psprintf("revmap page is expected at block %u, last revmap page %u",
+ state->revmapBlk,
+ lastRevmapPage));
+ }
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+
+ /* Walk and check all brin tuples from the current revmap page */
+ state->revmapidx = 0;
+ while (state->revmapidx < REVMAP_PAGE_MAXITEMS)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Check revmap item; it takes and releases the page locks itself */
+ check_revmap_item(state);
+
+ state->rangeBlkno += state->pagesPerRange;
+ state->revmapidx++;
+ }
+
+ elog(DEBUG3, "Complete revmap page check: %u", state->revmapBlk); /* BlockNumber is unsigned, hence %u */
+
+ ReleaseBuffer(state->revmapbuf);
+ }
+
+ read_stream_end(stream);
+
+ if (BufferIsValid(state->regpagebuf)) /* drop the regular page cached by check_revmap_item() */
+ {
+ ReleaseBuffer(state->regpagebuf);
+ }
+}
+
+/*
+ * Check revmap item.
+ *
+ * We check the revmap item pointer itself and, if it is OK, the index tuple it points to.
+ *
+ * To avoid deadlock we need to unlock the revmap page before locking the regular page,
+ * so by the time we get the lock on the regular page our index tuple pointer may no longer be relevant.
+ * So for some checks, before reporting an error we make sure that our pointer is still relevant and, if it's not, retry.
+ */
+static void
+check_revmap_item(BrinCheckState *state)
+{
+ ItemPointerData *revmaptids;
+ RevmapContents *contents;
+ ItemPointerData *iptr;
+ ItemId lp;
+ BrinTuple *tup;
+ Relation idxrel = state->idxrel;
+
+ /* Loop to retry revmap item check if there was a concurrent update. */
+ for (;;)
+ {
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+
+ contents = (RevmapContents *) PageGetContents(BufferGetPage(state->revmapbuf));
+ revmaptids = contents->rm_tids;
+ /* Pointer for the range with start at state->rangeBlkno */
+ iptr = revmaptids + state->revmapidx;
+
+ /* At first check revmap item pointer */
+
+ /*
+ * An invalid tuple pointer means the range isn't summarized, just
+ * move further
+ */
+ if (!ItemPointerIsValid(iptr))
+ {
+ elog(DEBUG3, "Range %u is not summarized", state->rangeBlkno);
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+ break;
+ }
+
+ /*
+ * Pointer is valid, it should point to the index tuple for the range
+ * with blkno rangeBlkno. Remember it and unlock revmap page to avoid
+ * deadlock
+ */
+ state->regpageBlk = ItemPointerGetBlockNumber(iptr);
+ state->regpageoffset = ItemPointerGetOffsetNumber(iptr);
+
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Check if the regpage block number is greater than the relation
+ * size. To avoid fetching the number of blocks for each tuple, use
+ * cached value first
+ */
+ if (state->regpageBlk >= state->idxnblocks)
+ {
+ /*
+ * Regular pages may have been added, so refresh idxnblocks and
+ * recheck
+ */
+ state->idxnblocks = RelationGetNumberOfBlocks(idxrel);
+ if (state->regpageBlk >= state->idxnblocks)
+ {
+ revmap_item_ereport(state,
+ psprintf("revmap item points to a non existing block %u, index max block %u",
+ state->regpageBlk,
+ state->idxnblocks - 1));
+ }
+ }
+
+ /*
+ * To avoid some pin/unpin cycles we cache the last used regular page.
+ * Check if we need a different regular page and fetch it.
+ */
+ if (!BufferIsValid(state->regpagebuf) || BufferGetBlockNumber(state->regpagebuf) != state->regpageBlk)
+ {
+ if (BufferIsValid(state->regpagebuf))
+ {
+ ReleaseBuffer(state->regpagebuf);
+ }
+ state->regpagebuf = ReadBufferExtended(idxrel, MAIN_FORKNUM, state->regpageBlk, RBM_NORMAL,
+ state->checkstrategy);
+ }
+
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_SHARE);
+ state->regpage = BufferGetPage(state->regpagebuf);
+
+ /* Revmap should always point to a regular page */
+ if (!BRIN_IS_REGULAR_PAGE(state->regpage))
+ {
+ revmap_item_ereport(state,
+ psprintf("revmap item points to the page which is not regular (blkno: %u)",
+ state->regpageBlk));
+
+ }
+
+ /* Check item offset is valid */
+ if (state->regpageoffset > PageGetMaxOffsetNumber(state->regpage))
+ {
+
+ /* If a concurrent update moved our tuple we need to retry */
+ if (!revmap_points_to_index_tuple(state))
+ {
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+ continue;
+ }
+
+ revmap_item_ereport(state,
+ psprintf("revmap item offset number %u is greater than regular page %u max offset %u",
+ state->regpageoffset,
+ state->regpageBlk,
+ PageGetMaxOffsetNumber(state->regpage)));
+ }
+
+ elog(DEBUG3, "Process range: %u, iptr: (%u,%u)", state->rangeBlkno, state->regpageBlk, state->regpageoffset);
+
+ /*
+ * Revmap pointer is OK. It points to an existing regular page, and
+ * the offset is OK too. Let's check the index tuple it points to.
+ */
+
+ lp = PageGetItemIdCareful(state);
+
+ /* Revmap should point to NORMAL tuples only */
+ if (!ItemIdIsUsed(lp))
+ {
+
+ /* If a concurrent update moved our tuple we need to retry */
+ if (!revmap_points_to_index_tuple(state))
+ {
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+ continue;
+ }
+
+ index_tuple_ereport(state, "revmap item points to unused index tuple");
+ }
+
+ tup = (BrinTuple *) PageGetItem(state->regpage, lp);
+
+ /* Check if range block number is as expected */
+ if (tup->bt_blkno != state->rangeBlkno)
+ {
+
+ /* If a concurrent update moved our tuple we need to retry */
+ if (!revmap_points_to_index_tuple(state))
+ {
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+ continue;
+ }
+
+ index_tuple_ereport(state, psprintf("index tuple has invalid blkno %u", tup->bt_blkno));
+ }
+
+ /*
+ * If the range is beyond the table size, the range must be empty.
+ * This is a valid situation for an empty table.
+ */
+ if (state->rangeBlkno >= state->heapnblocks)
+ {
+ if (!BrinTupleIsEmptyRange(tup))
+ {
+ index_tuple_ereport(state,
+ psprintf("the range is beyond the table size, "
+ "but is not marked as empty, table size: %u blocks",
+ state->heapnblocks));
+ }
+ }
+
+ /* Check index tuple itself */
+ check_index_tuple(state, tup, lp);
+
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+ break;
+ }
+}
+
+/*
+ * Check that index tuple has expected structure.
+ *
+ * This function follows the logic performed by brin_deform_tuple().
+ * After this check is complete we are sure that brin_deform_tuple can process it.
+ *
+ * For an empty range, check that for all attributes allnulls is true, hasnulls is false and
+ * there is no data. All core opclasses expect allnulls to be true for an empty range.
+ */
+static void
+check_index_tuple(BrinCheckState *state, BrinTuple *tuple, ItemId lp)
+{
+ char *tp; /* tuple data */
+ /* 32 bits: a corrupted varlena header may claim a length that would wrap a uint16 and slip past the bounds checks */
+ uint32 off;
+ uint8 *nullbits;
+ TupleDesc disktdesc;
+ int stored;
+ bool empty_range = BrinTupleIsEmptyRange(tuple);
+ bool hasnullbitmap = BrinTupleHasNulls(tuple);
+ uint8 hoff = BrinTupleDataOffset(tuple);
+ uint16 tuplen = ItemIdGetLength(lp);
+
+ /* Check that header length is not greater than tuple length */
+ if (hoff > tuplen)
+ {
+ index_tuple_ereport(state, psprintf("index tuple header length %u is greater than tuple len %u", hoff, tuplen));
+ }
+
+ /* If tuple has null bitmap - initialize it */
+ if (hasnullbitmap)
+ {
+ nullbits = (uint8 *) ((char *) tuple + SizeOfBrinTuple);
+ }
+ else
+ {
+ nullbits = NULL;
+ }
+
+ /* Empty range index tuple checks */
+ if (empty_range)
+ {
+ /* Empty range tuple should have null bitmap */
+ if (!hasnullbitmap)
+ {
+ index_tuple_ereport(state, "empty range index tuple doesn't have null bitmap");
+ }
+
+ Assert(nullbits != NULL);
+
+ /* Check that every attribute has allnulls true and hasnulls false */
+ for (int attindex = 0; attindex < state->natts; ++attindex)
+ {
+
+ /* Attribute allnulls should be true for empty range */
+ if (att_isnull(attindex, nullbits))
+ {
+ index_tuple_ereport(state,
+ psprintf("empty range index tuple attribute %d with allnulls is false",
+ attindex));
+ }
+
+ /* Attribute hasnulls should be false for empty range */
+ if (!att_isnull(state->natts + attindex, nullbits))
+ {
+ index_tuple_ereport(state,
+ psprintf("empty range index tuple attribute %d with hasnulls is true",
+ attindex));
+ }
+ }
+
+ /* We are done with empty range tuple */
+ return;
+ }
+
+ /*
+ * Range is marked as not empty so we can have some data in the tuple.
+ * Walk all attributes and check that all stored values fit into the
+ * tuple
+ */
+
+ tp = (char *) tuple + BrinTupleDataOffset(tuple);
+ stored = 0;
+ off = 0;
+
+ disktdesc = brin_tuple_tupdesc(state->bdesc);
+
+ for (int attindex = 0; attindex < state->natts; ++attindex)
+ {
+ BrinOpcInfo *opclass = state->bdesc->bd_info[attindex];
+
+ /*
+ * if allnulls is set we have no data for this attribute, move to the
+ * next
+ */
+ if (hasnullbitmap && !att_isnull(attindex, nullbits))
+ {
+ stored += opclass->oi_nstored;
+ continue;
+ }
+
+ /* Walk all stored values for the current attribute */
+ for (int datumno = 0; datumno < opclass->oi_nstored; datumno++)
+ {
+ CompactAttribute *thisatt = TupleDescCompactAttr(disktdesc, stored);
+
+ if (thisatt->attlen == -1)
+ {
+ off = att_pointer_alignby(off,
+ thisatt->attalignby,
+ -1,
+ tp + off);
+ }
+ else
+ {
+ off = att_nominal_alignby(off, thisatt->attalignby);
+ }
+
+ /* Check that we are still in the tuple */
+ if (hoff + off > tuplen)
+ {
+ index_tuple_ereport(state,
+ psprintf("attribute %u stored value %u with length %d "
+ "starts at offset %u beyond total tuple length %u",
+ attindex, datumno, thisatt->attlen, off, tuplen));
+ }
+
+ off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+ /* Check that we are still in the tuple */
+ if (hoff + off > tuplen)
+ {
+ index_tuple_ereport(state,
+ psprintf("attribute %u stored value %u with length %d "
+ "ends at offset %u beyond total tuple length %u",
+ attindex, datumno, thisatt->attlen, off, tuplen));
+ }
+ stored++;
+ }
+
+ }
+
+}
+
+/*
+ * By now we should have already checked that every index
+ * tuple in the regular pages has a valid structure and range blkno
+ * (because every normal index tuple must have a pointer in the revmap and
+ * we followed every such pointer in check_revmap). So here we just
+ * do some additional checks to be sure that there is nothing wrong
+ * with the regular pages [lastRevmapPage + 1, idxnblocks):
+ * - there is a pointer in the revmap to each NORMAL index tuple
+ * (no orphan index tuples)
+ * - all pages have the expected type (REGULAR). We can encounter new pages as
+ * a result of a crash, so we just skip such pages.
+ */
+static void
+check_regular_pages(BrinCheckState *state)
+{
+ ReadStream *stream;
+ int stream_flags;
+ ReadStreamBlockNumberCB stream_cb;
+ BlockRangeReadStreamPrivate stream_data;
+
+ /* reset state */
+ state->revmapBlk = InvalidBlockNumber;
+ state->revmapbuf = InvalidBuffer;
+ state->revmapidx = -1;
+ state->regpageBlk = InvalidBlockNumber;
+ state->regpagebuf = InvalidBuffer;
+ state->regpageoffset = InvalidOffsetNumber;
+ state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel);
+
+ /*
+ * Prepare stream data for the regular pages walk. It is safe to use
+ * batch mode as block_range_read_stream_cb takes no locks.
+ */
+ stream_flags = READ_STREAM_SEQUENTIAL | READ_STREAM_USE_BATCHING | READ_STREAM_FULL;
+ /* First regular page is right after the last revmap page */
+ stream_data.current_blocknum = state->lastRevmapPage + 1;
+ stream_data.last_exclusive = state->idxnblocks;
+
+ stream_cb = block_range_read_stream_cb;
+ stream = read_stream_begin_relation(stream_flags,
+ GetAccessStrategy(BAS_BULKREAD),
+ state->idxrel,
+ MAIN_FORKNUM,
+ stream_cb,
+ &stream_data,
+ 0);
+
+ while ((state->regpagebuf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ {
+ OffsetNumber maxoff;
+
+ state->regpageBlk = BufferGetBlockNumber(state->regpagebuf);
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_SHARE);
+ state->regpage = BufferGetPage(state->regpagebuf);
+
+ /* Skip new pages */
+ if (PageIsNew(state->regpage))
+ {
+ UnlockReleaseBuffer(state->regpagebuf);
+ continue;
+ }
+
+ if (!BRIN_IS_REGULAR_PAGE(state->regpage))
+ {
+ brin_check_ereport(state, psprintf("expected new or regular page at block %u", state->regpageBlk));
+ }
+
+ /* Check that all NORMAL index tuples within the page are not orphans */
+ maxoff = PageGetMaxOffsetNumber(state->regpage);
+ for (state->regpageoffset = FirstOffsetNumber; state->regpageoffset <= maxoff; state->regpageoffset++)
+ {
+ ItemId lp;
+
+ CHECK_FOR_INTERRUPTS();
+
+ lp = PageGetItemIdCareful(state);
+
+ if (ItemIdIsUsed(lp))
+ {
+ BrinTuple *tup;
+ BlockNumber revmapBlk;
+
+ tup = (BrinTuple *) PageGetItem(state->regpage, lp);
+
+ /* Get revmap block number for index tuple blkno (+1 skips the metapage) */
+ revmapBlk = ((tup->bt_blkno / state->pagesPerRange) / REVMAP_PAGE_MAXITEMS) + 1;
+ if (revmapBlk > state->lastRevmapPage)
+ {
+ index_tuple_only_ereport(state, psprintf("no revmap page for the index tuple with blkno %u",
+ tup->bt_blkno));
+ }
+
+ /* Fetch another revmap page if needed; the previous one stays pinned until then */
+ if (state->revmapBlk != revmapBlk)
+ {
+ if (BlockNumberIsValid(state->revmapBlk))
+ {
+ ReleaseBuffer(state->revmapbuf);
+ }
+ state->revmapBlk = revmapBlk;
+ state->revmapbuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, state->revmapBlk, RBM_NORMAL,
+ state->checkstrategy);
+ }
+
+ state->revmapidx = (tup->bt_blkno / state->pagesPerRange) % REVMAP_PAGE_MAXITEMS;
+ state->rangeBlkno = tup->bt_blkno;
+
+ /* check that the revmap item points to this index tuple */
+ if (!revmap_points_to_index_tuple(state))
+ {
+ index_tuple_ereport(state, psprintf("revmap doesn't point to index tuple"));
+ }
+
+ }
+ }
+
+ UnlockReleaseBuffer(state->regpagebuf);
+ }
+
+ read_stream_end(stream);
+
+ if (state->revmapbuf != InvalidBuffer)
+ {
+ ReleaseBuffer(state->revmapbuf);
+ }
+}
+
+/*
+ * Report whether the revmap entry at state->revmapidx refers to (regpageBlk, regpageoffset).
+ * The regular page is already locked by the caller; the revmap page is share-locked only here.
+ * Taking the locks in that order is valid, so no deadlock is possible.
+ */
+static bool
+revmap_points_to_index_tuple(BrinCheckState *state)
+{
+ RevmapContents *revmap;
+ ItemPointerData *entry;
+ bool matches = false;
+
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+
+ revmap = (RevmapContents *) PageGetContents(BufferGetPage(state->revmapbuf));
+ entry = &revmap->rm_tids[state->revmapidx];
+ /* Compare both halves of the stored TID with the tuple's location */
+ if (ItemPointerGetBlockNumberNoCheck(entry) == state->regpageBlk)
+ matches = (ItemPointerGetOffsetNumberNoCheck(entry) == state->regpageoffset);
+
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+
+ return matches;
+}
+
+/*
+ * Fetch the line pointer at state->regpageoffset on state->regpage and sanity-check it.
+ *
+ * Line pointers in a BRIN index are expected to be either UNUSED or NORMAL.
+ */
+static ItemId
+PageGetItemIdCareful(BrinCheckState *state)
+{
+ ItemId lp = PageGetItemId(state->regpage, state->regpageoffset);
+ bool normal_with_storage;
+ bool unused_without_storage;
+
+ /* The item must end before the area reserved for BrinSpecialSpace */
+ if (ItemIdGetOffset(lp) + ItemIdGetLength(lp) >
+ BLCKSZ - MAXALIGN(sizeof(BrinSpecialSpace)))
+ {
+ index_tuple_ereport(state,
+ psprintf("line pointer points past end of tuple space in index. "
+ "lp_off=%u, lp_len=%u lp_flags=%u",
+ ItemIdGetOffset(lp), ItemIdGetLength(lp),
+ ItemIdGetFlags(lp)));
+ }
+
+ /* Only these two combinations of flags and storage are acceptable */
+ normal_with_storage = ItemIdIsNormal(lp) && ItemIdHasStorage(lp);
+ unused_without_storage = !ItemIdIsUsed(lp) && !ItemIdHasStorage(lp);
+
+ if (!normal_with_storage && !unused_without_storage)
+ {
+ index_tuple_ereport(state,
+ psprintf("invalid line pointer storage in index. "
+ "lp_off=%u, lp_len=%u lp_flags=%u",
+ ItemIdGetOffset(lp), ItemIdGetLength(lp),
+ ItemIdGetFlags(lp)));
+ }
+
+ return lp;
+}
+
+/*
+ * Check that every heap tuple is consistent with the index.
+ *
+ * Here we generate a ScanKey for every heap tuple and test it against the
+ * appropriate range using consistentFn (for ScanKey generation logic see 'prepare_nonnull_scan_keys')
+ *
+ * Also, we check that fields 'empty_range', 'all_nulls' and 'has_nulls'
+ * are not too "narrow" for each range, which means:
+ * 1) has_nulls = false, but we see null value (only for oi_regular_nulls is true)
+ * 2) all_nulls = true, but we see nonnull value.
+ * 3) empty_range = true, but we see tuple within the range.
+ *
+ * We use allowSync = false, because this way
+ * we process full ranges one by one from the first range.
+ * It's not necessary, but makes the code simpler and this way
+ * we need to fetch every index tuple only once.
+ */
+static void
+check_heap_all_indexed(BrinCheckState *state)
+{
+ Relation idxrel = state->idxrel;
+ Relation heaprel = state->heaprel;
+ double reltuples;
+ IndexInfo *indexInfo;
+
+ /* heap all indexed check fields initialization */
+
+ state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange);
+ state->dtup = brin_new_memtuple(state->bdesc);
+ state->checkable_range = false;
+ state->consistentFn = palloc0_array(FmgrInfo, state->natts);
+ state->range_cnt = 0;
+ /* next range is the first range in the beginning */
+ state->nextrangeBlk = 0;
+ state->isnull_sk = palloc0_array(ScanKey, state->natts);
+ state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "brin check range context",
+ ALLOCSET_DEFAULT_SIZES);
+ state->heaptupleCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "brin check tuple context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * Prepare "is_null" scan keys and consistent fn for each attribute.
+ * "non-null" scan keys were already generated in brin_check().
+ */
+ for (AttrNumber attno = 1; attno <= state->natts; attno++)
+ {
+ FmgrInfo *tmp;
+
+ tmp = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_CONSISTENT);
+ fmgr_info_copy(&state->consistentFn[attno - 1], tmp, CurrentMemoryContext);
+
+ state->isnull_sk[attno - 1] = prepare_isnull_scan_key(attno);
+ }
+
+ indexInfo = BuildIndexInfo(idxrel);
+
+ /*
+ * Use snapshot to check only those tuples that are guaranteed to be
+ * indexed already. Using SnapshotAny would make it more difficult to say
+ * if there is a corruption or the checked tuple just hasn't been indexed
+ * yet. Also, we want to support CIC indexes.
+ */
+ indexInfo->ii_Concurrent = true;
+ reltuples = table_index_build_scan(heaprel, idxrel, indexInfo, false, true,
+ brin_check_callback, (void *) state, NULL);
+ /* NOTE(review): the fifth argument above looks like the 'progress' flag and is true - confirm progress reporting is intended */
+ elog(DEBUG3, "ranges were checked: %f", state->range_cnt);
+ elog(DEBUG3, "scan total tuples: %f", reltuples);
+ /* state->buf presumably holds the last buffer used by the callback's revmap lookups - TODO confirm */
+ if (state->buf != InvalidBuffer)
+ ReleaseBuffer(state->buf);
+
+ brinRevmapTerminate(state->revmap);
+ MemoryContextDelete(state->rangeCtx);
+ MemoryContextDelete(state->heaptupleCtx);
+}
+
+/*
+ * Build one "non-null" scan key per index attribute.
+ *
+ * The consistent function takes a ScanKey, so every attribute needs one.
+ * The key has to evaluate to TRUE for every heap tuple of the range when the
+ * tuple's indexed value is used as sk_argument.
+ * Building such a key takes the right operand type and a strategy number.
+ * The right operand type is the type the index is built on, i.e. 'opcintype'.
+ * No single strategy number works everywhere,
+ * because each opclass supports its own set of operators and the strategy number
+ * of the same operator can differ from opclass to opclass.
+ * So we look up an operator with the desired behavior whose operand types are
+ * both 'opcintype' and take its strategy number from the operator family.
+ * Usually '=' is suitable. The user may pass operator names for opclasses that
+ * do not support '='. If no usable operator is found, the check cannot proceed.
+ *
+ * An empty operator name array means "=" is used for every attribute.
+ */
+static void
+prepare_nonnull_scan_keys(BrinCheckState *state)
+{
+ ArrayType *opnames = state->consistent_oper_names;
+ Oid elmtype = ARR_ELEMTYPE(opnames);
+ int16 elmlen;
+ bool elmbyval;
+ char elmalign;
+ Datum *opdatums;
+ bool *opnulls;
+ int nopnames;
+
+ /* Explode the array argument into datums and a null map */
+ get_typlenbyvalalign(elmtype, &elmlen, &elmbyval, &elmalign);
+ deconstruct_array(opnames, elmtype, elmlen, elmbyval, elmalign,
+ &opdatums, &opnulls, &nopnames);
+
+ /*
+ * A non-empty array has to name exactly one operator per index
+ * attribute
+ */
+ if (nopnames > 0 && nopnames != state->natts)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("number of operator names in input (%u) "
+ "doesn't match index attributes number (%u)",
+ nopnames, state->natts)));
+ }
+
+ /* One scan key slot per index attribute */
+ state->nonnull_sk = palloc0_array(ScanKey, state->natts);
+
+ for (int attindex = 0; attindex < state->natts; attindex++)
+ {
+ /* '=' is the default when no operator names were given */
+ char *opstr = "=";
+ String *opnode;
+
+ if (nopnames > 0)
+ {
+ if (opnulls[attindex])
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("operator name must not be NULL")));
+ }
+
+ opstr = TextDatumGetCString(opdatums[attindex]);
+ }
+
+ opnode = makeString(opstr);
+ state->nonnull_sk[attindex] = prepare_nonnull_scan_key(state, attindex + 1, opnode);
+
+ /* Only the String node wrapper is freed here */
+ pfree(opnode);
+ }
+}
+
+/*
+ * Prepare ScanKey for index attribute 'attno' using the operator named by 'opname'.
+ *
+ * Generated once, and will be reused for all heap tuples.
+ * Argument field will be filled for every heap tuple before
+ * consistent function invocation, so leave it zero for a while.
+ */
+static ScanKey
+prepare_nonnull_scan_key(const BrinCheckState *state, AttrNumber attno, String *opname)
+{
+ ScanKey scanKey;
+ Oid opOid;
+ Oid opFamilyOid;
+ bool defined;
+ StrategyNumber strategy;
+ RegProcedure opRegProc;
+ List *operNameList;
+ int attindex = attno - 1;
+ Form_pg_attribute attr = TupleDescAttr(state->bdesc->bd_tupdesc, attindex);
+ Oid type = state->idxrel->rd_opcintype[attindex];
+ /* Look up operator "opname" with both operands of the opclass input type */
+ opFamilyOid = state->idxrel->rd_opfamily[attindex];
+ operNameList = list_make1(opname);
+ opOid = OperatorLookup(operNameList, type, type, &defined);
+
+ if (opOid == InvalidOid)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("there is no operator %s for type \"%s\"",
+ opname->sval, format_type_be(type))));
+ }
+ /* The operator must belong to the attribute's operator family to have a strategy number */
+ strategy = get_op_opfamily_strategy(opOid, opFamilyOid);
+
+ if (strategy == 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("operator %s is not a member of operator family \"%s\"",
+ opname->sval,
+ get_opfamily_name(opFamilyOid, false))));
+ }
+
+ opRegProc = get_opcode(opOid);
+ scanKey = palloc0(sizeof(ScanKeyData));
+ ScanKeyEntryInitialize(
+ scanKey,
+ 0,
+ attno,
+ strategy,
+ type,
+ attr->attcollation,
+ opRegProc,
+ (Datum) 0
+ );
+ list_free(operNameList); /* frees the list only; 'opname' still belongs to the caller */
+
+ return scanKey;
+}
+
+/*
+ * Build the "IS NULL" scan key for index attribute 'attno'.
+ * The key is allocated with palloc0 and returned to the caller.
+ */
+static ScanKey
+prepare_isnull_scan_key(AttrNumber attno)
+{
+ ScanKey nullKey = (ScanKey) palloc0(sizeof(ScanKeyData));
+
+ /* No strategy, subtype, collation, procedure or argument is set for this key */
+ ScanKeyEntryInitialize(nullKey, SK_ISNULL | SK_SEARCHNULL, attno,
+ InvalidStrategy, InvalidOid, InvalidOid, InvalidOid,
+ (Datum) 0);
+
+ return nullKey;
+}
+
+/*
+ * We walk from the first range (blkno = 0) to the last as the scan proceeds.
+ * For every heap tuple we check whether we are done with the current range and need to move on
+ * to the current heap tuple's range. While moving to the next range we check that it's not empty (because
+ * we have at least one tuple for this range).
+ * Every heap tuple is checked to be consistent with the range it belongs to.
+ * In case of unsummarized ranges and placeholders we skip all checks.
+ *
+ * While moving, we may jump over some ranges,
+ * but it's okay because we would not be able to check them anyway.
+ * We also can't say whether skipped ranges should be marked as empty or not,
+ * since it's possible that there were some tuples before that are now deleted.
+ *
+ */
+static void
+brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *brstate)
+{
+ BrinCheckState *state;
+ BlockNumber heapblk;
+
+ state = (BrinCheckState *) brstate;
+ heapblk = ItemPointerGetBlockNumber(tid);
+
+ /* If we went beyond the current range let's fetch new range */
+ if (heapblk >= state->nextrangeBlk)
+ {
+ BrinTuple *tup;
+ BrinTuple *tupcopy = NULL;
+ MemoryContext oldCtx;
+ OffsetNumber off;
+ Size size;
+ Size btupsz = 0;
+
+ MemoryContextReset(state->rangeCtx);
+ oldCtx = MemoryContextSwitchTo(state->rangeCtx);
+
+ state->range_cnt++;
+
+ /* Move to the range that contains current heap tuple */
+ tup = brinGetTupleForHeapBlock(state->revmap, heapblk, &state->buf,
+ &off, &size, BUFFER_LOCK_SHARE);
+
+ if (tup)
+ {
+ tupcopy = brin_copy_tuple(tup, size, tupcopy, &btupsz);
+ LockBuffer(state->buf, BUFFER_LOCK_UNLOCK);
+ state->dtup = brin_deform_tuple(state->bdesc, tupcopy, state->dtup);
+
+ /* We can't check placeholder ranges */
+ state->checkable_range = !state->dtup->bt_placeholder;
+ }
+ else
+ {
+ /* We can't check unsummarized ranges. */
+ state->checkable_range = false;
+ }
+
+ /*
+ * Update nextrangeBlk so we know when we are done with the current
+ * range
+ */
+ state->nextrangeBlk = (heapblk / state->pagesPerRange + 1) * state->pagesPerRange;
+
+ MemoryContextSwitchTo(oldCtx);
+
+ /* Range must not be empty */
+ if (state->checkable_range && state->dtup->bt_empty_range)
+ {
+ heap_all_indexed_ereport(state, tid, "range is marked as empty but contains qualified live tuples");
+ }
+
+ }
+
+ /* Check tuple is consistent with the index */
+ if (state->checkable_range)
+ {
+ check_heap_tuple(state, values, isnull, tid);
+ }
+
+}
+
+/*
+ * Check one heap tuple against the deformed index tuple of its range
+ * (state->dtup), attribute by attribute.
+ *
+ * For each index attribute:
+ * - a null value with oi_regular_nulls = true is checked against the
+ *   hasnulls/allnulls flags only, and consistentFn is not called;
+ * - a nonnull value must not find allnulls set, no matter whether
+ *   oi_regular_nulls is set or not;
+ * - in all other cases consistentFn is called with the appropriate scanKey:
+ *   - for oi_regular_nulls = false and a null value, the 'isNull' scanKey,
+ *   - for nonnull values, the 'nonnull' scanKey, with its argument set to
+ *     the heap value.
+ *
+ * If consistentFn takes at least four arguments it is passed the address of
+ * the scanKey pointer and a key count of 1; otherwise it is passed the
+ * scanKey itself.
+ *
+ * Any inconsistency is reported through heap_all_indexed_ereport().  The
+ * work is done in state->heaptupleCtx, which is reset on entry.
+ *
+ * 'values' and 'nulls' are indexed by attribute position (0-based); 'tid'
+ * is only used for error reporting.
+ */
+static void
+check_heap_tuple(BrinCheckState *state, const Datum *values, const bool *nulls, ItemPointer tid)
+{
+ int attindex;
+ BrinMemTuple *dtup = state->dtup;
+ BrinDesc *bdesc = state->bdesc;
+ MemoryContext oldCtx;
+
+ Assert(state->checkable_range);
+
+ MemoryContextReset(state->heaptupleCtx);
+ oldCtx = MemoryContextSwitchTo(state->heaptupleCtx);
+
+ /* check every index attribute */
+ for (attindex = 0; attindex < state->natts; attindex++)
+ {
+ BrinValues *bval;
+ Datum consistentFnResult;
+ bool consistent;
+ ScanKey scanKey;
+ bool oi_regular_nulls = bdesc->bd_info[attindex]->oi_regular_nulls;
+
+ bval = &dtup->bt_columns[attindex];
+
+ if (nulls[attindex])
+ {
+ /*
+ * When oi_regular_nulls is true, check the null flags directly.
+ * Otherwise, delegate the check to consistentFn.
+ */
+ if (oi_regular_nulls)
+ {
+ /* We have a null value, so hasnulls or allnulls must be true */
+ if (!(bval->bv_hasnulls || bval->bv_allnulls))
+ {
+ heap_all_indexed_ereport(state, tid,
+ "range hasnulls and allnulls are false, but contains a null value");
+ }
+ continue;
+ }
+
+ /*
+ * In case of null and oi_regular_nulls = false we use the isNull
+ * scanKey for the invocation of consistentFn
+ */
+ scanKey = state->isnull_sk[attindex];
+ }
+ else
+ {
+ /* We have a nonnull value, so allnulls should be false */
+ if (bval->bv_allnulls)
+ {
+ heap_all_indexed_ereport(state, tid, "range allnulls is true, but contains nonnull value");
+ }
+
+ /* use the nonnull scan key, filling in this tuple's value */
+ scanKey = state->nonnull_sk[attindex];
+ scanKey->sk_argument = values[attindex];
+ }
+
+ /* If oi_regular_nulls = true we should never get here with a null */
+ Assert(!oi_regular_nulls || !nulls[attindex]);
+
+ if (state->consistentFn[attindex].fn_nargs >= 4)
+ {
+ consistentFnResult = FunctionCall4Coll(&state->consistentFn[attindex],
+ state->idxrel->rd_indcollation[attindex],
+ PointerGetDatum(state->bdesc),
+ PointerGetDatum(bval),
+ PointerGetDatum(&scanKey),
+ Int32GetDatum(1)
+ );
+ }
+ else
+ {
+ consistentFnResult = FunctionCall3Coll(&state->consistentFn[attindex],
+ state->idxrel->rd_indcollation[attindex],
+ PointerGetDatum(state->bdesc),
+ PointerGetDatum(bval),
+ PointerGetDatum(scanKey)
+ );
+ }
+
+ consistent = DatumGetBool(consistentFnResult);
+
+ if (!consistent)
+ {
+ heap_all_indexed_ereport(state, tid, "heap tuple inconsistent with index");
+ }
+
+ }
+
+ MemoryContextSwitchTo(oldCtx);
+}
+
+/*
+ * Raise an ERRCODE_INDEX_CORRUPTED error that names the index and carries
+ * the given description, with no location details.
+ */
+static void
+brin_check_ereport(BrinCheckState *state, const char *fmt)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_INDEX_CORRUPTED),
+			 errmsg("index %s is corrupted - %s", idxname, fmt)));
+}
+
+/*
+ * Raise an ERRCODE_INDEX_CORRUPTED error that includes the range block
+ * number, the revmap item position and the index tuple position taken from
+ * the check state.
+ */
+static void
+index_tuple_ereport(BrinCheckState *state, const char *fmt)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	/* every location field reported below must have been filled in */
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+	Assert(state->revmapBlk != InvalidBlockNumber);
+	Assert(state->revmapidx < REVMAP_PAGE_MAXITEMS);
+	Assert(state->regpageBlk != InvalidBlockNumber);
+	Assert(state->regpageoffset != InvalidOffsetNumber);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_INDEX_CORRUPTED),
+			 errmsg("index %s is corrupted - %s. Range blkno: %u, revmap item: (%u,%u), index tuple: (%u,%u)",
+					idxname, fmt,
+					state->rangeBlkno,
+					state->revmapBlk, state->revmapidx,
+					state->regpageBlk, state->regpageoffset)));
+}
+
+/*
+ * Raise an ERRCODE_INDEX_CORRUPTED error that includes only the index tuple
+ * position (regular page block and offset) taken from the check state.
+ */
+static void
+index_tuple_only_ereport(BrinCheckState *state, const char *fmt)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	/* the index tuple position reported below must have been filled in */
+	Assert(state->regpageBlk != InvalidBlockNumber);
+	Assert(state->regpageoffset != InvalidOffsetNumber);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_INDEX_CORRUPTED),
+			 errmsg("index %s is corrupted - %s. Index tuple: (%u,%u)",
+					idxname, fmt,
+					state->regpageBlk, state->regpageoffset)));
+}
+
+/*
+ * Raise an ERRCODE_INDEX_CORRUPTED error that includes the range block
+ * number and the revmap item position taken from the check state.
+ */
+static void
+revmap_item_ereport(BrinCheckState *state, const char *fmt)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	/* the range and revmap position reported below must have been filled in */
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+	Assert(state->revmapBlk != InvalidBlockNumber);
+	Assert(state->revmapidx < REVMAP_PAGE_MAXITEMS);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_INDEX_CORRUPTED),
+			 errmsg("index %s is corrupted - %s. Range blkno: %u, revmap item: (%u,%u).",
+					idxname, fmt,
+					state->rangeBlkno,
+					state->revmapBlk, state->revmapidx)));
+}
+
+/*
+ * Raise an ERRCODE_INDEX_CORRUPTED error for a heap tuple that disagrees
+ * with the index.  The message includes the block number stored in the
+ * current deformed index tuple (state->dtup) and the heap tuple's TID.
+ */
+static void
+heap_all_indexed_ereport(const BrinCheckState *state, const ItemPointerData *tid, const char *message)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+
+	ereport(ERROR,
+			(errcode(ERRCODE_INDEX_CORRUPTED),
+			 errmsg("index %s is not consistent with the heap - %s. Range blkno: %u, heap tid (%u,%u)",
+					idxname, message,
+					state->dtup->bt_blkno,
+					ItemPointerGetBlockNumber(tid),
+					ItemPointerGetOffsetNumber(tid))));
+}
diff --git a/doc/src/sgml/amcheck.sgml b/doc/src/sgml/amcheck.sgml
index 08006856579..3c3fe6d3d32 100644
--- a/doc/src/sgml/amcheck.sgml
+++ b/doc/src/sgml/amcheck.sgml
@@ -231,6 +231,76 @@ SET client_min_messages = DEBUG1;
</tip>
<variablelist>
+ <varlistentry>
+ <term>
+ <function>brin_index_check(index regclass, regularpagescheck boolean, heapallindexed boolean, variadic text[]) returns void</function>
+ <indexterm>
+ <primary>brin_index_check</primary>
+ </indexterm>
+ </term>
+
+ <listitem>
+ <para>
+ <function>brin_index_check</function> tests that its target BRIN index
+ has no structural corruptions. A <literal>ShareUpdateExclusiveLock</literal>
+ is required on the target index by <function>brin_index_check</function>.
+ </para>
+ <para>
+ The following optional arguments are recognized:
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><literal>regularpagescheck</literal></term>
+ <listitem>
+ <para>
+ If true, the check makes an additional pass over all regular pages and tries to
+ find corruption that could not be detected during the basic check.
+ </para>
+ <para>
+ Defaults to false.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><literal>heapallindexed</literal></term>
+ <listitem>
+ <para>
+ If true, the check verifies that every heap tuple is consistent with the
+ index. This check phase needs an operator for which the expression
+ <literal>LHS OPERATOR RHS</literal> evaluates to <literal>true</literal>
+ when the same value of the indexed type is used for both <literal>LHS</literal> and <literal>RHS</literal>.
+ For example, if the indexed column's type is <type>bigint</type>,
+ the equality operator can be used because the expression
+ <literal>x = x</literal> results in <literal>true</literal>
+ for every value of <type>bigint</type>
+ (e.g. <literal>1 = 1</literal> is <literal>true</literal>, <literal>2 = 2</literal> is <literal>true</literal>, and so on).
+ The operator must also be a member of the operator family of the indexed column.
+ Most of the time, the equality operator can be used.
+ If the operator classes of all indexed columns support the equality operator,
+ the function call looks like this:
+<programlisting>
+ SELECT brin_index_check('index_name', true, true);
+</programlisting>
+ If the operator class of any indexed column doesn't support the equality operator, then
+ a suitable operator must be found for every such column, and the
+ operators for all indexed columns must be listed in the function call.
+ For instance, suppose we have two indexed columns
+ (<parameter>a</parameter> <type>int8_minmax_ops</type>, <parameter>b</parameter> <type>box_inclusion_ops</type>).
+ The <type>box_inclusion_ops</type> operator class does not support the equality operator.
+ The appropriate operator would be <literal>@></literal>.
+ Then the function call looks like this:
+<programlisting>
+ SELECT brin_index_check('index_name', true, true, '=', '@>');
+</programlisting>
+ </para>
+ <para>
+ Defaults to false.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
<varlistentry>
<term>
<function>
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ea95e7984bc..7599f7f8736 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -334,6 +334,7 @@ Boolean
BooleanTest
BpChar
BrinBuildState
+BrinCheckState
BrinDesc
BrinInsertState
BrinLeader
--
2.43.0
[text/x-patch] v12-0001-brin-refactoring.patch (4.1K, 3-v12-0001-brin-refactoring.patch)
download | inline diff:
From 953871776a56edd4ab5ec3cbce39c501e547111c Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <[email protected]>
Date: Wed, 16 Apr 2025 11:26:45 +0300
Subject: [PATCH v12 1/3] brin refactoring
For adding BRIN index support in amcheck we need some tiny changes in BRIN
core code:
* We need to have tuple descriptor for on-disk storage of BRIN tuples.
It is a public field 'bd_disktdesc' in BrinDesc, but to access it we
need function 'brtuple_disk_tupdesc' which is internal. This commit
makes it extern and renames it to 'brin_tuple_tupdesc'.
* For the meta page check we need to know the pages_per_range upper limit. It's
hardcoded now. This commit moves its value to the macro BRIN_MAX_PAGES_PER_RANGE
so that we can use it in amcheck too.
---
src/backend/access/brin/brin_tuple.c | 10 +++++-----
src/backend/access/common/reloptions.c | 3 ++-
src/include/access/brin.h | 1 +
src/include/access/brin_tuple.h | 2 ++
4 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c
index af39d448962..7c7bd6cc950 100644
--- a/src/backend/access/brin/brin_tuple.c
+++ b/src/backend/access/brin/brin_tuple.c
@@ -57,8 +57,8 @@ static inline void brin_deconstruct_tuple(BrinDesc *brdesc,
/*
* Return a tuple descriptor used for on-disk storage of BRIN tuples.
*/
-static TupleDesc
-brtuple_disk_tupdesc(BrinDesc *brdesc)
+TupleDesc
+brin_tuple_tupdesc(BrinDesc *brdesc)
{
/* We cache these in the BrinDesc */
if (brdesc->bd_disktdesc == NULL)
@@ -280,7 +280,7 @@ brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
len = hoff = MAXALIGN(len);
- data_len = heap_compute_data_size(brtuple_disk_tupdesc(brdesc),
+ data_len = heap_compute_data_size(brin_tuple_tupdesc(brdesc),
values, nulls);
len += data_len;
@@ -299,7 +299,7 @@ brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
* need to pass a valid null bitmap so that it will correctly skip
* outputting null attributes in the data area.
*/
- heap_fill_tuple(brtuple_disk_tupdesc(brdesc),
+ heap_fill_tuple(brin_tuple_tupdesc(brdesc),
values,
nulls,
(char *) rettuple + hoff,
@@ -682,7 +682,7 @@ brin_deconstruct_tuple(BrinDesc *brdesc,
* may reuse attribute entries for more than one column, we cannot cache
* offsets here.
*/
- diskdsc = brtuple_disk_tupdesc(brdesc);
+ diskdsc = brin_tuple_tupdesc(brdesc);
stored = 0;
off = 0;
for (attnum = 0; attnum < brdesc->bd_tupdesc->natts; attnum++)
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 3e832c3797e..73e2a36f507 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -22,6 +22,7 @@
#include "access/heaptoast.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
+#include "access/brin.h"
#include "access/reloptions.h"
#include "access/spgist_private.h"
#include "catalog/pg_type.h"
@@ -375,7 +376,7 @@ static relopt_int intRelOpts[] =
"Number of pages that each page range covers in a BRIN index",
RELOPT_KIND_BRIN,
AccessExclusiveLock
- }, 128, 1, 131072
+ }, 128, 1, BRIN_MAX_PAGES_PER_RANGE
},
{
{
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index a5b453468d1..98155f2799b 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -38,6 +38,7 @@ typedef struct BrinStatsData
#define BRIN_DEFAULT_PAGES_PER_RANGE 128
+#define BRIN_MAX_PAGES_PER_RANGE 131072
#define BrinGetPagesPerRange(relation) \
(AssertMacro(relation->rd_rel->relkind == RELKIND_INDEX && \
relation->rd_rel->relam == BRIN_AM_OID), \
diff --git a/src/include/access/brin_tuple.h b/src/include/access/brin_tuple.h
index c818286e523..853cfefc977 100644
--- a/src/include/access/brin_tuple.h
+++ b/src/include/access/brin_tuple.h
@@ -109,4 +109,6 @@ extern BrinMemTuple *brin_memtuple_initialize(BrinMemTuple *dtuple,
extern BrinMemTuple *brin_deform_tuple(BrinDesc *brdesc,
BrinTuple *tuple, BrinMemTuple *dMemtuple);
+extern TupleDesc brin_tuple_tupdesc(BrinDesc *brdesc);
+
#endif /* BRIN_TUPLE_H */
--
2.43.0
[text/x-patch] v12-0002-amcheck-common_verify-snapshot-indcheckxmin-chec.patch (2.5K, 4-v12-0002-amcheck-common_verify-snapshot-indcheckxmin-chec.patch)
download | inline diff:
From c1a2399f80bc4b9dc981df59b6157d7a136da517 Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <[email protected]>
Date: Sat, 11 Apr 2026 16:21:37 +0300
Subject: [PATCH v12 2/3] amcheck: common_verify - snapshot indcheckxmin check
Adds indcheckxmin check to common_verify. Every index needs it for heapallindexed check.
---
contrib/amcheck/verify_common.c | 25 +++++++++++++++++++++++++
contrib/amcheck/verify_common.h | 3 +++
2 files changed, 28 insertions(+)
diff --git a/contrib/amcheck/verify_common.c b/contrib/amcheck/verify_common.c
index 54ce901716b..6909d7aaaa5 100644
--- a/contrib/amcheck/verify_common.c
+++ b/contrib/amcheck/verify_common.c
@@ -189,3 +189,28 @@ index_checkable(Relation rel, Oid am_id)
return amcheck_index_mainfork_expected(rel);
}
+
+/*
+ * Raise a serialization-failure error if 'snapshot' is not safe to use for
+ * verifying 'idxrel'.
+ *
+ * In READ COMMITTED mode GetTransactionSnapshot() always acquires a new MVCC
+ * snapshot, and a new snapshot is guaranteed to have all the entries it
+ * requires in the index, so nothing needs checking there.
+ *
+ * At higher isolation levels an old transaction snapshot may have been
+ * returned, and that snapshot is not safe for index scans of the target
+ * index if it sees tuples that are before the index's indcheckxmin horizon.
+ * We defend against that here.  Throwing this error should be very rare, so
+ * it doesn't seem worth using a secondary snapshot to avoid it.
+ */
+void
+check_indcheckxmin(Relation idxrel, Snapshot snapshot)
+{
+	TransactionId idxxmin;
+
+	/* only transaction-snapshot isolation levels can return an old snapshot */
+	if (!IsolationUsesXactSnapshot())
+		return;
+
+	/* the index imposes no xmin horizon */
+	if (!idxrel->rd_index->indcheckxmin)
+		return;
+
+	/* the snapshot is fine if the index's pg_index row xmin precedes its xmin */
+	idxxmin = HeapTupleHeaderGetXmin(idxrel->rd_indextuple->t_data);
+	if (TransactionIdPrecedes(idxxmin, snapshot->xmin))
+		return;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+			 errmsg("index \"%s\" cannot be verified using transaction snapshot",
+					RelationGetRelationName(idxrel))));
+}
diff --git a/contrib/amcheck/verify_common.h b/contrib/amcheck/verify_common.h
index 4c4ddc01aa7..4aaaefa6445 100644
--- a/contrib/amcheck/verify_common.h
+++ b/contrib/amcheck/verify_common.h
@@ -14,6 +14,7 @@
#include "storage/lmgr.h"
#include "storage/lockdefs.h"
#include "utils/relcache.h"
+#include "utils/snapshot.h"
#include "miscadmin.h"
/* Typedef for callback function for amcheck_lock_relation_and_check */
@@ -26,3 +27,5 @@ extern void amcheck_lock_relation_and_check(Oid indrelid,
Oid am_id,
IndexDoCheckCallback check,
LOCKMODE lockmode, void *state);
+
+extern void check_indcheckxmin(Relation idxrel, Snapshot snapshot);
--
2.43.0
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected]
Subject: Re: amcheck support for BRIN indexes
In-Reply-To: <CAE7r3MKYYfXJtdHoVox5O7A9poY8zZs8oBYFOPA6yV2iF-4aEA@mail.gmail.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox