public inbox for [email protected]
help / color / mirror / Atom feed
From: Andres Freund <[email protected]>
Subject: [PATCH v2 2/3] test_aio: Add read_stream test infrastructure & tests
Date: Wed, 10 Sep 2025 14:00:02 -0400
Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
src/test/modules/test_aio/meson.build | 1 +
.../modules/test_aio/t/004_read_stream.pl | 282 +++++++++++++++
src/test/modules/test_aio/test_aio--1.0.sql | 26 +-
src/test/modules/test_aio/test_aio.c | 342 +++++++++++++++---
src/tools/pgindent/typedefs.list | 1 +
5 files changed, 601 insertions(+), 51 deletions(-)
create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl
diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 044149d02b8..d571d9da00d 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
't/001_aio.pl',
't/002_io_workers.pl',
't/003_initdb.pl',
+ 't/004_read_stream.pl',
],
},
}
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..89cfabbb1d3
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,282 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+# A single cluster is initialized once and then restarted for every
+# io_method under test.
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+$node->append_conf(
+	'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+max_connections=8
+io_method=worker
+));
+
+# Create the extension and the test table once; they survive the restarts
+# below.
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	# Switch the cluster to the io_method being tested.  This must use
+	# $method; a hard-coded value would run every iteration with the same
+	# io_method while reporting results under different names.
+	$node->adjust_conf('postgresql.conf', 'io_method', $method);
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
+
+done_testing();
+
+
+# One-time setup: install the test_aio extension and create the table the
+# read stream tests operate on.
+sub test_setup
+{
+	my ($node) = @_;
+
+	# FILLFACTOR=10 presumably spreads the 10000 rows over many blocks, so
+	# that the block numbers used by the tests exist - TODO confirm.
+	my $setup_sql = qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+);
+
+	$node->safe_psql('postgres', $setup_sql);
+	ok(1, "setup");
+}
+
+
+# Run read streams that request the same block more than once, both
+# directly after evicting the relation and again without evicting it.
+# The tests only verify that the queries complete without error.
+sub test_repeated_blocks
+{
+	my ($io_method, $node) = @_;
+
+	my $evict_sql = qq/
+SELECT evict_rel('largeish');
+/;
+	my $adjacent_repeats_sql = qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/;
+	my $there_and_back_sql = qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);
+/;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	# Preventing larger reads makes testing easier
+	$psql->query_safe(
+		qq/
+SET io_combine_limit = 1;
+/);
+
+	# After evicting the relation, request blocks that repeat back-to-back.
+	$psql->query_safe($evict_sql);
+	$psql->query_safe($adjacent_repeats_sql);
+	ok(1, "$io_method: stream missing the same block repeatedly");
+
+	# Same request again, this time without evicting first.
+	$psql->query_safe($adjacent_repeats_sql);
+	ok(1, "$io_method: stream hitting the same block repeatedly");
+
+	# After evicting again, walk blocks 0..6 and back down to 0, so every
+	# block but 6 is requested a second time later in the same stream.
+	$psql->query_safe($evict_sql);
+	$psql->query_safe($there_and_back_sql);
+	ok(1, "$io_method: stream accessing same block");
+
+	$psql->quit();
+}
+
+
+# Test a read stream in session A encountering buffers whose read IO was
+# started by another session (B).  B's IO is held at the completion
+# injection point until A is observed waiting for it, then released.
+#
+# Three scenarios are covered: B's read succeeds, B's read fails with an
+# injected EIO, and B's single multi-block read covers two of the blocks A
+# asks for.
+sub test_inject_foreign
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads succeeding.
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	# Issue the read without waiting for its result; it is expected to block
+	# at the injection point.
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	# Only release B's IO once A is actually waiting for it.
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	ok(1,
+		qq/$io_method: read stream encounters succeeding IO by another backend/
+	);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads failing.
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_short_read_attach(-errno_from_string('EIO'), pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	# Wait for B's error to actually arrive.  A non-blocking pump here can
+	# return before psql has written the message, which would make the
+	# check below fail spuriously.  The dots in "5..5" are escaped so they
+	# only match literal dots.
+	pump_until(
+		$psql_b->{run}, $psql_b->{timeout},
+		\$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/);
+	like(
+		$psql_b->{stderr},
+		qr/ERROR.*could not read blocks 5\.\.5/,
+		"$io_method: injected error occurred");
+	# Clear the expected error so the next query on B starts with empty
+	# stderr.
+	$psql_b->{stderr} = '';
+	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+
+	ok(1,
+		qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+	###
+	# Test read stream encountering two buffers that are undergoing the same
+	# IO, started by another backend
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>2, nblocks=>3);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+	ok(1, qq/$io_method: read stream encounters two buffer read in one IO/);
+
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
+# Run all read stream tests for one io_method.  $io_method is only used to
+# label test names; the caller is responsible for having configured $node
+# accordingly.
+sub test_io_method
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	test_repeated_blocks($io_method, $node);
+
+  SKIP:
+	{
+		# The environment variable may be unset.  Comparing undef with eq
+		# would raise an "uninitialized" warning, which is fatal in this
+		# file, so default it to the empty string.  The skip count matches
+		# the four tests test_inject_foreign() reports.
+		skip 'Injection points not supported by this build', 4
+		  unless ($ENV{enable_injection_points} // '') eq 'yes';
+		test_inject_foreign($io_method, $node);
+	}
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..da7cc03829a 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -41,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false)
RETURNS pg_catalog.bool STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
/*
* Handle related functions
@@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
/*
* Injection point related functions
*/
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index c55cf6c0aac..1831e535b28 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -20,16 +20,23 @@
#include "access/relation.h"
#include "fmgr.h"
+#include "funcapi.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/checksum.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/rel.h"
+#include "utils/wait_event.h"
PG_MODULE_MAGIC;
@@ -37,13 +44,30 @@ PG_MODULE_MAGIC;
typedef struct InjIoErrorState
{
+ ConditionVariable cv;
+
bool enabled_short_read;
bool enabled_reopen;
+ bool enabled_completion_wait;
+ Oid completion_wait_relfilenode;
+ pid_t completion_wait_pid;
+ uint32 completion_wait_event;
+
bool short_read_result_set;
+ Oid short_read_relfilenode;
+ pid_t short_read_pid;
int short_read_result;
} InjIoErrorState;
+/*
+ * Private state for read_stream_for_blocks_cb(): the block numbers the
+ * stream should return and how far the callback has gotten through them.
+ */
+typedef struct BlocksReadStreamData
+{
+ /* number of entries in blocks */
+ int nblocks;
+ /* index into blocks of the next entry the callback will return */
+ int curblock;
+ /* block numbers to read, in the order they should be returned */
+ uint32 *blocks;
+} BlocksReadStreamData;
+
+
static InjIoErrorState *inj_io_error_state;
/* Shared memory init callbacks */
@@ -85,10 +109,13 @@ test_aio_shmem_startup(void)
inj_io_error_state->enabled_short_read = false;
inj_io_error_state->enabled_reopen = false;
+ ConditionVariableInit(&inj_io_error_state->cv);
+ inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
+
#ifdef USE_INJECTION_POINTS
InjectionPointAttach("aio-process-completion-before-shared",
"test_aio",
- "inj_io_short_read",
+ "inj_io_completion_hook",
NULL,
0);
InjectionPointLoad("aio-process-completion-before-shared");
@@ -378,7 +405,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
if (nblocks <= 0 || nblocks > PG_IOV_MAX)
elog(ERROR, "nblocks is out of range");
- rel = relation_open(relid, AccessExclusiveLock);
+ rel = relation_open(relid, AccessShareLock);
for (int i = 0; i < nblocks; i++)
{
@@ -452,6 +479,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+PG_FUNCTION_INFO_V1(evict_rel);
+
+/*
+ * evict_rel(rel regclass) returns void
+ *
+ * Calls EvictRelUnpinnedBuffers() on the relation while holding
+ * AccessExclusiveLock, which is released again before returning.
+ */
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+	Relation	rel = relation_open(PG_GETARG_OID(0), AccessExclusiveLock);
+	int32		n_evicted;
+	int32		n_flushed;
+	int32		n_skipped;
+
+	/* the counters are filled in by the callee but not reported further */
+	EvictRelUnpinnedBuffers(rel, &n_evicted, &n_flushed, &n_skipped);
+
+	relation_close(rel, AccessExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(invalidate_rel_block);
Datum
invalidate_rel_block(PG_FUNCTION_ARGS)
@@ -604,6 +652,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+
+/*
+ * Read stream callback for read_stream_for_blocks().
+ *
+ * Hands out the next entry of the caller-supplied block list and advances
+ * the position; returns InvalidBlockNumber once the list is exhausted.
+ */
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	BlocksReadStreamData *state = callback_private_data;
+	BlockNumber next = InvalidBlockNumber;
+
+	if (state->curblock < state->nblocks)
+		next = state->blocks[state->curblock++];
+
+	return next;
+}
+
+/*
+ * read_stream_for_blocks(rel regclass, blocks int4[])
+ *
+ * Reads the listed blocks of the relation's main fork through a read stream
+ * and returns one row per requested block: the position in the input array,
+ * the requested block number and the buffer the stream returned for it.
+ *
+ * Raises an error if the input is not a one-dimensional int4 array without
+ * NULLs, if the stream ends before all requested blocks were returned, or
+ * if it returns a buffer after the last requested block.
+ */
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Relation rel;
+ BlocksReadStreamData stream_data;
+ ReadStream *stream;
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ /*
+ * We expect the input to be an N-element int4 array; verify that. We
+ * don't need to use deconstruct_array() since the array data is just
+ * going to look like a C array of N int4 values.
+ */
+ if (ARR_NDIM(blocksarray) != 1 ||
+ ARR_HASNULL(blocksarray) ||
+ ARR_ELEMTYPE(blocksarray) != INT4OID)
+ elog(ERROR, "expected 1 dimensional int4 array");
+
+ /* the callback walks the array data in place, starting at element 0 */
+ stream_data.curblock = 0;
+ stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+ stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+ rel = relation_open(relid, AccessShareLock);
+
+ stream = read_stream_begin_relation(READ_STREAM_FULL,
+ NULL,
+ rel,
+ MAIN_FORKNUM,
+ read_stream_for_blocks_cb,
+ &stream_data,
+ 0);
+
+ /* consume exactly one buffer per requested block, in request order */
+ for (int i = 0; i < stream_data.nblocks; i++)
+ {
+ Buffer buf = read_stream_next_buffer(stream, NULL);
+ Datum values[3] = {0};
+ bool nulls[3] = {0};
+
+ if (!BufferIsValid(buf))
+ elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+ values[0] = Int32GetDatum(i);
+ values[1] = UInt32GetDatum(stream_data.blocks[i]);
+ values[2] = UInt32GetDatum(buf);
+
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+ /* only the buffer number is reported, so the buffer is released right away */
+ ReleaseBuffer(buf);
+ }
+
+ /* after the last requested block the stream must report its end */
+ if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+ elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+ stream_data.nblocks + 1);
+
+ read_stream_end(stream);
+
+ /* NOTE(review): NoLock here presumably keeps the AccessShareLock until transaction end - confirm */
+ relation_close(rel, NoLock);
+
+ return (Datum) 0;
+}
+
+
PG_FUNCTION_INFO_V1(handle_get);
Datum
handle_get(PG_FUNCTION_ARGS)
@@ -674,15 +802,98 @@ batch_end(PG_FUNCTION_ARGS)
}
#ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
- const void *private_data,
- void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+ const void *private_data,
+ void *arg);
extern PGDLLEXPORT void inj_io_reopen(const char *name,
const void *private_data,
void *arg);
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+/*
+ * Decide whether the injected short-read result should be applied to the
+ * IO described by ioh.
+ *
+ * Returns true only if short-read injection is enabled with a result set,
+ * the IO is a vectored read, the IO's owner and relation pass the
+ * configured pid / relfilenode filters (0 / InvalidOid meaning "any"), and
+ * the injected result is smaller than the IO's actual result.
+ */
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_short_read)
+		return false;
+
+	if (!inj_io_error_state->short_read_result_set)
+		return false;
+
+	/*
+	 * The caller goes on to interpret the IO's read-specific op data, so
+	 * never match anything but a vectored read.
+	 */
+	if (ioh->op != PGAIO_OP_READV)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	/* a pid filter of 0 matches IO owned by any backend */
+	if (inj_io_error_state->short_read_pid != 0 &&
+		inj_io_error_state->short_read_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	/* a relfilenode filter of InvalidOid matches IO on any relation */
+	if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+		return false;
+
+	/*
+	 * Only shorten reads that are actually longer than the target size,
+	 * otherwise we can trigger over-reads.
+	 */
+	if (inj_io_error_state->short_read_result >= ioh->result)
+		return false;
+
+	return true;
+}
+
+/*
+ * Decide whether completion of the IO described by ioh should be held up
+ * by the completion-wait injection.
+ *
+ * Returns true if completion waiting is enabled and the IO's owner and
+ * relation pass the configured pid / relfilenode filters.  A pid of 0 or a
+ * relfilenode of InvalidOid means "any"; inj_io_completion_wait() stores
+ * these values when the corresponding argument is NULL.
+ */
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_completion_wait)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	/*
+	 * Treat pid 0 as a wildcard, the same way the short-read filter does.
+	 * Without this, a NULL pid argument would produce a filter that can
+	 * never match.
+	 */
+	if (inj_io_error_state->completion_wait_pid != 0 &&
+		inj_io_error_state->completion_wait_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->completion_wait_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode)
+		return false;
+
+	return true;
+}
+
+/*
+ * If the IO passed in arg matches the completion-wait filter, sleep on the
+ * shared condition variable until it no longer matches.
+ */
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+	PgAioHandle *ioh = (PgAioHandle *) arg;
+
+	if (inj_io_completion_wait_matches(ioh))
+	{
+		ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+		/* re-check the filter after every wakeup */
+		while (inj_io_completion_wait_matches(ioh))
+			ConditionVariableSleep(&inj_io_error_state->cv,
+								   inj_io_error_state->completion_wait_event);
+
+		ConditionVariableCancelSleep();
+	}
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
{
PgAioHandle *ioh = (PgAioHandle *) arg;
@@ -691,58 +902,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
inj_io_error_state->enabled_reopen),
errhidestmt(true), errhidecontext(true));
- if (inj_io_error_state->enabled_short_read)
+ if (inj_io_short_read_matches(ioh))
{
+ struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+ int32 old_result = ioh->result;
+ int32 new_result = inj_io_error_state->short_read_result;
+ int32 processed = 0;
+
+ ereport(LOG,
+ errmsg("short read inject point, changing result from %d to %d",
+ old_result, new_result),
+ errhidestmt(true), errhidecontext(true));
+
/*
- * Only shorten reads that are actually longer than the target size,
- * otherwise we can trigger over-reads.
+ * The underlying IO actually completed OK, and thus the "invalid"
+ * portion of the IOV actually contains valid data. That can hide a
+ * lot of problems, e.g. if we were to wrongly mark a buffer, that
+ * wasn't read according to the shortened-read, IO as valid, the
+ * contents would look valid and we might miss a bug.
+ *
+ * To avoid that, iterate through the IOV and zero out the "failed"
+ * portion of the IO.
*/
- if (inj_io_error_state->short_read_result_set
- && ioh->op == PGAIO_OP_READV
- && inj_io_error_state->short_read_result <= ioh->result)
+ for (int i = 0; i < ioh->op_data.read.iov_length; i++)
{
- struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
- int32 old_result = ioh->result;
- int32 new_result = inj_io_error_state->short_read_result;
- int32 processed = 0;
-
- ereport(LOG,
- errmsg("short read inject point, changing result from %d to %d",
- old_result, new_result),
- errhidestmt(true), errhidecontext(true));
-
- /*
- * The underlying IO actually completed OK, and thus the "invalid"
- * portion of the IOV actually contains valid data. That can hide
- * a lot of problems, e.g. if we were to wrongly mark a buffer,
- * that wasn't read according to the shortened-read, IO as valid,
- * the contents would look valid and we might miss a bug.
- *
- * To avoid that, iterate through the IOV and zero out the
- * "failed" portion of the IO.
- */
- for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+ if (processed + iov[i].iov_len <= new_result)
+ processed += iov[i].iov_len;
+ else if (processed <= new_result)
{
- if (processed + iov[i].iov_len <= new_result)
- processed += iov[i].iov_len;
- else if (processed <= new_result)
- {
- uint32 ok_part = new_result - processed;
+ uint32 ok_part = new_result - processed;
- memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
- processed += iov[i].iov_len;
- }
- else
- {
- memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
- }
+ memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+ processed += iov[i].iov_len;
+ }
+ else
+ {
+ memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
}
-
- ioh->result = new_result;
}
+
+ ioh->result = new_result;
}
}
+/*
+ * Injection point callback attached to
+ * "aio-process-completion-before-shared".
+ *
+ * Runs the completion-wait hook first and the short-read hook second, so a
+ * held-up IO only has its result rewritten after it has been released.
+ */
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+ inj_io_completion_wait_hook(name, private_data, arg);
+ inj_io_short_read_hook(name, private_data, arg);
+}
+
void
inj_io_reopen(const char *name, const void *private_data, void *arg)
{
@@ -756,6 +965,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
}
#endif
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+
+/*
+ * inj_io_completion_wait(pid int, relfilenode oid) returns void
+ *
+ * Enables the completion-wait injection and records the pid / relfilenode
+ * filter in shared state.  A NULL pid is stored as 0, a NULL relfilenode
+ * as InvalidOid.  Raises an error in builds without injection points.
+ */
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	pid_t		filter_pid = 0;
+	Oid			filter_relfilenode = InvalidOid;
+
+	if (!PG_ARGISNULL(0))
+		filter_pid = PG_GETARG_INT32(0);
+	if (!PG_ARGISNULL(1))
+		filter_relfilenode = PG_GETARG_OID(1);
+
+	inj_io_error_state->enabled_completion_wait = true;
+	inj_io_error_state->completion_wait_pid = filter_pid;
+	inj_io_error_state->completion_wait_relfilenode = filter_relfilenode;
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * inj_io_completion_continue() returns void
+ *
+ * Disables the completion-wait injection, resets its filters and wakes up
+ * everything sleeping on the shared condition variable.  Raises an error in
+ * builds without injection points.
+ */
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+ /* clear the state first so woken waiters see that they no longer match */
+ inj_io_error_state->enabled_completion_wait = false;
+ inj_io_error_state->completion_wait_pid = 0;
+ inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+ ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+ elog(ERROR, "injection points not supported");
+#endif
+
+ PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
Datum
inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -765,6 +1007,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
if (inj_io_error_state->short_read_result_set)
inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+ inj_io_error_state->short_read_pid =
+ PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1);
+ inj_io_error_state->short_read_relfilenode =
+ PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2);
#else
elog(ERROR, "injection points not supported");
#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a13e8162890..7c99bec3e2e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -293,6 +293,7 @@ BlockSampler
BlockSamplerData
BlockedProcData
BlockedProcsData
+BlocksReadStreamData
BlocktableEntry
BloomBuildState
BloomFilter
--
2.48.1.76.g4e746b1a31.dirty
--aknbaqiolq3mrqjk
Content-Type: text/x-diff; charset=us-ascii
Content-Disposition: attachment;
filename="v2-0003-bufmgr-aio-Prototype-for-not-waiting-for-already-.patch"
view thread (4+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH v2 2/3] test_aio: Add read_stream test infrastructure & tests
In-Reply-To: <no-message-id-617317@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox