public inbox for [email protected]
help / color / mirror / Atom feed
From: Arseniy Mukhin <[email protected]>
To: PostgreSQL Hackers <[email protected]>
Subject: Re: amcheck support for BRIN indexes
Date: Sun, 8 Jun 2025 15:39:22 +0300
Message-ID: <CAE7r3MLmGA_DbUFYxNp4L7ODwDyLQ2u_vfdTSXj-QJu79B4iHg@mail.gmail.com> (raw)
In-Reply-To: <CAE7r3MKZkMwuEeFU5M73Jk3yp+2CBr41a3tfvb3CHym+ysAVEA@mail.gmail.com>
References: <CAE7r3MKZkMwuEeFU5M73Jk3yp+2CBr41a3tfvb3CHym+ysAVEA@mail.gmail.com>
Hi,
Here is a new version.
TAP tests were added; they try to reproduce more or less every violation.
I didn't include two tests that require changing the on-disk representation
of ItemIdData: they work locally, but I don't think they are
portable. While writing the tests, some minor issues were found and fixed.
The CI compiler warnings were also fixed.
Best regards,
Arseniy Mukhin
Attachments:
[application/x-patch] v2-0001-brin-refactoring.patch (2.5K, 2-v2-0001-brin-refactoring.patch)
download | inline diff:
From 918cc8b07eb19f74646949cc302d6ed67f1bc920 Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <[email protected]>
Date: Wed, 16 Apr 2025 11:26:45 +0300
Subject: [PATCH v2 1/2] brin refactoring
---
src/backend/access/brin/brin_tuple.c | 2 +-
src/backend/access/common/reloptions.c | 3 ++-
src/include/access/brin.h | 1 +
src/include/access/brin_tuple.h | 2 ++
4 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/src/backend/access/brin/brin_tuple.c b/src/backend/access/brin/brin_tuple.c
index 861f397e6db..4d1d8d9addd 100644
--- a/src/backend/access/brin/brin_tuple.c
+++ b/src/backend/access/brin/brin_tuple.c
@@ -57,7 +57,7 @@ static inline void brin_deconstruct_tuple(BrinDesc *brdesc,
/*
* Return a tuple descriptor used for on-disk storage of BRIN tuples.
*/
-static TupleDesc
+TupleDesc
brtuple_disk_tupdesc(BrinDesc *brdesc)
{
/* We cache these in the BrinDesc */
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
index 50747c16396..bc494847341 100644
--- a/src/backend/access/common/reloptions.c
+++ b/src/backend/access/common/reloptions.c
@@ -22,6 +22,7 @@
#include "access/heaptoast.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
+#include "access/brin.h"
#include "access/reloptions.h"
#include "access/spgist_private.h"
#include "catalog/pg_type.h"
@@ -343,7 +344,7 @@ static relopt_int intRelOpts[] =
"Number of pages that each page range covers in a BRIN index",
RELOPT_KIND_BRIN,
AccessExclusiveLock
- }, 128, 1, 131072
+ }, 128, 1, BRIN_MAX_PAGES_PER_RANGE
},
{
{
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index 821f1e02806..334ce973b67 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -37,6 +37,7 @@ typedef struct BrinStatsData
#define BRIN_DEFAULT_PAGES_PER_RANGE 128
+#define BRIN_MAX_PAGES_PER_RANGE 131072
#define BrinGetPagesPerRange(relation) \
(AssertMacro(relation->rd_rel->relkind == RELKIND_INDEX && \
relation->rd_rel->relam == BRIN_AM_OID), \
diff --git a/src/include/access/brin_tuple.h b/src/include/access/brin_tuple.h
index 010ba4ea3c0..9472ca638dd 100644
--- a/src/include/access/brin_tuple.h
+++ b/src/include/access/brin_tuple.h
@@ -109,4 +109,6 @@ extern BrinMemTuple *brin_memtuple_initialize(BrinMemTuple *dtuple,
extern BrinMemTuple *brin_deform_tuple(BrinDesc *brdesc,
BrinTuple *tuple, BrinMemTuple *dMemtuple);
+extern TupleDesc brtuple_disk_tupdesc(BrinDesc *brdesc);
+
#endif /* BRIN_TUPLE_H */
--
2.43.0
[application/x-patch] v2-0002-amcheck-brin-support.patch (61.6K, 3-v2-0002-amcheck-brin-support.patch)
download | inline diff:
From 9ea0db2fb85645fa11bbc864ddf020fe2ebabe9b Mon Sep 17 00:00:00 2001
From: Arseniy Mukhin <[email protected]>
Date: Tue, 22 Apr 2025 11:00:36 +0300
Subject: [PATCH v2 2/2] amcheck - brin support
---
contrib/amcheck/Makefile | 5 +-
contrib/amcheck/amcheck--1.5--1.6.sql | 20 +
contrib/amcheck/amcheck.control | 2 +-
contrib/amcheck/expected/check_brin.out | 134 +++
contrib/amcheck/meson.build | 4 +
contrib/amcheck/sql/check_brin.sql | 102 ++
contrib/amcheck/t/006_verify_brin.pl | 336 ++++++
contrib/amcheck/verify_brin.c | 1274 +++++++++++++++++++++++
8 files changed, 1874 insertions(+), 3 deletions(-)
create mode 100644 contrib/amcheck/amcheck--1.5--1.6.sql
create mode 100644 contrib/amcheck/expected/check_brin.out
create mode 100644 contrib/amcheck/sql/check_brin.sql
create mode 100644 contrib/amcheck/t/006_verify_brin.pl
create mode 100644 contrib/amcheck/verify_brin.c
diff --git a/contrib/amcheck/Makefile b/contrib/amcheck/Makefile
index 1b7a63cbaa4..bdfb274c89c 100644
--- a/contrib/amcheck/Makefile
+++ b/contrib/amcheck/Makefile
@@ -6,11 +6,12 @@ OBJS = \
verify_common.o \
verify_gin.o \
verify_heapam.o \
- verify_nbtree.o
+ verify_nbtree.o \
+ verify_brin.o
EXTENSION = amcheck
DATA = amcheck--1.2--1.3.sql amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql \
- amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql
+ amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql amcheck--1.5--1.6.sql
PGFILEDESC = "amcheck - function for verifying relation integrity"
REGRESS = check check_btree check_gin check_heap
diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql
new file mode 100644
index 00000000000..0c850a97d16
--- /dev/null
+++ b/contrib/amcheck/amcheck--1.5--1.6.sql
@@ -0,0 +1,20 @@
+/* contrib/amcheck/amcheck--1.5--1.6.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "ALTER EXTENSION amcheck UPDATE TO '1.6'" to load this file. \quit
+
+
+--
+-- brin_index_check()
+--
+CREATE FUNCTION brin_index_check(index regclass,
+ regular_pages_check boolean default false,
+ heap_all_consistent boolean default false,
+ consistent_operator_names text[] default '{}'
+)
+ RETURNS VOID
+AS 'MODULE_PATHNAME', 'brin_index_check'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+-- We don't want this to be available to public
+REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean, text[]) FROM PUBLIC;
\ No newline at end of file
diff --git a/contrib/amcheck/amcheck.control b/contrib/amcheck/amcheck.control
index c8ba6d7c9bc..2f329ef2cf4 100644
--- a/contrib/amcheck/amcheck.control
+++ b/contrib/amcheck/amcheck.control
@@ -1,5 +1,5 @@
# amcheck extension
comment = 'functions for verifying relation integrity'
-default_version = '1.5'
+default_version = '1.6'
module_pathname = '$libdir/amcheck'
relocatable = true
diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out
new file mode 100644
index 00000000000..2690d629723
--- /dev/null
+++ b/contrib/amcheck/expected/check_brin.out
@@ -0,0 +1,134 @@
+-- helper func
+CREATE OR REPLACE FUNCTION random_string( INT ) RETURNS TEXT AS $$
+SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', ceil(random() * 30)::INTEGER, 1), '') FROM generate_series(1, $1);
+$$ LANGUAGE sql;
+-- empty table index should be valid
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- multi attributes with varlena attribute test
+CREATE TABLE brintest (id BIGSERIAL, a TEXT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,5000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- min_max opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- multi_min_max opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- bloom opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- inclusion opclass
+CREATE TABLE brintest (id SERIAL PRIMARY KEY, a BOX);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a)
+SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 10000);
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true, '{"@>"}');
+ brin_index_check
+------------------
+
+(1 row)
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true, '{"@>"}');
+ brin_index_check
+------------------
+
+(1 row)
+
+-- cleanup
+DROP TABLE brintest;
+-- cleanup
+DROP FUNCTION random_string;
diff --git a/contrib/amcheck/meson.build b/contrib/amcheck/meson.build
index b33e8c9b062..bce10566525 100644
--- a/contrib/amcheck/meson.build
+++ b/contrib/amcheck/meson.build
@@ -5,6 +5,7 @@ amcheck_sources = files(
'verify_gin.c',
'verify_heapam.c',
'verify_nbtree.c',
+ 'verify_brin.c'
)
if host_system == 'windows'
@@ -27,6 +28,7 @@ install_data(
'amcheck--1.2--1.3.sql',
'amcheck--1.3--1.4.sql',
'amcheck--1.4--1.5.sql',
+ 'amcheck--1.5--1.6.sql',
kwargs: contrib_data_args,
)
@@ -40,6 +42,7 @@ tests += {
'check_btree',
'check_gin',
'check_heap',
+ 'check_brin'
],
},
'tap': {
@@ -49,6 +52,7 @@ tests += {
't/003_cic_2pc.pl',
't/004_verify_nbtree_unique.pl',
't/005_pitr.pl',
+ 't/006_verify_brin.pl',
],
},
}
diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql
new file mode 100644
index 00000000000..36e091a6884
--- /dev/null
+++ b/contrib/amcheck/sql/check_brin.sql
@@ -0,0 +1,102 @@
+-- helper func
+CREATE OR REPLACE FUNCTION random_string( INT ) RETURNS TEXT AS $$
+SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', ceil(random() * 30)::INTEGER, 1), '') FROM generate_series(1, $1);
+$$ LANGUAGE sql;
+
+
+-- empty table index should be valid
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+-- multi attributes with varlena attribute test
+CREATE TABLE brintest (id BIGSERIAL, a TEXT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,5000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS);
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS);
+-- cleanup
+DROP TABLE brintest;
+
+
+
+-- min_max opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+
+-- multi_min_max opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+
+-- bloom opclass
+CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x;
+-- create some empty ranges
+DELETE FROM brintest WHERE a > 20000 AND a < 40000;
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true);
+-- cleanup
+DROP TABLE brintest;
+
+
+-- inclusion opclass
+CREATE TABLE brintest (id SERIAL PRIMARY KEY, a BOX);
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
+INSERT INTO brintest (a)
+SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000))
+FROM generate_series(1, 10000);
+-- create some empty ranges
+DELETE FROM brintest WHERE id > 2000 AND id < 4000;
+
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true, '{"@>"}');
+
+-- rebuild index
+DROP INDEX brintest_a_idx;
+CREATE INDEX brintest_a_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2);
+SELECT brin_index_check('brintest_a_idx'::REGCLASS, true, true, '{"@>"}');
+-- cleanup
+DROP TABLE brintest;
+
+
+-- cleanup
+DROP FUNCTION random_string;
\ No newline at end of file
diff --git a/contrib/amcheck/t/006_verify_brin.pl b/contrib/amcheck/t/006_verify_brin.pl
new file mode 100644
index 00000000000..7a6ff029704
--- /dev/null
+++ b/contrib/amcheck/t/006_verify_brin.pl
@@ -0,0 +1,336 @@
+# Copyright (c) 2021-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+
+use Test::More;
+
+my $node;
+my $blksize;
+my $meta_page_blkno = 0;
+
+#
+# Test set-up
+#
+$node = PostgreSQL::Test::Cluster->new('test');
+$node->init(no_data_checksums => 1);
+$node->append_conf('postgresql.conf', 'autovacuum=off');
+$node->start;
+$blksize = int($node->safe_psql('postgres', 'SHOW block_size;'));
+$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
+
+# Tests
+my @tests = (
+ {
+ # invalid meta page type
+
+ find => pack('S', 0xF091),
+ replace => pack('S', 0xAAAA),
+ blkno => $meta_page_blkno,
+ expected => 'metapage is corrupted'
+ },
+ {
+ # invalid meta page magic word
+
+ find => pack('L', 0xA8109CFA),
+ replace => pack('L', 0xBB109CFB),
+ blkno => $meta_page_blkno,
+ expected => 'metapage is corrupted',
+ },
+ {
+ # invalid meta page index version
+
+ find => pack('L*', 0xA8109CFA, 1),
+ replace => pack('L*', 0xA8109CFA, 2),
+ blkno => $meta_page_blkno,
+ expected => 'metapage is corrupted',
+ },
+ {
+ # pages_per_range below lower limit
+
+ find => pack('L*', 0xA8109CFA, 1, 128),
+ replace => pack('L*', 0xA8109CFA, 1, 0),
+ blkno => $meta_page_blkno,
+ expected => 'metapage is corrupted',
+ },
+ {
+ # pages_per_range above upper limit
+
+ find => pack('L*', 0xA8109CFA, 1, 128),
+ replace => pack('L*', 0xA8109CFA, 1, 131073),
+ blkno => $meta_page_blkno,
+ expected => 'metapage is corrupted'
+ },
+ {
+ # last_revmap_page below lower limit
+
+ find => pack('L*', 0xA8109CFA, 1, 128, 1),
+ replace => pack('L*', 0xA8109CFA, 1, 128, 0),
+ blkno => $meta_page_blkno,
+ expected => 'metapage is corrupted',
+ },
+ {
+
+ # last_revmap_page beyond index relation size
+
+ find => pack('L*', 0xA8109CFA, 1, 128, 1),
+ replace => pack('L*', 0xA8109CFA, 1, 128, 100),
+ blkno => $meta_page_blkno,
+ expected => 'metapage is corrupted',
+ },
+ {
+ # invalid revmap page type
+
+ find => pack('S', 0xF092),
+ replace => pack('S', 0xAAAA),
+ blkno => 1, # revmap page
+ expected => 'revmap page is expected at block 1, last revmap page 1',
+ },
+ {
+ # revmap item points beyond index relation size
+ # replace (2,1) with (100,1)
+
+ find => pack('S*', 0, 2, 1),
+ replace => pack('S*', 0, 100, 1),
+ blkno => 1, # revmap page
+ expected => 'revmap item points to a non existing block 100, '
+ . 'index max block 2. Range blkno: 0, revmap item: (1,0)'
+ },
+ {
+ # invalid regular page type
+
+ find => pack('S', 0xF093),
+ replace => pack('S', 0xAAAA),
+ blkno => 2, # regular page
+ expected => 'revmap item points to the page which is not regular (blkno: 2). '
+ . 'Range blkno: 0, revmap item: (1,0)'
+ },
+ {
+ # revmap item points beyond regular page max offset
+ # replace (2,1) with (2,2)
+
+ find => pack('S*', 0, 2, 1),
+ replace => pack('S*', 0, 2, 2),
+ blkno => 1, # revmap page
+ expected => 'revmap item offset number 2 is greater than regular page 2 max offset 1. '
+ . 'Range blkno: 0, revmap item: (1,0)'
+ },
+ {
+ # invalid index tuple range blkno
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 1, 0xA8, 0x01),
+ blkno => 2, # regular page
+ expected => 'index tuple has invalid blkno 1. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)',
+ },
+ {
+ # range beyond the table size and is not empty
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 0, 0x88, 0x01),
+ blkno => 2, # regular page
+ expected => 'the range is beyond the table size, but is not marked as empty, table size: 0 blocks. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)'
+ },
+ {
+ # corrupt index tuple data offset
+ # here 0x00, 0x00, 0x00 is padding and '.' is varlena len byte
+
+ find => pack('LCCCC', 0, 0x08, 0x00, 0x00, 0x00) . '(.)' . 'aaaaa',
+ replace => pack('LCCCC', 0, 0x1F, 0x00, 0x00, 0x00) . '$1' . 'aaaaa',
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'););
+ },
+ expected => 'index tuple header length 31 is greater than tuple len 24. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)'
+ },
+ {
+ # empty range index tuple doesn't have null bitmap
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 0, 0x28, 0x01),
+ blkno => 2, # regular page
+ expected => 'empty range index tuple doesn\'t have null bitmap. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)'
+ },
+ {
+ # empty range index tuple all_nulls -> false
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 0, 0xA8, 0x00),
+ blkno => 2, # regular page
+ expected => 'empty range index tuple attribute 0 with allnulls is false. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)'
+ },
+ {
+ # empty range index tuple has_nulls -> true
+
+ find => pack('LCC', 0, 0xA8, 0x01),
+ replace => pack('LCC', 0, 0xA8, 0x03),
+ blkno => 2, # regular page
+ expected => 'empty range index tuple attribute 0 with hasnulls is true. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)'
+ },
+ {
+ # invalid index tuple data
+ # replace varlena len with FF - should work with any endianness
+
+ find => pack('LCCCC', 0, 0x08, 0x00, 0x00, 0x00) . '.' . 'aaaaa',
+ replace => pack('LCCCCC', 0, 0x08, 0x00, 0x00, 0x00, 0xFF) . 'aaaaa',
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'););
+ },
+ expected => 'attribute 0 stored value 0 with length -1 ends at offset 127 beyond total tuple length 24. '
+ . 'Range blkno: 0, revmap item: (1,0), index tuple: (2,1)'
+ },
+ {
+ # orphan index tuple
+ # replace valid revmap item with (0,0)
+
+ find => pack('S*', 0, 2, 1),
+ replace => pack('S*', 0, 0, 0),
+ blkno => 1, # revmap page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'););
+ },
+ expected => 'revmap doesn\'t point to index tuple. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)'
+ },
+ {
+ # range is marked as empty_range, but heap has some data for the range
+
+ find => pack('LCC', 0, 0x88, 0x03),
+ replace => pack('LCC', 0, 0xA8, 0x01),
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null););
+ },
+ expected => 'range is marked as empty but contains qualified live tuples. Range blkno: 0, heap tid (0,1)'
+ },
+ {
+ # range hasnulls & allnulls are false, but heap contains null values for the range
+
+ find => pack('LCC', 0, 0x88, 0x02),
+ replace => pack('LCC', 0, 0x88, 0x00),
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa'););
+ },
+ expected => 'range hasnulls and allnulls are false, but contains a null value. Range blkno: 0, heap tid (0,1)'
+ },
+ {
+ # range allnulls is true, but heap contains non-null values for the range
+
+ find => pack('LCC', 0, 0x88, 0x02),
+ replace => pack('LCC', 0, 0x88, 0x01),
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa'););
+ },
+ expected => 'range allnulls is true, but contains nonnull value. Range blkno: 0, heap tid (0,2)'
+ },
+ {
+ # consistent function return FALSE for the valid heap value
+ # replace "ccccc" with "bbbbb" so that min_max index was too narrow
+
+ find => 'ccccc',
+ replace => 'bbbbb',
+ blkno => 2, # regular page
+ table_data => sub {
+ my ($test_struct) = @_;
+ return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'), ('ccccc'););
+ },
+ expected => 'heap tuple inconsistent with index. Range blkno: 0, heap tid (0,2)'
+ }
+);
+
+
+# init test data
+my $i = 0;
+foreach my $test_struct (@tests) {
+
+ $test_struct->{table_name} = 't' . $i++;
+ $test_struct->{index_name} = $test_struct->{table_name} . '_brin_idx';
+
+ my $test_data_sql = '';
+ if (exists $test_struct->{table_data}) {
+ $test_data_sql = $test_struct->{table_data}->($test_struct);
+ }
+
+ $node->safe_psql('postgres', qq(
+ CREATE TABLE $test_struct->{table_name} (a TEXT);
+ $test_data_sql
+ CREATE INDEX $test_struct->{index_name} ON $test_struct->{table_name} USING BRIN (a);
+ ));
+
+ $test_struct->{relpath} = relation_filepath($test_struct->{index_name});
+}
+
+# corrupt index
+$node->stop;
+
+foreach my $test_struct (@tests) {
+ string_replace_block(
+ $test_struct->{relpath},
+ $test_struct->{find},
+ $test_struct->{replace},
+ $test_struct->{blkno}
+ );
+}
+
+# assertions
+$node->start;
+
+foreach my $test_struct (@tests) {
+ my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true, true)));
+ ok($stderr =~ /\Q$test_struct->{expected}\E/);
+}
+
+
+# Helpers
+
+# Returns the filesystem path for relation $relname: the node's data directory joined with the path reported by pg_relation_filepath().
+sub relation_filepath {
+ my ($relname) = @_;
+ # Looks the path up with a query through safe_psql, so the node has to be running when this is called.
+ my $pgdata = $node->data_dir;
+ my $rel = $node->safe_psql('postgres',
+ qq(SELECT pg_relation_filepath('$relname')));
+ die "path not found for relation $relname" unless defined $rel;
+ return "$pgdata/$rel";
+}
+# Rewrites block number $blkno of file $filename in place, substituting every match of the pattern $find with $replace.
+sub string_replace_block {
+ my ($filename, $find, $replace, $blkno) = @_;
+ # Mode +< opens the existing file for reading and writing without truncating it.
+ my $fh;
+ open($fh, '+<', $filename) or BAIL_OUT("open failed: $!");
+ binmode $fh;
+ # Byte offset of the target block; $blksize is the file-level value read from SHOW block_size.
+ my $offset = $blkno * $blksize;
+ my $buffer;
+ # Read up to one block starting at that offset into $buffer.
+ sysseek($fh, $offset, 0) or BAIL_OUT("seek failed: $!");
+ sysread($fh, $buffer, $blksize) or BAIL_OUT("read failed: $!");
+ # $find is interpolated as a regex, not a literal. With the ee modifiers the replacement is wrapped in double quotes and evaluated again, so a capture reference in $replace is expanded. NOTE(review): replacement bytes that are a double quote, dollar, at-sign or backslash would also be interpreted by that second evaluation.
+ $buffer =~ s/$find/'"' . $replace . '"'/gee;
+ # Seek back to the same offset and overwrite the block with the modified bytes.
+ sysseek($fh, $offset, 0) or BAIL_OUT("seek failed: $!");
+ syswrite($fh, $buffer) or BAIL_OUT("write failed: $!");
+
+ close($fh) or BAIL_OUT("close failed: $!");
+
+ return;
+}
+
+done_testing();
\ No newline at end of file
diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c
new file mode 100644
index 00000000000..edd796cee59
--- /dev/null
+++ b/contrib/amcheck/verify_brin.c
@@ -0,0 +1,1274 @@
+/*-------------------------------------------------------------------------
+ *
+ * verify_brin.c
+ * Functions to check PostgreSQL BRIN indexes for corruption
+ *
+ * Copyright (c) 2016-2024, PostgreSQL Global Development Group
+ *
+ * contrib/amcheck/verify_brin.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/transam.h"
+#include "access/brin.h"
+#include "catalog/index.h"
+#include "catalog/pg_am_d.h"
+#include "catalog/pg_operator.h"
+#include "miscadmin.h"
+#include "storage/lmgr.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "access/brin_page.h"
+#include "access/brin_revmap.h"
+#include "utils/lsyscache.h"
+#include "verify_common.h"
+#include "utils/builtins.h"
+#include "utils/array.h"
+
+
+PG_FUNCTION_INFO_V1(brin_index_check);
+
+/*
+ * Working state of one brin_index_check() call.
+ *
+ * The structure is allocated zeroed in brin_index_check(); the check
+ * arguments are filled there, the remaining fields are set by brin_check()
+ * and by the individual check phases.
+ */
+typedef struct BrinCheckState
+{
+
+ /* Check arguments */
+
+ /* if true, check_regular_pages() walks the regular pages */
+ bool regular_pages_check;
+ /* if true, check_all_heap_consistent() is run */
+ bool heap_all_consistent;
+ /* operator names as passed by the caller (may be an empty array) */
+ ArrayType *heap_all_consistent_oper_names;
+
+ /* BRIN check common fields */
+
+ Relation idxrel;
+ Relation heaprel;
+ BrinDesc *bdesc;
+ /* number of index attributes (bdesc->bd_tupdesc->natts) */
+ int natts;
+ /* taken from the metapage */
+ BlockNumber pagesPerRange;
+
+ /* Index structure check fields */
+
+ BufferAccessStrategy checkstrategy;
+ /* cached index size in blocks, refreshed when a pointer looks too big */
+ BlockNumber idxnblocks;
+ /* heap size in blocks, read once at the start of the revmap check */
+ BlockNumber heapnblocks;
+ /* taken from the metapage */
+ BlockNumber lastRevmapPage;
+ /* Current range blkno */
+ BlockNumber rangeBlkno;
+ /* Current revmap item */
+ BlockNumber revmapBlk;
+ Buffer revmapbuf;
+ Page revmappage;
+ uint32 revmapidx;
+ /* Current index tuple */
+ BlockNumber regpageBlk;
+ Buffer regpagebuf;
+ Page regpage;
+ OffsetNumber regpageoffset;
+
+ /* All heap consistent check fields */
+
+ /* one operator name per index attribute */
+ String **operatorNames;
+ BrinRevmap *revmap;
+ /* buffer handed to brinGetTupleForHeapBlock() */
+ Buffer buf;
+ /* consistent support function, one entry per index attribute */
+ FmgrInfo *consistentFn;
+ /* Scan keys for regular values */
+ ScanKey *nonnull_sk;
+ /* Scan keys for null values */
+ ScanKey *isnull_sk;
+ /* number of ranges fetched during the heap scan (debug output only) */
+ double range_cnt;
+ /* first block of the next range */
+ BlockNumber nextrangeBlk;
+
+ /*
+ * checkable_range shows if current range could be checked and dtup
+ * contains valid index tuple for the range. It could be false if the
+ * current range is not summarized, or it's placeholder, or it's just a
+ * beginning of the check
+ */
+ bool checkable_range;
+ BrinMemTuple *dtup;
+ /* reset every time the heap scan moves to another range */
+ MemoryContext rangeCtx;
+ /* reset for every checked heap tuple */
+ MemoryContext heaptupleCtx;
+} BrinCheckState;
+
+/* Forward declarations of the file-local functions */
+
+static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly);
+
+/* Index structure check */
+
+static void check_brin_index_structure(BrinCheckState * state);
+
+static void check_meta(BrinCheckState * state);
+
+static void check_revmap(BrinCheckState * state);
+
+static void check_revmap_item(BrinCheckState * state);
+
+static void check_index_tuple(BrinCheckState * state, BrinTuple *tuple, ItemId lp);
+
+static void check_regular_pages(BrinCheckState * state);
+
+static bool revmap_points_to_index_tuple(BrinCheckState * state);
+
+static ItemId PageGetItemIdCareful(BrinCheckState * state);
+
+/* All heap consistent check */
+
+static void check_all_heap_consistent(BrinCheckState * state);
+
+static void check_and_prepare_operator_names(BrinCheckState * state);
+
+static void brin_check_callback(Relation index,
+ ItemPointer tid,
+ Datum *values,
+ bool *isnull,
+ bool tupleIsAlive,
+ void *brstate);
+
+static void check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid);
+
+static ScanKey prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno);
+
+static ScanKey prepare_isnull_scan_key(AttrNumber attno);
+
+/* Error reporting helpers */
+
+static void brin_check_ereport(BrinCheckState * state, const char *fmt);
+
+static void revmap_item_ereport(BrinCheckState * state, const char *fmt);
+
+static void index_tuple_ereport(BrinCheckState * state, const char *fmt);
+
+static void index_tuple_only_ereport(BrinCheckState * state, const char *fmt);
+
+static void all_consist_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message);
+
+/*
+ * brin_index_check
+ *
+ * SQL-callable entry point. Stores the check options (arguments 1..3) in a
+ * zeroed BrinCheckState and passes it, together with the index OID
+ * (argument 0), to amcheck_lock_relation_and_check(), which invokes
+ * brin_check() as the callback. ShareUpdateExclusiveLock is requested.
+ */
+Datum
+brin_index_check(PG_FUNCTION_ARGS)
+{
+ Oid indexoid = PG_GETARG_OID(0);
+ BrinCheckState *checkstate;
+
+ checkstate = palloc0(sizeof(BrinCheckState));
+ checkstate->regular_pages_check = PG_GETARG_BOOL(1);
+ checkstate->heap_all_consistent = PG_GETARG_BOOL(2);
+ checkstate->heap_all_consistent_oper_names = PG_GETARG_ARRAYTYPE_P(3);
+
+ amcheck_lock_relation_and_check(indexoid, BRIN_AM_OID, brin_check,
+ ShareUpdateExclusiveLock, checkstate);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Main check function, called back by amcheck_lock_relation_and_check().
+ *
+ * callback_state is the BrinCheckState prepared by brin_index_check().
+ * The index structure is always checked; the operator names validation and
+ * the heap consistency check run only when heap_all_consistent is set.
+ */
+static void
+brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly)
+{
+ BrinCheckState *checkstate = (BrinCheckState *) callback_state;
+ bool with_heap_check = checkstate->heap_all_consistent;
+
+ /* Fields shared by all check phases */
+ checkstate->idxrel = idxrel;
+ checkstate->heaprel = heaprel;
+ checkstate->bdesc = brin_build_desc(idxrel);
+ checkstate->natts = checkstate->bdesc->bd_tupdesc->natts;
+
+ /*
+ * The number of index attributes is known now, so the operator names
+ * array can be validated before any page is read.
+ */
+ if (with_heap_check)
+ check_and_prepare_operator_names(checkstate);
+
+ check_brin_index_structure(checkstate);
+
+ if (with_heap_check)
+ check_all_heap_consistent(checkstate);
+
+ brin_free_desc(checkstate->bdesc);
+}
+
+/*
+ * Check that the index has the expected structure.
+ *
+ * The check runs in three steps: metapage, revmap pages, regular pages.
+ *
+ * Expectations the check relies on:
+ * - ShareUpdateExclusiveLock is held, so during the check the revmap can not
+ * be extended (i.e. no evacuation), regular pages stay regular, and ranges
+ * can be neither summarized nor desummarized. Concurrent updates may still
+ * allocate new regular pages and move index tuples.
+ * - a valid revmap pointer leads to a valid index tuple.
+ * - there are no orphan index tuples: for every index tuple there is a
+ * revmap item pointing to it.
+ * - as a result of a crash the index may contain placeholder tuples, new
+ * pages in place of regular ones, and pages with the evacuation bit set.
+ */
+static void
+check_brin_index_structure(BrinCheckState * state)
+{
+ /* One buffer access strategy is shared by all structure checks */
+ state->checkstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+ /* Step 1: metapage, also remembers lastRevmapPage and pagesPerRange */
+ check_meta(state);
+
+ /* Step 2: revmap pages, blocks 1 .. lastRevmapPage */
+ check_revmap(state);
+
+ /* Step 3: regular pages, blocks after lastRevmapPage */
+ check_regular_pages(state);
+}
+
+/*
+ * Check the metapage and remember data needed by the later checks:
+ * idxnblocks, lastRevmapPage and pagesPerRange.
+ */
+static void
+check_meta(BrinCheckState * state)
+{
+ Buffer buf;
+ Page page;
+ BrinMetaPageData *meta;
+ bool meta_ok;
+
+ buf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, BRIN_METAPAGE_BLKNO, RBM_NORMAL,
+ state->checkstrategy);
+ LockBuffer(buf, BUFFER_LOCK_SHARE);
+ page = BufferGetPage(buf);
+ meta = (BrinMetaPageData *) PageGetContents(page);
+ state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel);
+
+ /*
+ * The metapage is fine when it has the meta page type, the expected magic
+ * and version, a sane pagesPerRange, and lastRevmapPage lies after the
+ * metapage and inside the index.
+ */
+ meta_ok = BRIN_IS_META_PAGE(page) &&
+ meta->brinMagic == BRIN_META_MAGIC &&
+ meta->brinVersion == BRIN_CURRENT_VERSION &&
+ meta->pagesPerRange >= 1 && meta->pagesPerRange <= BRIN_MAX_PAGES_PER_RANGE &&
+ meta->lastRevmapPage > BRIN_METAPAGE_BLKNO && meta->lastRevmapPage < state->idxnblocks;
+
+ if (!meta_ok)
+ brin_check_ereport(state, "metapage is corrupted");
+
+ state->lastRevmapPage = meta->lastRevmapPage;
+ state->pagesPerRange = meta->pagesPerRange;
+ UnlockReleaseBuffer(buf);
+}
+
+/*
+ * Walk revmap page by page from the beginning and check every revmap item.
+ * Also check that all pages within [1, lastRevmapPage] are revmap pages.
+ *
+ * The revmap buffer stays pinned while its items are processed, but it is
+ * locked only for the page type test here; check_revmap_item() takes and
+ * releases the lock on its own. The regular page buffer cached in
+ * state->regpagebuf by check_revmap_item() is released at the end.
+ */
+static void
+check_revmap(BrinCheckState * state)
+{
+ Relation idxrel = state->idxrel;
+ BlockNumber lastRevmapPage = state->lastRevmapPage;
+
+ state->rangeBlkno = 0;
+ state->regpagebuf = InvalidBuffer;
+ state->heapnblocks = RelationGetNumberOfBlocks(state->heaprel);
+
+ /* Walk each revmap page */
+ for (state->revmapBlk = BRIN_METAPAGE_BLKNO + 1; state->revmapBlk <= lastRevmapPage; state->revmapBlk++)
+ {
+
+ state->revmapbuf = ReadBufferExtended(idxrel, MAIN_FORKNUM, state->revmapBlk, RBM_NORMAL,
+ state->checkstrategy);
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+ state->revmappage = BufferGetPage(state->revmapbuf);
+
+ /*
+ * Pages with block numbers in [1, lastRevmapPage] should be revmap
+ * pages
+ */
+ if (!BRIN_IS_REVMAP_PAGE(state->revmappage))
+ {
+ brin_check_ereport(state, psprintf("revmap page is expected at block %u, last revmap page %u",
+ state->revmapBlk,
+ lastRevmapPage));
+ }
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+
+ /* Walk and check all revmap items of the current revmap page */
+ state->revmapidx = 0;
+ while (state->revmapidx < REVMAP_PAGE_MAXITEMS)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Check revmap item */
+ check_revmap_item(state);
+
+ state->rangeBlkno += state->pagesPerRange;
+ state->revmapidx++;
+ }
+
+ /* BlockNumber is unsigned, so it has to be printed with %u */
+ elog(DEBUG1, "Complete revmap page check: %u", state->revmapBlk);
+
+ ReleaseBuffer(state->revmapbuf);
+ }
+
+ if (BufferIsValid(state->regpagebuf))
+ {
+ ReleaseBuffer(state->regpagebuf);
+ }
+}
+
+/*
+ * Check revmap item.
+ *
+ * The item checked is the one at state->revmapidx of the pinned revmap page
+ * state->revmapbuf; it is expected to describe the range that starts at
+ * state->rangeBlkno.
+ *
+ * We check the revmap item pointer itself and, if it is ok, the index tuple
+ * it points to.
+ *
+ * To avoid deadlock we need to unlock the revmap page before locking the
+ * regular page, so when we get the lock on the regular page our index tuple
+ * pointer may no longer be relevant. So for some checks, before reporting an
+ * error, we need to make sure that our pointer is still relevant and if it's
+ * not - retry.
+ */
+static void
+check_revmap_item(BrinCheckState * state)
+{
+ ItemPointerData *revmaptids;
+ RevmapContents *contents;
+ ItemPointerData *iptr;
+ ItemId lp;
+ BrinTuple *tup;
+ Relation idxrel = state->idxrel;
+
+ /* Loop to retry revmap item check if there was a concurrent update. */
+ for (;;)
+ {
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+
+ contents = (RevmapContents *) PageGetContents(BufferGetPage(state->revmapbuf));
+ revmaptids = contents->rm_tids;
+ /* Pointer for the range with start at state->rangeBlkno */
+ iptr = revmaptids + state->revmapidx;
+
+ /* At first check revmap item pointer */
+
+ /*
+ * An invalid tuple pointer means the range isn't summarized, just
+ * move further
+ */
+ if (!ItemPointerIsValid(iptr))
+ {
+ elog(DEBUG1, "Range %u is not summarized", state->rangeBlkno);
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+ break;
+ }
+
+ /*
+ * Pointer is valid, it should point to the index tuple for the range
+ * with blkno rangeBlkno. Remember it and unlock revmap page to avoid
+ * deadlock
+ */
+ state->regpageBlk = ItemPointerGetBlockNumber(iptr);
+ state->regpageoffset = ItemPointerGetOffsetNumber(iptr);
+
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Check if the regpage block number is greater than the relation
+ * size. To avoid fetching the number of blocks for each tuple, use
+ * cached value first
+ */
+ if (state->regpageBlk >= state->idxnblocks)
+ {
+ /*
+ * Regular pages may have been added, so refresh idxnblocks and
+ * recheck
+ */
+ state->idxnblocks = RelationGetNumberOfBlocks(idxrel);
+ if (state->regpageBlk >= state->idxnblocks)
+ {
+ revmap_item_ereport(state,
+ psprintf("revmap item points to a non existing block %u, index max block %u",
+ state->regpageBlk,
+ state->idxnblocks - 1));
+ }
+ }
+
+ /*
+ * To avoid some pin/unpin cycles we cache last used regular page.
+ * Check if we need different regular page and fetch it.
+ */
+ if (!BufferIsValid(state->regpagebuf) || BufferGetBlockNumber(state->regpagebuf) != state->regpageBlk)
+ {
+ if (BufferIsValid(state->regpagebuf))
+ {
+ ReleaseBuffer(state->regpagebuf);
+ }
+ state->regpagebuf = ReadBufferExtended(idxrel, MAIN_FORKNUM, state->regpageBlk, RBM_NORMAL,
+ state->checkstrategy);
+ }
+
+ /* From here on only the regular page is locked, the revmap page is not */
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_SHARE);
+ state->regpage = BufferGetPage(state->regpagebuf);
+
+ /* Revmap should always point to a regular page */
+ if (!BRIN_IS_REGULAR_PAGE(state->regpage))
+ {
+ revmap_item_ereport(state,
+ psprintf("revmap item points to the page which is not regular (blkno: %u)",
+ state->regpageBlk));
+
+ }
+
+ /* Check item offset is valid */
+ if (state->regpageoffset > PageGetMaxOffsetNumber(state->regpage))
+ {
+
+ /* If concurrent update moved our tuple we need to retry */
+ if (!revmap_points_to_index_tuple(state))
+ {
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+ continue;
+ }
+
+ revmap_item_ereport(state,
+ psprintf("revmap item offset number %u is greater than regular page %u max offset %u",
+ state->regpageoffset,
+ state->regpageBlk,
+ PageGetMaxOffsetNumber(state->regpage)));
+ }
+
+ elog(DEBUG1, "Process range: %u, iptr: (%u,%u)", state->rangeBlkno, state->regpageBlk, state->regpageoffset);
+
+ /*
+ * Revmap pointer is OK. It points to existing regular page, offset
+ * also is ok. Let's check index tuple it points to.
+ */
+
+ lp = PageGetItemIdCareful(state);
+
+ /* Revmap should point to NORMAL tuples only */
+ if (!ItemIdIsUsed(lp))
+ {
+
+ /* If concurrent update moved our tuple we need to retry */
+ if (!revmap_points_to_index_tuple(state))
+ {
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+ continue;
+ }
+
+ index_tuple_ereport(state, "revmap item points to unused index tuple");
+ }
+
+
+ tup = (BrinTuple *) PageGetItem(state->regpage, lp);
+
+ /* Check if range block number is as expected */
+ if (tup->bt_blkno != state->rangeBlkno)
+ {
+
+ /* If concurrent update moved our tuple we need to retry */
+ if (!revmap_points_to_index_tuple(state))
+ {
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+ continue;
+ }
+
+ index_tuple_ereport(state, psprintf("index tuple has invalid blkno %u", tup->bt_blkno));
+ }
+
+ /*
+ * If the range is beyond the table size - the range must be empty.
+ * It's a valid situation for an empty table now.
+ */
+ if (state->rangeBlkno >= state->heapnblocks)
+ {
+ if (!BrinTupleIsEmptyRange(tup))
+ {
+ index_tuple_ereport(state,
+ psprintf("the range is beyond the table size, "
+ "but is not marked as empty, table size: %u blocks",
+ state->heapnblocks));
+ }
+ }
+
+ /* Check index tuple itself */
+ check_index_tuple(state, tup, lp);
+
+ /* Done with this item: drop the lock, the pin is kept as a cache */
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_UNLOCK);
+ break;
+ }
+}
+
+/*
+ * Check that index tuple has expected structure.
+ *
+ * This function follows the logic performed by brin_deform_tuple().
+ * After this check is complete we are sure that brin_deform_tuple can process it.
+ *
+ * In case of empty range check that for all attributes allnulls are true, hasnulls are false and
+ * there is no data. All core opclasses expect allnulls is true for empty range.
+ *
+ * 'tuple' is the on-page index tuple and 'lp' its line pointer; the tuple
+ * length is taken from the line pointer.
+ */
+static void
+check_index_tuple(BrinCheckState * state, BrinTuple *tuple, ItemId lp)
+{
+
+ char *tp; /* tuple data */
+
+ /*
+ * Offset inside the data area. It must be wide enough to hold the result
+ * of att_addlength_pointer(): with a 16-bit variable a corrupted varlena
+ * length of 64K or more would wrap around and slip through the "still in
+ * the tuple" checks below.
+ */
+ uint32 off;
+ bits8 *nullbits;
+ TupleDesc disktdesc;
+ int stored;
+ bool empty_range = BrinTupleIsEmptyRange(tuple);
+ bool hasnullbitmap = BrinTupleHasNulls(tuple);
+ uint8 hoff = BrinTupleDataOffset(tuple);
+ uint16 tuplen = ItemIdGetLength(lp);
+
+
+ /* Check that header length is not greater than tuple length */
+ if (hoff > tuplen)
+ {
+ index_tuple_ereport(state, psprintf("index tuple header length %u is greater than tuple len %u", hoff, tuplen));
+ }
+
+ /* If tuple has null bitmap - initialize it */
+ if (hasnullbitmap)
+ {
+ nullbits = (bits8 *) ((char *) tuple + SizeOfBrinTuple);
+ }
+ else
+ {
+ nullbits = NULL;
+ }
+
+ /* Empty range index tuple checks */
+ if (empty_range)
+ {
+ /* Empty range tuple should have null bitmap */
+ if (!hasnullbitmap)
+ {
+ index_tuple_ereport(state, "empty range index tuple doesn't have null bitmap");
+ }
+
+ Assert(nullbits != NULL);
+
+ /*
+ * Check every attribute has allnulls is true and hasnulls is false.
+ * The bitmap holds natts allnulls bits followed by natts hasnulls
+ * bits; a flag is treated as true when att_isnull() is false.
+ */
+ for (int attindex = 0; attindex < state->natts; ++attindex)
+ {
+
+ /* Attribute allnulls should be true for empty range */
+ if (att_isnull(attindex, nullbits))
+ {
+ index_tuple_ereport(state,
+ psprintf("empty range index tuple attribute %d with allnulls is false",
+ attindex));
+ }
+
+ /* Attribute hasnulls should be false for empty range */
+ if (!att_isnull(state->natts + attindex, nullbits))
+ {
+ index_tuple_ereport(state,
+ psprintf("empty range index tuple attribute %d with hasnulls is true",
+ attindex));
+ }
+ }
+
+ /* We are done with empty range tuple */
+ return;
+ }
+
+ /*
+ * Range is marked as not empty so we can have some data in the tuple.
+ * Walk all attributes and checks that all stored values fit into the
+ * tuple
+ */
+
+ tp = (char *) tuple + BrinTupleDataOffset(tuple);
+ stored = 0;
+ off = 0;
+
+ disktdesc = brtuple_disk_tupdesc(state->bdesc);
+
+ for (int attindex = 0; attindex < state->natts; ++attindex)
+ {
+ BrinOpcInfo *opclass = state->bdesc->bd_info[attindex];
+
+ /*
+ * if allnulls is set we have no data for this attribute, move to the
+ * next
+ */
+ if (hasnullbitmap && !att_isnull(attindex, nullbits))
+ {
+ stored += opclass->oi_nstored;
+ continue;
+ }
+
+ /* Walk all stored values for the current attribute */
+ for (int datumno = 0; datumno < opclass->oi_nstored; datumno++)
+ {
+ CompactAttribute *thisatt = TupleDescCompactAttr(disktdesc, stored);
+
+ if (thisatt->attlen == -1)
+ {
+ off = att_pointer_alignby(off,
+ thisatt->attalignby,
+ -1,
+ tp + off);
+ }
+ else
+ {
+ off = att_nominal_alignby(off, thisatt->attalignby);
+ }
+
+ /* Check that we are still in the tuple */
+ if (hoff + off > tuplen)
+ {
+ index_tuple_ereport(state,
+ psprintf("attribute %u stored value %u with length %d "
+ "starts at offset %u beyond total tuple length %u",
+ attindex, datumno, thisatt->attlen, off, tuplen));
+ }
+
+ off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+ /* Check that we are still in the tuple */
+ if (hoff + off > tuplen)
+ {
+ index_tuple_ereport(state,
+ psprintf("attribute %u stored value %u with length %d "
+ "ends at offset %u beyond total tuple length %u",
+ attindex, datumno, thisatt->attlen, off, tuplen));
+ }
+ stored++;
+ }
+
+ }
+
+}
+
+/*
+ * Check all pages within the range [lastRevmapPage + 1, idxnblocks) are regular pages or new
+ * and there is a pointer in revmap to each NORMAL index tuple.
+ *
+ * Does nothing unless the regular_pages_check option is set.
+ *
+ * The regular page is kept share-locked while the revmap page is consulted
+ * through revmap_points_to_index_tuple(). The last used revmap page stays
+ * pinned between tuples and is released at the end.
+ */
+static void
+check_regular_pages(BrinCheckState * state)
+{
+ if (!state->regular_pages_check)
+ {
+ return;
+ }
+
+ /* reset state */
+ state->revmapBlk = InvalidBlockNumber;
+ state->revmapbuf = InvalidBuffer;
+ state->revmapidx = -1;
+ state->regpageBlk = InvalidBlockNumber;
+ state->regpagebuf = InvalidBuffer;
+ state->regpageoffset = InvalidOffsetNumber;
+ state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel);
+
+ for (state->regpageBlk = state->lastRevmapPage + 1; state->regpageBlk < state->idxnblocks; state->regpageBlk++)
+ {
+ OffsetNumber maxoff;
+
+ /*
+ * Let the check be cancelled between pages. It is done here because
+ * no buffer lock is held at this point.
+ */
+ CHECK_FOR_INTERRUPTS();
+
+ state->regpagebuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, state->regpageBlk, RBM_NORMAL,
+ state->checkstrategy);
+ LockBuffer(state->regpagebuf, BUFFER_LOCK_SHARE);
+ state->regpage = BufferGetPage(state->regpagebuf);
+
+ /* Skip new pages */
+ if (PageIsNew(state->regpage))
+ {
+ UnlockReleaseBuffer(state->regpagebuf);
+ continue;
+ }
+
+ if (!BRIN_IS_REGULAR_PAGE(state->regpage))
+ {
+ brin_check_ereport(state, psprintf("expected new or regular page at block %u", state->regpageBlk));
+ }
+
+ /* Check that all NORMAL index tuples within the page are not orphans */
+ maxoff = PageGetMaxOffsetNumber(state->regpage);
+ for (state->regpageoffset = FirstOffsetNumber; state->regpageoffset <= maxoff; state->regpageoffset++)
+ {
+ ItemId lp;
+ BrinTuple *tup;
+ BlockNumber revmapBlk;
+
+ lp = PageGetItemIdCareful(state);
+
+ if (ItemIdIsUsed(lp))
+ {
+ tup = (BrinTuple *) PageGetItem(state->regpage, lp);
+
+ /*
+ * Get revmap block number for index tuple blkno. Revmap
+ * pages start at block 1, hence the "+ 1".
+ */
+ revmapBlk = ((tup->bt_blkno / state->pagesPerRange) / REVMAP_PAGE_MAXITEMS) + 1;
+ if (revmapBlk > state->lastRevmapPage)
+ {
+ index_tuple_only_ereport(state, psprintf("no revmap page for the index tuple with blkno %u",
+ tup->bt_blkno));
+ }
+
+ /* Fetch another revmap page if needed */
+ if (state->revmapBlk != revmapBlk)
+ {
+ if (BlockNumberIsValid(state->revmapBlk))
+ {
+ ReleaseBuffer(state->revmapbuf);
+ }
+ state->revmapBlk = revmapBlk;
+ state->revmapbuf = ReadBufferExtended(state->idxrel, MAIN_FORKNUM, state->revmapBlk, RBM_NORMAL,
+ state->checkstrategy);
+ }
+
+ state->revmapidx = (tup->bt_blkno / state->pagesPerRange) % REVMAP_PAGE_MAXITEMS;
+ state->rangeBlkno = tup->bt_blkno;
+
+ /* check that revmap item points to index tuple */
+ if (!revmap_points_to_index_tuple(state))
+ {
+ index_tuple_ereport(state, "revmap doesn't point to index tuple");
+ }
+
+ }
+ }
+
+ UnlockReleaseBuffer(state->regpagebuf);
+ }
+
+ if (state->revmapbuf != InvalidBuffer)
+ {
+ ReleaseBuffer(state->revmapbuf);
+ }
+}
+
+/*
+ * Tell whether the revmap item state->revmapidx of the pinned revmap page
+ * state->revmapbuf points to the index tuple (regpageBlk, regpageoffset).
+ *
+ * The caller holds the lock on the regular page; the revmap page is locked
+ * here. It's a valid lock ordering, so no deadlock is possible.
+ */
+static bool
+revmap_points_to_index_tuple(BrinCheckState * state)
+{
+ RevmapContents *rmcontents;
+ ItemPointerData *item;
+ bool same_block;
+ bool same_offset;
+
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_SHARE);
+
+ rmcontents = (RevmapContents *) PageGetContents(BufferGetPage(state->revmapbuf));
+ item = &rmcontents->rm_tids[state->revmapidx];
+
+ /* The item may be invalid, so the NoCheck accessors are used */
+ same_block = (ItemPointerGetBlockNumberNoCheck(item) == state->regpageBlk);
+ same_offset = (ItemPointerGetOffsetNumberNoCheck(item) == state->regpageoffset);
+
+ LockBuffer(state->revmapbuf, BUFFER_LOCK_UNLOCK);
+
+ return same_block && same_offset;
+}
+
+/*
+ * PageGetItemId() wrapper that validates the returned line pointer.
+ *
+ * The line pointer is taken from state->regpage at state->regpageoffset.
+ * It must not reach into the special space, and it must be either
+ * LP_NORMAL with storage or LP_UNUSED without storage: itemId in brin index
+ * could be UNUSED or NORMAL.
+ */
+static ItemId
+PageGetItemIdCareful(BrinCheckState * state)
+{
+ ItemId lp = PageGetItemId(state->regpage, state->regpageoffset);
+ bool normal_with_storage;
+ bool unused_without_storage;
+
+ /* The item must end before the BRIN special space */
+ if (ItemIdGetOffset(lp) + ItemIdGetLength(lp) >
+ BLCKSZ - MAXALIGN(sizeof(BrinSpecialSpace)))
+ {
+ index_tuple_ereport(state,
+ psprintf("line pointer points past end of tuple space in index. "
+ "lp_off=%u, lp_len=%u lp_flags=%u",
+ ItemIdGetOffset(lp),
+ ItemIdGetLength(lp),
+ ItemIdGetFlags(lp)));
+ }
+
+ /* Verify that line pointer is LP_NORMAL or LP_UNUSED */
+ normal_with_storage = ItemIdIsNormal(lp) && ItemIdHasStorage(lp);
+ unused_without_storage = !ItemIdIsUsed(lp) && !ItemIdHasStorage(lp);
+
+ if (!normal_with_storage && !unused_without_storage)
+ {
+ index_tuple_ereport(state,
+ psprintf("invalid line pointer storage in index. "
+ "lp_off=%u, lp_len=%u lp_flags=%u",
+ ItemIdGetOffset(lp),
+ ItemIdGetLength(lp),
+ ItemIdGetFlags(lp)));
+ }
+
+ return lp;
+}
+
+/*
+ * Check that every heap tuple is consistent with the index.
+ *
+ * Also check that fields 'empty_range', 'all_nulls' and 'has_nulls'
+ * are not too "narrow" for each range, which means:
+ * 1) has_nulls = false, but we see null value (only for oi_regular_nulls is true)
+ * 2) all_nulls = true, but we see nonnull value.
+ * 3) empty_range = true, but we see tuple within the range.
+ *
+ * We use allowSync = false, because this way
+ * we process full ranges one by one from the first range.
+ * It's not necessary, but makes the code simpler and this way
+ * we need to fetch every index tuple only once.
+ *
+ * The heap is scanned with table_index_build_scan(); brin_check_callback()
+ * is invoked for the scanned tuples and does the actual checks.
+ */
+static void
+check_all_heap_consistent(BrinCheckState * state)
+{
+ Relation idxrel = state->idxrel;
+ Relation heaprel = state->heaprel;
+ double reltuples;
+ IndexInfo *indexInfo;
+
+ /* All heap consistent check fields initialization */
+
+ state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange);
+ state->dtup = brin_new_memtuple(state->bdesc);
+ state->checkable_range = false;
+ state->consistentFn = palloc0_array(FmgrInfo, state->natts);
+ state->range_cnt = 0;
+ /* next range is the first range in the beginning */
+ state->nextrangeBlk = 0;
+ state->nonnull_sk = palloc0_array(ScanKey, state->natts);
+ state->isnull_sk = palloc0_array(ScanKey, state->natts);
+ state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "brin check range context",
+ ALLOCSET_DEFAULT_SIZES);
+ state->heaptupleCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "brin check tuple context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * Prepare "non-null" and "is_null" scan keys and consistent fn for each
+ * attribute
+ */
+ for (AttrNumber attno = 1; attno <= state->natts; attno++)
+ {
+ FmgrInfo *tmp;
+
+ tmp = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_CONSISTENT);
+ fmgr_info_copy(&state->consistentFn[attno - 1], tmp, CurrentMemoryContext);
+
+ state->nonnull_sk[attno - 1] = prepare_nonnull_scan_key(state, attno);
+ state->isnull_sk[attno - 1] = prepare_isnull_scan_key(attno);
+ }
+
+ indexInfo = BuildIndexInfo(idxrel);
+
+ /*
+ * Use snapshot to check only those tuples that are guaranteed to be
+ * indexed already. Using SnapshotAny would make it more difficult to say
+ * if there is a corruption or checked tuple just haven't been indexed
+ * yet.
+ */
+ indexInfo->ii_Concurrent = true;
+
+ /*
+ * allow_sync = false, see the function header. progress = false: this is
+ * not an index build, so there is no CREATE INDEX progress report to
+ * update.
+ */
+ reltuples = table_index_build_scan(heaprel, idxrel, indexInfo, false, false,
+ brin_check_callback, (void *) state, NULL);
+
+ elog(DEBUG1, "ranges were checked: %f", state->range_cnt);
+ elog(DEBUG1, "scan total tuples: %f", reltuples);
+
+ if (state->buf != InvalidBuffer)
+ ReleaseBuffer(state->buf);
+
+ brinRevmapTerminate(state->revmap);
+ MemoryContextDelete(state->rangeCtx);
+ MemoryContextDelete(state->heaptupleCtx);
+}
+
+/*
+ * Check operator names array input parameter and convert it to array of strings
+ * Empty input array means we use "=" operator for every attribute
+ *
+ * A non-empty array must have exactly one non-NULL element per index
+ * attribute, otherwise an error is raised. The result is stored in
+ * state->operatorNames.
+ */
+static void
+check_and_prepare_operator_names(BrinCheckState * state)
+{
+ Oid element_type = ARR_ELEMTYPE(state->heap_all_consistent_oper_names);
+ int16 typlen;
+ bool typbyval;
+ char typalign;
+ Datum *values;
+ bool *elem_nulls;
+ int num_elems;
+
+ /* operatorNames is an array of pointers, one String * per attribute */
+ state->operatorNames = palloc_array(String *, state->natts);
+
+ get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
+ deconstruct_array(state->heap_all_consistent_oper_names, element_type, typlen, typbyval, typalign,
+ &values, &elem_nulls, &num_elems);
+
+ /* If we have some input check it and convert to String** */
+ if (num_elems != 0)
+ {
+ if (num_elems != state->natts)
+ {
+ /* both values are signed ints, so %d is the matching format */
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("Operator names array length %d, but index has %d attributes",
+ num_elems, state->natts)));
+ }
+
+ for (int i = 0; i < num_elems; i++)
+ {
+ if (elem_nulls[i])
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("Operator names array contains NULL")));
+ }
+ state->operatorNames[i] = makeString(TextDatumGetCString(values[i]));
+ }
+ }
+ else
+ {
+ /* If there is no input just use "=" operator for all attributes */
+ for (int i = 0; i < state->natts; i++)
+ {
+ state->operatorNames[i] = makeString("=");
+ }
+ }
+}
+
+/*
+ * Prepare ScanKey for nonnull values of an index attribute.
+ *
+ * The operator is the one named in state->operatorNames for this attribute
+ * ("=" when the caller passed no names); it is looked up with the attribute
+ * type on both sides.
+ *
+ * Generated once, and will be reused for all heap tuples.
+ * Argument field will be filled for every heap tuple before
+ * consistent function invocation, so leave it zero for a while.
+ *
+ * Operator strategy number can vary from opclass to opclass, so we need to lookup it for every attribute.
+ *
+ * Raises an error if the operator does not exist or is not a member of the
+ * attribute's operator family.
+ */
+static ScanKey
+prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno)
+{
+ ScanKey scanKey;
+ Oid opOid;
+ Oid opFamilyOid;
+ bool defined;
+ StrategyNumber strategy;
+ RegProcedure opRegProc;
+ List *operNameList;
+ int attindex = attno - 1;
+ Form_pg_attribute attr = TupleDescAttr(state->bdesc->bd_tupdesc, attindex);
+ Oid type = attr->atttypid;
+ String *opname = state->operatorNames[attno - 1];
+
+ opFamilyOid = state->idxrel->rd_opfamily[attindex];
+ operNameList = list_make1(opname);
+ opOid = OperatorLookup(operNameList, type, type, &defined);
+
+ /* The list was needed for the lookup only; opname itself is kept */
+ list_free(operNameList);
+
+ if (opOid == InvalidOid)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("There is no operator %s for type %u",
+ opname->sval, type)));
+ }
+
+ strategy = get_op_opfamily_strategy(opOid, opFamilyOid);
+
+ if (strategy == 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("operator %s is not a member of operator family \"%s\"",
+ opname->sval,
+ get_opfamily_name(opFamilyOid, false))));
+ }
+
+ opRegProc = get_opcode(opOid);
+ scanKey = palloc0(sizeof(ScanKeyData));
+ ScanKeyEntryInitialize(
+ scanKey,
+ 0,
+ attno,
+ strategy,
+ type,
+ attr->attcollation,
+ opRegProc,
+ (Datum) 0
+ );
+
+ return scanKey;
+}
+
+/*
+ * Prepare the "IS NULL" ScanKey for an index attribute.
+ *
+ * The key carries the SK_ISNULL and SK_SEARCHNULL flags and has no
+ * strategy, subtype, collation or comparison procedure.
+ */
+static ScanKey
+prepare_isnull_scan_key(AttrNumber attno)
+{
+ ScanKey nullKey = palloc0(sizeof(ScanKeyData));
+
+ ScanKeyEntryInitialize(nullKey, SK_ISNULL | SK_SEARCHNULL, attno,
+ InvalidStrategy, InvalidOid, InvalidOid, InvalidOid,
+ (Datum) 0);
+
+ return nullKey;
+}
+
+/*
+ * Heap scan callback of the "all heap consistent" check.
+ *
+ * The scan goes from the first range (blkno = 0) towards the last one. For
+ * every heap tuple we first see whether it lies past the current range; if
+ * so, the index tuple of the heap tuple's range is fetched and deformed into
+ * state->dtup. The new range must not be marked empty, because we have at
+ * least one tuple in it. After that the heap tuple is checked to be
+ * consistent with its range. Unsummarized ranges and placeholder tuples can
+ * not be checked, so all checks are skipped for them.
+ *
+ * Some ranges may be jumped over while moving, which is fine: there is
+ * nothing to check them against. We also can't say whether skipped ranges
+ * should be marked as empty or not, since there might have been tuples that
+ * are deleted by now.
+ */
+static void
+brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *brstate)
+{
+ BrinCheckState *state = (BrinCheckState *) brstate;
+ BlockNumber heapblk = ItemPointerGetBlockNumber(tid);
+
+ /* The tuple is past the current range: switch to the tuple's range */
+ if (heapblk >= state->nextrangeBlk)
+ {
+ MemoryContext savedCtx;
+ BrinTuple *disktup;
+ OffsetNumber tupoff;
+ Size tupsize;
+
+ MemoryContextReset(state->rangeCtx);
+ savedCtx = MemoryContextSwitchTo(state->rangeCtx);
+
+ state->range_cnt++;
+
+ disktup = brinGetTupleForHeapBlock(state->revmap, heapblk, &state->buf,
+ &tupoff, &tupsize, BUFFER_LOCK_SHARE);
+
+ if (disktup == NULL)
+ {
+ /* We can't check unsummarized ranges. */
+ state->checkable_range = false;
+ }
+ else
+ {
+ Size copysize = 0;
+ BrinTuple *copy;
+
+ /* Copy the tuple, so the buffer lock can be dropped right away */
+ copy = brin_copy_tuple(disktup, tupsize, NULL, &copysize);
+ LockBuffer(state->buf, BUFFER_LOCK_UNLOCK);
+ state->dtup = brin_deform_tuple(state->bdesc, copy, state->dtup);
+
+ /* We can't check placeholder ranges */
+ state->checkable_range = !state->dtup->bt_placeholder;
+ }
+
+ /* Remember where the range ends to notice when we are done with it */
+ state->nextrangeBlk = (heapblk / state->pagesPerRange + 1) * state->pagesPerRange;
+
+ MemoryContextSwitchTo(savedCtx);
+
+ /* Range must not be empty */
+ if (state->checkable_range && state->dtup->bt_empty_range)
+ all_consist_ereport(state, tid, "range is marked as empty but contains qualified live tuples");
+ }
+
+ /* Check tuple is consistent with the index */
+ if (state->checkable_range)
+ check_heap_tuple(state, values, isnull, tid);
+}
+
+/*
+ * Verify that one heap tuple is covered by the summary of the current range.
+ *
+ * Every index attribute is handled in one of three ways:
+ * - null value and oi_regular_nulls = true: the summary must have hasnulls
+ *   or allnulls set, and consistentFn is not called;
+ * - null value and oi_regular_nulls = false: consistentFn is called with the
+ *   'isNull' scanKey;
+ * - nonnull value: allnulls must be false no matter whether oi_regular_nulls
+ *   is set or not, then consistentFn is called with the 'nonnull' scanKey
+ *   whose argument is set to the heap value.
+ *
+ * Every violation is reported through all_consist_ereport().
+ */
+static void
+check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid)
+{
+	BrinMemTuple *rangeSummary = state->dtup;
+	BrinDesc   *desc = state->bdesc;
+	MemoryContext callerCtx;
+	int			col;
+
+	Assert(state->checkable_range);
+
+	/* Whatever the previous call left in the per-tuple context is dropped */
+	MemoryContextReset(state->heaptupleCtx);
+	callerCtx = MemoryContextSwitchTo(state->heaptupleCtx);
+
+	for (col = 0; col < state->natts; col++)
+	{
+		BrinValues *colSummary = &rangeSummary->bt_columns[col];
+		bool		regularNulls = desc->bd_info[col]->oi_regular_nulls;
+		ScanKey		key;
+		Datum		verdict;
+
+		/*
+		 * Null value with regular null handling: only the null flags are
+		 * inspected, consistentFn is not involved.
+		 */
+		if (nulls[col] && regularNulls)
+		{
+			if (!colSummary->bv_hasnulls && !colSummary->bv_allnulls)
+				all_consist_ereport(state, tid, "range hasnulls and allnulls are false, but contains a null value");
+			continue;
+		}
+
+		if (nulls[col])
+		{
+			/* Null value, oi_regular_nulls = false: use the isNull key */
+			key = state->isnull_sk[col];
+		}
+		else
+		{
+			/* A nonnull value cannot live in an all-nulls summary */
+			if (colSummary->bv_allnulls)
+				all_consist_ereport(state, tid, "range allnulls is true, but contains nonnull value");
+
+			/* Use the "attr = value" key, loaded with the heap value */
+			key = state->nonnull_sk[col];
+			key->sk_argument = values[col];
+		}
+
+		/* A null with oi_regular_nulls = true was handled above */
+		Assert(!(regularNulls && nulls[col]));
+
+		if (state->consistentFn[col].fn_nargs < 4)
+		{
+			/* Three-argument form: the key itself is passed */
+			verdict = FunctionCall3Coll(&state->consistentFn[col],
+										state->idxrel->rd_indcollation[col],
+										PointerGetDatum(desc),
+										PointerGetDatum(colSummary),
+										PointerGetDatum(key));
+		}
+		else
+		{
+			/*
+			 * Four-argument form: the address of the key is passed together
+			 * with a key count of 1.
+			 */
+			verdict = FunctionCall4Coll(&state->consistentFn[col],
+										state->idxrel->rd_indcollation[col],
+										PointerGetDatum(desc),
+										PointerGetDatum(colSummary),
+										PointerGetDatum(&key),
+										Int32GetDatum(1));
+		}
+
+		if (!DatumGetBool(verdict))
+			all_consist_ereport(state, tid, "heap tuple inconsistent with index");
+	}
+
+	MemoryContextSwitchTo(callerCtx);
+}
+
+/*
+ * Raise an ERRCODE_DATA_CORRUPTED error that names the index and carries no
+ * additional location info.
+ *
+ * Note that "fmt" is not used as a format string: it is inserted into the
+ * message through %s.
+ */
+static void
+brin_check_ereport(BrinCheckState * state, const char *fmt)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	ereport(ERROR,
+			errcode(ERRCODE_DATA_CORRUPTED),
+			errmsg("Index %s is corrupted - %s", idxname, fmt));
+}
+
+/*
+ * Raise an ERRCODE_DATA_CORRUPTED error that names the index and includes
+ * the range blkno, the revmap item (block, index) and the index tuple
+ * (block, offset) taken from the check state.
+ *
+ * All of those state fields are asserted to be set. "fmt" is inserted into
+ * the message through %s; it is not used as a format string.
+ */
+void
+index_tuple_ereport(BrinCheckState * state, const char *fmt)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+	Assert(state->revmapBlk != InvalidBlockNumber);
+	Assert(state->revmapidx >= 0 && state->revmapidx < REVMAP_PAGE_MAXITEMS);
+	Assert(state->regpageBlk != InvalidBlockNumber);
+	Assert(state->regpageoffset != InvalidOffsetNumber);
+
+	ereport(ERROR,
+			errcode(ERRCODE_DATA_CORRUPTED),
+			errmsg("Index %s is corrupted - %s. Range blkno: %u, revmap item: (%u,%u), index tuple: (%u,%u)",
+				   idxname, fmt,
+				   state->rangeBlkno,
+				   state->revmapBlk, state->revmapidx,
+				   state->regpageBlk, state->regpageoffset));
+}
+
+/*
+ * Raise an ERRCODE_DATA_CORRUPTED error that names the index and includes
+ * only the index tuple (block, offset) taken from the check state.
+ *
+ * Both tuple location fields are asserted to be set. "fmt" is inserted into
+ * the message through %s; it is not used as a format string.
+ */
+void
+index_tuple_only_ereport(BrinCheckState * state, const char *fmt)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	Assert(state->regpageBlk != InvalidBlockNumber);
+	Assert(state->regpageoffset != InvalidOffsetNumber);
+
+	ereport(ERROR,
+			errcode(ERRCODE_DATA_CORRUPTED),
+			errmsg("Index %s is corrupted - %s. Index tuple: (%u,%u)",
+				   idxname, fmt,
+				   state->regpageBlk, state->regpageoffset));
+}
+
+/*
+ * Raise an ERRCODE_DATA_CORRUPTED error that names the index and includes
+ * the range blkno and the revmap item (block, index) taken from the check
+ * state.
+ *
+ * Those state fields are asserted to be set. "fmt" is inserted into the
+ * message through %s; it is not used as a format string.
+ */
+void
+revmap_item_ereport(BrinCheckState * state, const char *fmt)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+	Assert(state->revmapBlk != InvalidBlockNumber);
+	Assert(state->revmapidx >= 0 && state->revmapidx < REVMAP_PAGE_MAXITEMS);
+
+	ereport(ERROR,
+			errcode(ERRCODE_DATA_CORRUPTED),
+			errmsg("Index %s is corrupted - %s. Range blkno: %u, revmap item: (%u,%u).",
+				   idxname, fmt,
+				   state->rangeBlkno,
+				   state->revmapBlk, state->revmapidx));
+}
+
+/*
+ * Raise an ERRCODE_DATA_CORRUPTED error that names the index and includes
+ * the range blkno (read from the deformed range tuple, state->dtup) and the
+ * heap tid (block, offset) of the offending heap tuple.
+ *
+ * "message" is inserted into the error text through %s.
+ *
+ * NOTE(review): the assertion below checks state->rangeBlkno, while the
+ * reported range blkno is state->dtup->bt_blkno - confirm that both are
+ * maintained by the caller, and that state->dtup is always set here.
+ */
+static void
+all_consist_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message)
+{
+	const char *idxname = RelationGetRelationName(state->idxrel);
+
+	Assert(state->rangeBlkno != InvalidBlockNumber);
+
+	ereport(ERROR,
+			errcode(ERRCODE_DATA_CORRUPTED),
+			errmsg("Index %s is corrupted - %s. Range blkno: %u, heap tid (%u,%u)",
+				   idxname, message,
+				   state->dtup->bt_blkno,
+				   ItemPointerGetBlockNumber(tid),
+				   ItemPointerGetOffsetNumber(tid)));
+}
--
2.43.0
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected]
Subject: Re: amcheck support for BRIN indexes
In-Reply-To: <CAE7r3MLmGA_DbUFYxNp4L7ODwDyLQ2u_vfdTSXj-QJu79B4iHg@mail.gmail.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox