public inbox for [email protected]
help / color / mirror / Atom feed
From: Andres Freund <[email protected]>
Subject: [PATCH v9 2/4] test_aio: Add read_stream test infrastructure & tests
Date: Fri, 27 Mar 2026 13:08:26 -0400
While we have a lot of indirect coverage of read streams, there are corner
cases that are hard to test when only indirectly controlling and observing the
read stream. This commit adds an SQL callable SRF interface for a read stream
and uses that in a few tests.
To make some of the tests possible, the injection point infrastructure in
test_aio had to be expanded to allow blocking IO completion.
Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
src/test/modules/test_aio/meson.build | 1 +
.../modules/test_aio/t/004_read_stream.pl | 275 ++++++++++++++
src/test/modules/test_aio/test_aio--1.0.sql | 19 +-
src/test/modules/test_aio/test_aio.c | 336 +++++++++++++++---
src/tools/pgindent/typedefs.list | 1 +
5 files changed, 582 insertions(+), 50 deletions(-)
create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 18a797f3a3b..909f81d96c1 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
't/001_aio.pl',
't/002_io_workers.pl',
't/003_initdb.pl',
+ 't/004_read_stream.pl',
],
},
}
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..0d123ac0ed5
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,275 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+# A single cluster is used for the whole file; it is restarted for each
+# io_method below.
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+TestAio::configure($node);
+
+$node->append_conf(
+ 'postgresql.conf', qq(
+max_connections=8
+io_method=worker
+));
+
+# Create the extension and the test table once, before looping over the
+# io_methods.
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+# Run the complete set of tests once per supported io_method, switching the
+# setting while the node is stopped.
+foreach my $method (TestAio::supported_io_methods())
+{
+ $node->adjust_conf('postgresql.conf', 'io_method', $method);
+ $node->start();
+ test_io_method($method, $node);
+ $node->stop();
+}
+
+done_testing();
+
+
+# Install the test_aio extension and create the sparsely filled 'largeish'
+# table that the read stream tests read from.
+sub test_setup
+{
+ my ($node) = @_;
+
+ my $setup_sql = qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+);
+
+ $node->safe_psql('postgres', $setup_sql);
+ ok(1, "setup");
+}
+
+
+# Exercise read streams that are asked for the same block more than once,
+# both on a regular table and on a temporary table.
+#
+# Arguments: the io_method name (used only in test descriptions) and the
+# running node. The ok(1, ...) calls merely record that the preceding
+# query_safe() calls did not die.
+sub test_repeated_blocks
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+ # Preventing larger reads makes testing easier
+ $psql->query_safe(qq/SET io_combine_limit = 1/);
+
+ # test miss of the same block twice in a row
+ $psql->query_safe(qq/SELECT evict_rel('largeish');/);
+
+ # block 0 grows the distance enough that the stream will look ahead and try
+ # to start a pending read for block 2 (and later block 4) twice before
+ # returning any buffers.
+ $psql->query_safe(
+ qq/SELECT * FROM read_stream_for_blocks('largeish',
+ ARRAY[0, 2, 2, 4, 4]);/);
+
+ ok(1, "$io_method: stream missing the same block repeatedly");
+
+ # Same request again without evicting in between.
+ $psql->query_safe(
+ qq/SELECT * FROM read_stream_for_blocks('largeish',
+ ARRAY[0, 2, 2, 4, 4]);/);
+ ok(1, "$io_method: stream hitting the same block repeatedly");
+
+ # After evicting again, request blocks 0..6 in ascending order and then
+ # back down to 0, so each of blocks 0..5 is requested a second time by
+ # the same stream.
+ $psql->query_safe(qq/SELECT evict_rel('largeish');/);
+ $psql->query_safe(
+ qq/SELECT * FROM read_stream_for_blocks('largeish',
+ ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);/);
+ ok(1, "$io_method: stream accessing same block");
+
+ # Test repeated blocks with a temp table, using invalidate_rel_block()
+ # to evict individual local buffers.
+ $psql->query_safe(
+ qq/CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10);
+ INSERT INTO largeish_temp(k) SELECT generate_series(1, 200);/);
+
+ # Evict the specific blocks we'll request to force misses
+ $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 0);/);
+ $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 2);/);
+ $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 4);/);
+
+ $psql->query_safe(
+ qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+ ARRAY[0, 2, 2, 4, 4]);/);
+ ok(1, "$io_method: temp stream missing the same block repeatedly");
+
+ # Now the blocks are cached, so repeated access should be hits
+ $psql->query_safe(
+ qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+ ARRAY[0, 2, 2, 4, 4]);/);
+ ok(1, "$io_method: temp stream hitting the same block repeatedly");
+
+ $psql->quit();
+}
+
+
+# Test a read stream in session a running into buffers whose IO was started
+# by session b and is held back in the completion_wait injection point.
+#
+# Arguments: the io_method name (used only in test descriptions) and the
+# running node. Emits four tests.
+#
+# pump_until() returns false when it times out or the psql process goes
+# away, so its result is passed to ok() rather than being discarded; otherwise
+# a stream that never produced the expected output would still be reported
+# as a pass.
+sub test_inject_foreign
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+ my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+ my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+ ###
+ # Test read stream encountering buffers undergoing IO in another backend,
+ # with the other backend's reads succeeding.
+ ###
+ $psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+ $psql_b->query_safe(
+ qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+ relfilenode=>pg_relation_filenode('largeish'));/);
+
+ # Queue the read without waiting for it: it will not return until the
+ # completion is released below.
+ $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+ blockno=>5, nblocks=>1);\n/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres', qq/SELECT wait_event FROM pg_stat_activity
+ WHERE wait_event = 'completion_wait';/,
+ 'completion_wait');
+
+ # Block 5 is undergoing IO in session b, so session a will move on to start
+ # a new IO for block 7.
+ $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+ read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ ok( pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,5,7\}/),
+ qq/$io_method: read stream encounters succeeding IO by another backend/
+ );
+
+ ###
+ # Test read stream encountering buffers undergoing IO in another backend,
+ # with the other backend's reads failing.
+ ###
+ $psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+ $psql_b->query_safe(
+ qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+ relfilenode=>pg_relation_filenode('largeish'));/);
+
+ $psql_b->query_safe(
+ qq/SELECT inj_io_short_read_attach(-errno_from_string('EIO'),
+ pid=>pg_backend_pid(),
+ relfilenode=>pg_relation_filenode('largeish'));/);
+
+ $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+ blockno=>5, nblocks=>1);\n/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres',
+ qq/SELECT wait_event FROM pg_stat_activity
+ WHERE wait_event = 'completion_wait';/,
+ 'completion_wait');
+
+ $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+ read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ # Remember whether session a's stream finished; it is reported after
+ # session b has been cleaned up, keeping the original test order.
+ my $stream_completed = pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+ ok( pump_until(
+ $psql_b->{run}, $psql_b->{timeout},
+ \$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/),
+ "$io_method: injected error occurred");
+ # query_safe() would die on the leftover error text, so clear it first.
+ $psql_b->{stderr} = '';
+ $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+ ok($stream_completed,
+ qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+ ###
+ # Test read stream encountering two buffers that are undergoing the same
+ # IO, started by another backend.
+ ###
+ $psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+ $psql_b->query_safe(
+ qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+ relfilenode=>pg_relation_filenode('largeish'));/);
+
+ $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+ blockno=>2, nblocks=>3);\n/;
+ $psql_b->{run}->pump_nb();
+
+ $node->poll_query_until(
+ 'postgres',
+ qq/SELECT wait_event FROM pg_stat_activity
+ WHERE wait_event = 'completion_wait';/,
+ 'completion_wait');
+
+ # Blocks 2 and 4 are undergoing IO initiated by session b
+ $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+ read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/;
+ $psql_a->{run}->pump_nb();
+
+ $node->poll_query_until('postgres',
+ qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+ 'AioIoCompletion');
+
+ $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+ ok( pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/\{0,2,4\}/),
+ qq/$io_method: read stream encounters two buffer read in one IO/);
+
+ $psql_a->quit();
+ $psql_b->quit();
+}
+
+
+# Run all read stream tests against a node that was started with the given
+# io_method.
+#
+# Arguments: the io_method name the node is expected to report, and the
+# running node.
+sub test_io_method
+{
+ my $io_method = shift;
+ my $node = shift;
+
+ is($node->safe_psql('postgres', 'SHOW io_method'),
+ $io_method, "$io_method: io_method set correctly");
+
+ test_repeated_blocks($io_method, $node);
+
+ SKIP:
+ {
+ # The variable can be unset, e.g. when the test is run by hand. This
+ # file uses "use warnings FATAL => 'all'", so comparing undef with eq
+ # would abort the whole test file; default to the empty string.
+ # The skip count matches the four ok() calls in test_inject_foreign().
+ skip 'Injection points not supported by this build', 4
+ unless ($ENV{enable_injection_points} // '') eq 'yes';
+ test_inject_foreign($io_method, $node);
+ }
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 86beb563b6a..4a5a379b3c5 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -57,6 +57,13 @@ CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT bl
RETURNS SETOF record STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+/*
+ * Read stream related functions
+ *
+ * read_stream_for_blocks() reads the listed blocks of rel's main fork through
+ * a read stream and returns one row per array element: blockoff is the
+ * position in the input array, blocknum the requested block number and buf
+ * the buffer the stream returned for it.
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
/*
* Handle related functions
@@ -98,8 +105,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
/*
* Injection point related functions
*/
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+/*
+ * Make IO completions that match the given filters wait until
+ * inj_io_completion_continue() is called. A NULL argument disables the
+ * corresponding filter.
+ */
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT NULL, blockno int4 DEFAULT NULL)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+/* Disable completion waiting and wake up completions that are waiting. */
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+/*
+ * Replace the result of matching reads with "result". pid and relfilenode
+ * optionally restrict which IOs are affected; NULL disables that filter.
+ */
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT NULL)
+RETURNS pg_catalog.void
AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 3e486a5806e..cb614551914 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -27,13 +27,18 @@
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/checksum.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/rel.h"
#include "utils/tuplestore.h"
+#include "utils/wait_event.h"
PG_MODULE_MAGIC;
@@ -41,13 +46,31 @@ PG_MODULE_MAGIC;
+/*
+ * State shared between the SQL-callable inj_* functions and the injection
+ * point callbacks, accessed through inj_io_error_state.
+ */
typedef struct InjIoErrorState
{
+ /* completions held back by the completion-wait injection sleep on this */
+ ConditionVariable cv;
+
bool enabled_short_read;
bool enabled_reopen;
+ /*
+ * Completion-wait injection: whether it is armed, plus optional filters.
+ * InvalidOid / InvalidBlockNumber / InvalidPid mean "match everything".
+ */
+ bool enabled_completion_wait;
+ Oid completion_wait_relfilenode;
+ BlockNumber completion_wait_blockno;
+ pid_t completion_wait_pid;
+ /* wait event reported while sleeping, registered as "completion_wait" */
+ uint32 completion_wait_event;
+
bool short_read_result_set;
+ /* optional short-read filters; InvalidOid / InvalidPid match everything */
+ Oid short_read_relfilenode;
+ pid_t short_read_pid;
int short_read_result;
} InjIoErrorState;
+/*
+ * Private state of the read_stream_for_blocks() block number callback.
+ */
+typedef struct BlocksReadStreamData
+{
+ int nblocks; /* number of entries in blocks[] */
+ int curblock; /* index of the next entry to hand out */
+ uint32 *blocks; /* block numbers to read, in request order */
+} BlocksReadStreamData;
+
+
static InjIoErrorState *inj_io_error_state;
/* Shared memory init callbacks */
@@ -88,11 +111,15 @@ test_aio_shmem_startup(void)
/* First time through, initialize */
inj_io_error_state->enabled_short_read = false;
inj_io_error_state->enabled_reopen = false;
+ inj_io_error_state->enabled_completion_wait = false;
+
+ ConditionVariableInit(&inj_io_error_state->cv);
+ inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
#ifdef USE_INJECTION_POINTS
InjectionPointAttach("aio-process-completion-before-shared",
"test_aio",
- "inj_io_short_read",
+ "inj_io_completion_hook",
NULL,
0);
InjectionPointLoad("aio-process-completion-before-shared");
@@ -388,7 +415,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
if (nblocks <= 0 || nblocks > PG_IOV_MAX)
elog(ERROR, "nblocks is out of range");
- rel = relation_open(relid, AccessExclusiveLock);
+ rel = relation_open(relid, AccessShareLock);
for (int i = 0; i < nblocks; i++)
{
@@ -819,6 +846,85 @@ read_buffers(PG_FUNCTION_ARGS)
}
+/*
+ * Read stream callback: hand out the block numbers stored in the
+ * BlocksReadStreamData one at a time, then signal the end of the stream.
+ */
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ BlocksReadStreamData *state = (BlocksReadStreamData *) callback_private_data;
+ BlockNumber next = InvalidBlockNumber;
+
+ /* Advance through the array until every entry has been returned. */
+ if (state->curblock < state->nblocks)
+ next = state->blocks[state->curblock++];
+
+ return next;
+}
+
+/*
+ * SQL-callable set-returning function: read the blocks listed in the int4
+ * array (argument 1) from the main fork of the relation (argument 0) through
+ * a read stream. One row is emitted per array element, containing the
+ * element's index, the block number and the buffer returned by the stream.
+ *
+ * Errors out if the array is not a one-dimensional, NULL-free int4 array, or
+ * if the stream returns fewer or more buffers than blocks were requested.
+ */
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Relation rel;
+ BlocksReadStreamData stream_data;
+ ReadStream *stream;
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ /*
+ * We expect the input to be an N-element int4 array; verify that. We
+ * don't need to use deconstruct_array() since the array data is just
+ * going to look like a C array of N int4 values.
+ */
+ if (ARR_NDIM(blocksarray) != 1 ||
+ ARR_HASNULL(blocksarray) ||
+ ARR_ELEMTYPE(blocksarray) != INT4OID)
+ elog(ERROR, "expected 1 dimensional int4 array");
+
+ /* The callback walks the array data in place, starting at element 0. */
+ stream_data.curblock = 0;
+ stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+ stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+ rel = relation_open(relid, AccessShareLock);
+
+ stream = read_stream_begin_relation(READ_STREAM_FULL,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ read_stream_for_blocks_cb,
+ &stream_data,
+ 0);
+
+ /*
+ * The i-th buffer returned is reported against blocks[i], i.e. this
+ * relies on the stream returning buffers in the order in which the
+ * callback supplied the block numbers.
+ */
+ for (int i = 0; i < stream_data.nblocks; i++)
+ {
+ Buffer buf = read_stream_next_buffer(stream, NULL);
+ Datum values[3] = {0};
+ bool nulls[3] = {0};
+
+ if (!BufferIsValid(buf))
+ elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+ values[0] = Int32GetDatum(i);
+ values[1] = UInt32GetDatum(stream_data.blocks[i]);
+ values[2] = UInt32GetDatum(buf);
+
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+ ReleaseBuffer(buf);
+ }
+
+ /* After nblocks buffers the stream has to report that it is exhausted. */
+ if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+ elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+ stream_data.nblocks);
+
+ read_stream_end(stream);
+
+ /* NoLock: the AccessShareLock taken above is not released here. */
+ relation_close(rel, NoLock);
+
+ return (Datum) 0;
+}
+
+
PG_FUNCTION_INFO_V1(handle_get);
Datum
handle_get(PG_FUNCTION_ARGS)
@@ -889,15 +995,111 @@ batch_end(PG_FUNCTION_ARGS)
}
#ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
- const void *private_data,
- void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+ const void *private_data,
+ void *arg);
extern PGDLLEXPORT void inj_io_reopen(const char *name,
const void *private_data,
void *arg);
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+/*
+ * Decide whether the short-read injection should rewrite the result of the
+ * given IO.
+ *
+ * Returns true only if the injection is enabled and has a result set, the IO
+ * is a vectored read, the IO passes the optional pid and relfilenode filters
+ * (InvalidPid / InvalidOid match everything), and the injected result is
+ * smaller than the IO's actual result.
+ */
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+ PGPROC *io_proc;
+ int32 io_pid;
+ int32 inj_pid;
+ PgAioTargetData *td;
+
+ if (!inj_io_error_state->enabled_short_read)
+ return false;
+
+ if (!inj_io_error_state->short_read_result_set)
+ return false;
+
+ /*
+ * The caller walks ioh->op_data.read and zeroes the iovec, which is only
+ * meaningful for vectored reads, so never match any other operation.
+ */
+ if (ioh->op != PGAIO_OP_READV)
+ return false;
+
+ io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+ io_pid = io_proc->pid;
+ inj_pid = inj_io_error_state->short_read_pid;
+
+ if (inj_pid != InvalidPid && inj_pid != io_pid)
+ return false;
+
+ td = pgaio_io_get_target_data(ioh);
+
+ if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+ td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+ return false;
+
+ /*
+ * Only shorten reads that are actually longer than the target size,
+ * otherwise we can trigger over-reads.
+ */
+ if (inj_io_error_state->short_read_result >= ioh->result)
+ return false;
+
+ return true;
+}
+
+/*
+ * Check whether the completion-wait injection is armed and, if so, whether
+ * the given IO passes its pid, relfilenode and block number filters. A filter
+ * left at its Invalid* value accepts every IO.
+ */
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+ InjIoErrorState *state = inj_io_error_state;
+ PgAioTargetData *target;
+ int32 owner_pid;
+ int32 want_pid;
+ Oid want_relfilenode;
+ BlockNumber want_blockno;
+
+ if (!state->enabled_completion_wait)
+ return false;
+
+ /* Filter on the pid of the process that owns the IO. */
+ owner_pid = GetPGProcByNumber(pgaio_io_get_owner(ioh))->pid;
+ want_pid = state->completion_wait_pid;
+ if (want_pid != InvalidPid && want_pid != owner_pid)
+ return false;
+
+ target = pgaio_io_get_target_data(ioh);
+
+ /* Filter on the relation the IO targets. */
+ want_relfilenode = state->completion_wait_relfilenode;
+ if (want_relfilenode != InvalidOid &&
+ want_relfilenode != target->smgr.rlocator.relNumber)
+ return false;
+
+ /* Filter on the wanted block lying inside the IO's block range. */
+ want_blockno = state->completion_wait_blockno;
+ if (want_blockno != InvalidBlockNumber)
+ {
+ BlockNumber first = target->smgr.blockNum;
+
+ if (want_blockno < first ||
+ want_blockno >= (first + target->smgr.nblocks))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Hold back the completion of a matching IO: sleep on the shared condition
+ * variable for as long as the completion-wait injection still matches.
+ */
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+ PgAioHandle *handle = (PgAioHandle *) arg;
+
+ /* Nothing to set up unless this IO is to be held back. */
+ if (inj_io_completion_wait_matches(handle))
+ {
+ ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+ /* Re-evaluate the match after every wakeup before sleeping again. */
+ while (inj_io_completion_wait_matches(handle))
+ ConditionVariableSleep(&inj_io_error_state->cv,
+ inj_io_error_state->completion_wait_event);
+
+ ConditionVariableCancelSleep();
+ }
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
{
PgAioHandle *ioh = (PgAioHandle *) arg;
@@ -906,58 +1108,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
inj_io_error_state->enabled_reopen),
errhidestmt(true), errhidecontext(true));
- if (inj_io_error_state->enabled_short_read)
+ if (inj_io_short_read_matches(ioh))
{
+ struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+ int32 old_result = ioh->result;
+ int32 new_result = inj_io_error_state->short_read_result;
+ int32 processed = 0;
+
+ ereport(LOG,
+ errmsg("short read inject point, changing result from %d to %d",
+ old_result, new_result),
+ errhidestmt(true), errhidecontext(true));
+
/*
- * Only shorten reads that are actually longer than the target size,
- * otherwise we can trigger over-reads.
+ * The underlying IO actually completed OK, and thus the "invalid"
+ * portion of the IOV actually contains valid data. That can hide a
+ * lot of problems, e.g. if we were to wrongly mark a buffer, that
+ * wasn't read according to the shortened-read, IO as valid, the
+ * contents would look valid and we might miss a bug.
+ *
+ * To avoid that, iterate through the IOV and zero out the "failed"
+ * portion of the IO.
*/
- if (inj_io_error_state->short_read_result_set
- && ioh->op == PGAIO_OP_READV
- && inj_io_error_state->short_read_result <= ioh->result)
+ for (int i = 0; i < ioh->op_data.read.iov_length; i++)
{
- struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
- int32 old_result = ioh->result;
- int32 new_result = inj_io_error_state->short_read_result;
- int32 processed = 0;
-
- ereport(LOG,
- errmsg("short read inject point, changing result from %d to %d",
- old_result, new_result),
- errhidestmt(true), errhidecontext(true));
-
- /*
- * The underlying IO actually completed OK, and thus the "invalid"
- * portion of the IOV actually contains valid data. That can hide
- * a lot of problems, e.g. if we were to wrongly mark a buffer,
- * that wasn't read according to the shortened-read, IO as valid,
- * the contents would look valid and we might miss a bug.
- *
- * To avoid that, iterate through the IOV and zero out the
- * "failed" portion of the IO.
- */
- for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+ if (processed + iov[i].iov_len <= new_result)
+ processed += iov[i].iov_len;
+ else if (processed <= new_result)
{
- if (processed + iov[i].iov_len <= new_result)
- processed += iov[i].iov_len;
- else if (processed <= new_result)
- {
- uint32 ok_part = new_result - processed;
+ uint32 ok_part = new_result - processed;
- memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
- processed += iov[i].iov_len;
- }
- else
- {
- memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
- }
+ memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+ processed += iov[i].iov_len;
+ }
+ else
+ {
+ memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
}
-
- ioh->result = new_result;
}
+
+ ioh->result = new_result;
}
}
+/*
+ * Injection point callback attached to "aio-process-completion-before-shared".
+ * Runs the completion-wait hook first and the short-read hook afterwards, so
+ * a completion can be held back and then still have its result rewritten.
+ */
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+ inj_io_completion_wait_hook(name, private_data, arg);
+ inj_io_short_read_hook(name, private_data, arg);
+}
+
void
inj_io_reopen(const char *name, const void *private_data, void *arg)
{
@@ -971,6 +1171,42 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
}
#endif
+/*
+ * SQL-callable: arm the completion-wait injection. Arguments are an optional
+ * pid, relfilenode and block number; a NULL argument stores the matching
+ * Invalid* value, which disables that filter. Raises an error in builds
+ * without injection point support.
+ */
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+ inj_io_error_state->enabled_completion_wait = true;
+ inj_io_error_state->completion_wait_pid =
+ PG_ARGISNULL(0) ? InvalidPid : PG_GETARG_INT32(0);
+ inj_io_error_state->completion_wait_relfilenode =
+ PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+ inj_io_error_state->completion_wait_blockno =
+ PG_ARGISNULL(2) ? InvalidBlockNumber : PG_GETARG_UINT32(2);
+#else
+ elog(ERROR, "injection points not supported");
+#endif
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable: disarm the completion-wait injection, reset its filters and
+ * wake up everything sleeping on the shared condition variable. Raises an
+ * error in builds without injection point support.
+ */
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+ inj_io_error_state->enabled_completion_wait = false;
+ inj_io_error_state->completion_wait_pid = InvalidPid;
+ inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+ inj_io_error_state->completion_wait_blockno = InvalidBlockNumber;
+ /* the flag is cleared before waking the sleepers, which recheck it */
+ ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+ elog(ERROR, "injection points not supported");
+#endif
+
+ PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
Datum
inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -980,6 +1216,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
if (inj_io_error_state->short_read_result_set)
inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+ inj_io_error_state->short_read_pid =
+ PG_ARGISNULL(1) ? InvalidPid : PG_GETARG_INT32(1);
+ inj_io_error_state->short_read_relfilenode =
+ PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
#else
elog(ERROR, "injection points not supported");
#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 112653c1680..b2c7c9e6f7c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -309,6 +309,7 @@ BlockSampler
BlockSamplerData
BlockedProcData
BlockedProcsData
+BlocksReadStreamData
BlocktableEntry
BloomBuildState
BloomFilter
--
2.53.0.1.gb2826b52eb
--4bovv3espbe3a3ho
Content-Type: text/x-diff; charset=us-ascii
Content-Disposition: attachment;
filename="v9-0003-bufmgr-Improve-StartBufferIO-interface.patch"
view thread (4+ messages)
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH v9 2/4] test_aio: Add read_stream test infrastructure & tests
In-Reply-To: <no-message-id-724865@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox