public inbox for [email protected]
help / color / mirror / Atom feedFrom: Antonin Houska <[email protected]>
Subject: [PATCH] Allow progress tracking of sub-commands.
Date: Mon, 4 May 2026 11:32:21 +0200
Some commands that support progress reporting run sub-commands, which also
report their progress. The typical case is that REPACK builds indexes. Instead
of disabling the progress tracking of the sub-commands, we can allow both the
"parent" command and the sub-command to report their progress at the same
time.
---
contrib/file_fdw/file_fdw.c | 44 +++++++++++
src/backend/commands/indexcmds.c | 10 ++-
src/backend/commands/repack.c | 14 +++-
src/backend/utils/activity/backend_progress.c | 76 ++++++++++++++++---
src/backend/utils/activity/backend_status.c | 1 +
src/backend/utils/adt/pgstatfuncs.c | 9 ++-
src/include/utils/backend_status.h | 7 ++
7 files changed, 142 insertions(+), 19 deletions(-)
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 33a37d832ce..a60fb226320 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -36,6 +36,7 @@
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
+#include "utils/backend_status.h"
#include "utils/acl.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@@ -119,6 +120,16 @@ typedef struct FileFdwExecutionState
CopyFromState cstate; /* COPY execution state */
} FileFdwExecutionState;
+/*
+ * Since progress tracking of multiple COPY commands is not supported, the
+ * first file_fdw node of the plan needs to set pgstat_track_activities to
+ * false during startup, and the last active node needs to restore the
+ * original value during shutdown.
+ */
+static bool save_pgstat_track_activities = false;
+static int fdw_nodes = 0;
+static int active_fdw_nodes = 0;
+
/*
* SQL functions
*/
@@ -616,6 +627,12 @@ fileGetForeignPlan(PlannerInfo *root,
{
Index scan_relid = baserel->relid;
+ /*
+ * This seems to be the appropriate place to count file_fdw nodes in the
+ * plan.
+ */
+ fdw_nodes++;
+
/*
* We have no native ability to evaluate restriction clauses, so we just
* put all the scan_clauses into the plan node's qual list for the
@@ -695,6 +712,18 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
/* Add any options from the plan (currently only convert_selectively) */
options = list_concat(options, plan->fdw_private);
+ /*
+ * Save the value of pgstat_track_activities if this is the first file_fdw
+ * node of a plan containing multiple file_fdw nodes, and disable the
+ * progress tracking. The monitoring infrastructure currently does not
+ * support monitoring of multiple COPY commands.
+ */
+ if (fdw_nodes > 1 && active_fdw_nodes++ == 0)
+ {
+ save_pgstat_track_activities = pgstat_track_activities;
+ pgstat_track_activities = false;
+ }
+
/*
* Create CopyState from FDW options. We always acquire all columns, so
* as to match the expected ScanTupleSlot signature.
@@ -861,6 +890,21 @@ fileEndForeignScan(ForeignScanState *node)
festate->cstate->num_errors));
EndCopyFrom(festate->cstate);
+
+
+ /*
+ * Restore the value of pgstat_track_activities if this is the last
+ * file_fdw node of a plan containing multiple file_fdw nodes, and enable
+ * progress tracking if we disabled it earlier.
+ */
+ if (active_fdw_nodes > 0)
+ {
+ if (--active_fdw_nodes == 0)
+ {
+ pgstat_track_activities = save_pgstat_track_activities;
+ fdw_nodes = 0;
+ }
+ }
}
/*
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 9ab74c8df0a..7280e64d118 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -3918,6 +3918,12 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
* more detailed comments.
*/
+ /*
+ * XXX Is there a reason not to start progress reporting here? If it's ok,
+ * then INDEX_CREATE_SUPPRESS_PROGRESS below is probably not needed.
+ */
+ pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, relationOid);
+
foreach(lc, indexIds)
{
char *concurrentName;
@@ -3966,8 +3972,6 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
elog(ERROR, "cannot reindex a temporary table concurrently");
- pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, idx->tableId);
-
progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
progress_vals[1] = 0; /* initializing */
progress_vals[2] = idx->indexId;
@@ -4144,7 +4148,6 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
* Update progress for the index to build, with the correct parent
* table involved.
*/
- pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
progress_vals[1] = PROGRESS_CREATEIDX_PHASE_BUILD;
progress_vals[2] = newidx->indexId;
@@ -4208,7 +4211,6 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
* Update progress for the index to build, with the correct parent
* table involved.
*/
- pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, newidx->tableId);
progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
progress_vals[1] = PROGRESS_CREATEIDX_PHASE_VALIDATE_IDXSCAN;
progress_vals[2] = newidx->indexId;
diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c
index 9d162957bc3..f360f37a9da 100644
--- a/src/backend/commands/repack.c
+++ b/src/backend/commands/repack.c
@@ -1937,6 +1937,7 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
PROGRESS_REPACK_PHASE_REBUILD_INDEX);
+ reindex_params.options |= REINDEXOPT_REPORT_PROGRESS;
reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params);
}
@@ -3204,9 +3205,20 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
"repacknew",
get_rel_namespace(ind->rd_index->indrelid),
false);
- newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS,
+
+ /*
+ * We build the index on the new heap, but after the swap phase it'll
+ * become an index on the old heap. It makes more sense to report the
+ * progress this way. (The reporting API expects that both command and
+ * subcommand deal with the same target.)
+ */
+ pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
+ RelationGetRelid(OldHeap));
+ newindex = index_create_copy(NewHeap, 0,
oldindex, ind->rd_rel->reltablespace,
newName);
+ pgstat_progress_end_command();
+
copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap));
result = lappend_oid(result, newindex);
diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c
index b0359771de5..97965a6973c 100644
--- a/src/backend/utils/activity/backend_progress.c
+++ b/src/backend/utils/activity/backend_progress.c
@@ -22,6 +22,10 @@
*
* Set st_progress_command (and st_progress_command_target) in own backend
* entry. Also, zero-initialize st_progress_param array.
+ *
+ * If command has already been started, start a sub-command. Only parameters
+ * of the sub-command are updated until pgstat_progress_end_command() is
+ * called. (Target relation must be the same for both commands.)
*-----------
*/
void
@@ -33,9 +37,30 @@ pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
return;
PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
- beentry->st_progress_command = cmdtype;
- beentry->st_progress_command_target = relid;
- MemSet(&beentry->st_progress_param, 0, sizeof(beentry->st_progress_param));
+ /* Sub-command should not be started w/o parent command. */
+ if (beentry->st_progress_command == PROGRESS_COMMAND_INVALID)
+ {
+ Assert(beentry->st_progress_command2 == PROGRESS_COMMAND_INVALID);
+
+ beentry->st_progress_command = cmdtype;
+ beentry->st_progress_command_target = relid;
+ MemSet(&beentry->st_progress_param, 0,
+ sizeof(beentry->st_progress_param));
+ }
+ else if (beentry->st_progress_command2 == PROGRESS_COMMAND_INVALID)
+ {
+ Assert(beentry->st_progress_command != PROGRESS_COMMAND_INVALID);
+ Assert(beentry->st_progress_command_target == relid);
+
+ beentry->st_progress_command2 = cmdtype;
+ MemSet(&beentry->st_progress_param2, 0,
+ sizeof(beentry->st_progress_param2));
+ }
+ else
+ {
+ /* Only one level of nesting is supported. */
+ Assert(false);
+ }
PGSTAT_END_WRITE_ACTIVITY(beentry);
}
@@ -49,14 +74,20 @@ void
pgstat_progress_update_param(int index, int64 val)
{
volatile PgBackendStatus *beentry = MyBEEntry;
+ volatile int64 *params;
Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM);
+ Assert(beentry->st_progress_command != PROGRESS_COMMAND_INVALID ||
+ beentry->st_progress_command2 != PROGRESS_COMMAND_INVALID);
if (!beentry || !pgstat_track_activities)
return;
+ params = (beentry->st_progress_command2 == PROGRESS_COMMAND_INVALID) ?
+ beentry->st_progress_param : beentry->st_progress_param2;
+
PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
- beentry->st_progress_param[index] = val;
+ params[index] = val;
PGSTAT_END_WRITE_ACTIVITY(beentry);
}
@@ -70,14 +101,20 @@ void
pgstat_progress_incr_param(int index, int64 incr)
{
volatile PgBackendStatus *beentry = MyBEEntry;
+ volatile int64 *params;
Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM);
+ Assert(beentry->st_progress_command != PROGRESS_COMMAND_INVALID ||
+ beentry->st_progress_command2 != PROGRESS_COMMAND_INVALID);
if (!beentry || !pgstat_track_activities)
return;
+ params = (beentry->st_progress_command2 == PROGRESS_COMMAND_INVALID) ?
+ beentry->st_progress_param : beentry->st_progress_param2;
+
PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
- beentry->st_progress_param[index] += incr;
+ params[index] += incr;
PGSTAT_END_WRITE_ACTIVITY(beentry);
}
@@ -124,17 +161,24 @@ pgstat_progress_update_multi_param(int nparam, const int *index,
{
volatile PgBackendStatus *beentry = MyBEEntry;
int i;
+ volatile int64 *params;
if (!beentry || !pgstat_track_activities || nparam == 0)
return;
+ Assert(beentry->st_progress_command != PROGRESS_COMMAND_INVALID ||
+ beentry->st_progress_command2 != PROGRESS_COMMAND_INVALID);
+
+ params = (beentry->st_progress_command2 == PROGRESS_COMMAND_INVALID) ?
+ beentry->st_progress_param : beentry->st_progress_param2;
+
PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
for (i = 0; i < nparam; ++i)
{
Assert(index[i] >= 0 && index[i] < PGSTAT_NUM_PROGRESS_PARAM);
- beentry->st_progress_param[index[i]] = val[i];
+ params[index[i]] = val[i];
}
PGSTAT_END_WRITE_ACTIVITY(beentry);
@@ -144,7 +188,7 @@ pgstat_progress_update_multi_param(int nparam, const int *index,
* pgstat_progress_end_command() -
*
* Reset st_progress_command (and st_progress_command_target) in own backend
- * entry. This signals the end of the command.
+ * entry. This signals the end of the command (or a sub-command).
*-----------
*/
void
@@ -155,11 +199,19 @@ pgstat_progress_end_command(void)
if (!beentry || !pgstat_track_activities)
return;
- if (beentry->st_progress_command == PROGRESS_COMMAND_INVALID)
- return;
-
PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
- beentry->st_progress_command = PROGRESS_COMMAND_INVALID;
- beentry->st_progress_command_target = InvalidOid;
+
+ if (beentry->st_progress_command2 != PROGRESS_COMMAND_INVALID)
+ {
+ Assert(beentry->st_progress_command != PROGRESS_COMMAND_INVALID);
+
+ beentry->st_progress_command2 = PROGRESS_COMMAND_INVALID;
+ }
+ else
+ {
+ beentry->st_progress_command = PROGRESS_COMMAND_INVALID;
+ beentry->st_progress_command_target = InvalidOid;
+ }
+
PGSTAT_END_WRITE_ACTIVITY(beentry);
}
diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c
index d685fc5cd87..15675992415 100644
--- a/src/backend/utils/activity/backend_status.c
+++ b/src/backend/utils/activity/backend_status.c
@@ -284,6 +284,7 @@ pgstat_bestart_initial(void)
lbeentry.st_state = STATE_STARTING;
lbeentry.st_progress_command = PROGRESS_COMMAND_INVALID;
+ lbeentry.st_progress_command2 = PROGRESS_COMMAND_INVALID;
lbeentry.st_progress_command_target = InvalidOid;
lbeentry.st_query_id = INT64CONST(0);
lbeentry.st_plan_id = INT64CONST(0);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 7a9dfa9ba3b..c2257d900af 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -314,6 +314,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
Datum values[PG_STAT_GET_PROGRESS_COLS] = {0};
bool nulls[PG_STAT_GET_PROGRESS_COLS] = {0};
int i;
+ volatile int64 *params;
local_beentry = pgstat_get_local_beentry_by_index(curr_backend);
beentry = &local_beentry->backendStatus;
@@ -322,7 +323,11 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
* Report values for only those backends which are running the given
* command.
*/
- if (beentry->st_progress_command != cmdtype)
+ if (beentry->st_progress_command == cmdtype)
+ params = beentry->st_progress_param;
+ else if (beentry->st_progress_command2 == cmdtype)
+ params = beentry->st_progress_param2;
+ else
continue;
/* Value available to all callers */
@@ -334,7 +339,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
{
values[2] = ObjectIdGetDatum(beentry->st_progress_command_target);
for (i = 0; i < PGSTAT_NUM_PROGRESS_PARAM; i++)
- values[i + 3] = Int64GetDatum(beentry->st_progress_param[i]);
+ values[i + 3] = Int64GetDatum(params[i]);
}
else
{
diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h
index a334e096e4a..f528f7abeec 100644
--- a/src/include/utils/backend_status.h
+++ b/src/include/utils/backend_status.h
@@ -169,6 +169,13 @@ typedef struct PgBackendStatus
Oid st_progress_command_target;
int64 st_progress_param[PGSTAT_NUM_PROGRESS_PARAM];
+ /*
+ * Some commands have a sub-command, e.g. REPACK (re)builds indexes. The
+ * subcommands are supposed to have the same target.
+ */
+ ProgressCommandType st_progress_command2;
+ int64 st_progress_param2[PGSTAT_NUM_PROGRESS_PARAM];
+
/* query identifier, optionally computed using post_parse_analyze_hook */
int64 st_query_id;
--
2.47.3
--=-=-=--
view thread (276+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH] Allow progress tracking of sub-commands.
In-Reply-To: <no-message-id-272666@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox