From: Sami Imseih <[email protected]>
To: Lukas Fittl <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Improve pg_stat_statements scalability
Date: Fri, 29 May 2026 21:15:31 -0500
Message-ID: <CAA5RZ0uoxiQ2_=xHGRnyc4WdM9aR0fzdMhBubnw97po==--yGQ@mail.gmail.com>
In-Reply-To: <CAA5RZ0sQ+gDn-J85j1FzOdL1YjVYRegpmQpDiah1=REWZSZj+Q@mail.gmail.com>
References: <CAA5RZ0vZwR_dSK6fo0P2-EnskUVN0NjLHnGnJMFDPC8-kEW3sQ@mail.gmail.com>
<CAP53Pky9CViOfy1yQxJv0VE7aEiuDb13SppwGa7T_qOeBSumfA@mail.gmail.com>
<CAP53PkyWD99bNwFVv2KibfL4bhm-D78QAnc9UqQQJdtJ3mQWPQ@mail.gmail.com>
<CAA5RZ0sQ+gDn-J85j1FzOdL1YjVYRegpmQpDiah1=REWZSZj+Q@mail.gmail.com>
>> I've created a new wiki page combining the prior 2025 discussion, and
>> notes from today:
>>
>> https://wiki.postgresql.org/wiki/Scalability_of_pg_stat_statements
>Thank you, Lukas!
Thanks for the feedback on v2, and for the productive unconference
discussion at PGConf.dev. I've been iterating on the patches and
benchmarks, and wanted to post an updated series that addresses the
major design points raised.
Addressing the feedback from the unconference session:
1/ Eviction
The concern with the design presented, in which entries are skipped
until a single-threaded eviction completes, was that on a steady
workload with max=5000, the 5001st unique query would always be evicted
immediately. This was considered unacceptable, and rightly so. It may
be fine for high-churn, pathological cases, but not for slow-churn
cases.
Andres mentioned that eviction should occur in parallel, which I
understood to mean that multiple backends should each evict a subset of
the entries in the dshash concurrently. To do this, I implemented a
parallel clock-sweep in which eviction sweeps a partition, decrementing
refcounts, and only entries that have decayed to zero are evicted. A
genuinely new query that just arrived has a fresh refcount and survives
the sweep, and if it becomes popular in the meantime, it will survive
for longer. So an entry has to keep proving itself "hot" to stay;
otherwise it is swept.
There is 5% headroom, based on the existing USAGE_DEALLOC_PERCENT: we
evict until the entry count (across all partitions) drops to 95% of
max, so there's always room for new arrivals before the next sweep.
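To make the sweep concrete, here is a minimal single-threaded sketch in
C. It is not the patch code: every Sketch* name, the partition and slot
counts, and the exact ordering (an entry found at zero is evicted,
anything else is decremented) are assumptions made for illustration.
Only the refcount cap of 10 and the 5% headroom are taken from this
mail.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_NPARTITIONS   4    /* hypothetical partition count */
#define SKETCH_PART_SLOTS    8    /* hypothetical slots per partition */
#define SKETCH_REFCOUNT_MAX  10   /* refcount cap described for 0004 */
#define SKETCH_DEALLOC_PCT   5    /* mirrors USAGE_DEALLOC_PERCENT */

typedef struct SketchEntry
{
    bool        used;
    int         refcount;
} SketchEntry;

typedef struct SketchTable
{
    SketchEntry slots[SKETCH_NPARTITIONS][SKETCH_PART_SLOTS];
    int         count;      /* live entries across all partitions */
    int         max;        /* stands in for pg_stat_statements.max */
    unsigned    hand;       /* next partition to sweep */
} SketchTable;

/* Place an entry in a free slot with the given starting refcount. */
static void
sketch_insert(SketchTable *t, unsigned part, int slot, int refcount)
{
    assert(!t->slots[part][slot].used);
    t->slots[part][slot].used = true;
    t->slots[part][slot].refcount = refcount;
    t->count++;
}

/* An access tops up the refcount, saturating at the cap. */
static void
sketch_touch(SketchEntry *e)
{
    if (e->refcount < SKETCH_REFCOUNT_MAX)
        e->refcount++;
}

/* Sweep one partition: evict entries already at zero, decay the rest. */
static int
sketch_sweep_partition(SketchTable *t, unsigned part)
{
    int         evicted = 0;

    for (int i = 0; i < SKETCH_PART_SLOTS; i++)
    {
        SketchEntry *e = &t->slots[part][i];

        if (!e->used)
            continue;
        if (e->refcount > 0)
            e->refcount--;
        else
        {
            e->used = false;
            t->count--;
            evicted++;
        }
    }
    return evicted;
}

/* Advance the hand until the table is back at 95% of max. */
static int
sketch_evict(SketchTable *t)
{
    int         target = t->max * (100 - SKETCH_DEALLOC_PCT) / 100;
    int         evicted = 0;

    while (t->count > target)
    {
        evicted += sketch_sweep_partition(t, t->hand);
        t->hand = (t->hand + 1) % SKETCH_NPARTITIONS;
    }
    return evicted;
}
```

In this sketch, an entry inserted with refcount 1 survives the first
pass over its partition and is only evicted on a later pass if nothing
called sketch_touch() on it in between.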
I chose not to use a background worker for eviction, as discussed
earlier, based on the consensus that backpressure is important, and a
background worker, being asynchronous in nature, will not provide that.
It also gets complicated in cases where a background worker cannot be
started for whatever reason.
However, to implement parallel eviction as described above, I needed 2
core changes:
- pgstat_drop_entry must be able to optionally tolerate a dropped
entry, as there could be a delay between the time an entry is marked
as dropped and garbage collection, and within that time multiple
evictions may attempt to drop the same entry.
- Implement a seq scan API for dshash that scans a specific partition.
This will allow eviction to cycle through partitions, i.e.
clock-sweep.
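The first of these two changes can be sketched as follows. This is a
simplified model, not the actual pgstat_drop_entry() code: the struct,
the enum and the function name are hypothetical, locking is omitted,
and the flag is called skip_dropped only because that is the name used
in the 0002 description further down.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for a shared stats entry. */
typedef struct SketchStatsEntry
{
    bool        dropped;    /* marked dropped, not yet garbage collected */
} SketchStatsEntry;

typedef enum SketchDropResult
{
    SKETCH_DROP_OK,         /* this caller marked the entry dropped */
    SKETCH_DROP_SKIPPED,    /* already dropped, tolerated by the caller */
    SKETCH_DROP_ERROR       /* already dropped, caller did not expect it */
} SketchDropResult;

/*
 * Mark an entry as dropped.  With skip_dropped set, a second drop of the
 * same entry before garbage collection is reported as a no-op instead of
 * an error.
 */
static SketchDropResult
sketch_drop_entry(SketchStatsEntry *entry, bool skip_dropped)
{
    if (entry->dropped)
        return skip_dropped ? SKETCH_DROP_SKIPPED : SKETCH_DROP_ERROR;

    entry->dropped = true;
    return SKETCH_DROP_OK;
}
```

Two sweeps that race to the same entry would then see SKETCH_DROP_OK
and SKETCH_DROP_SKIPPED respectively, and both can carry on.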
Eviction does not attempt to make room in the bucket for the entry
that triggered it. The main point is to keep making forward progress
on freeing space; we don't need to be stricter than that here. Also,
the fact that pg_stat_statements.max is dynamic means a user can
increase this value to manage high churn without a restart.
The dealloc counter in pg_stat_statements_info now counts individual
entries evicted rather than the number of times eviction was invoked.
I think this is more useful, but it does change the semantics between
versions. With the new design, a single eviction pass can remove many
entries across a partition, so counting eviction passes no longer
tells you much. Open to other thoughts on this.
Profiling revealed that pgstat_request_entry_refs_gc(), which is the
standard pattern for less frequently removed entries in other places,
was too expensive when called on every entry drop or every partition
sweep. Now it is only called once per full rotation across all
partitions, which showed much better results in benchmarks.
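One way to picture "once per full rotation" is an atomic hand shared by
all sweepers, with the GC request issued only by the backend whose
claim closes a rotation. The sketch below uses C11 atomics and is only
an illustration: the Sketch* names and the partition count are
assumptions, and a plain counter stands in for the call to
pgstat_request_entry_refs_gc().

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical count; a power of two keeps the modulo stable on wrap. */
#define SKETCH_NPARTITIONS 8

typedef struct SketchSweepState
{
    atomic_uint hand;           /* monotonically increasing sweep ticket */
    atomic_uint gc_requests;    /* stand-in for requesting entry-ref GC */
} SketchSweepState;

static void
sketch_sweep_init(SketchSweepState *s)
{
    atomic_init(&s->hand, 0);
    atomic_init(&s->gc_requests, 0);
}

/*
 * Claim the next partition.  Each caller gets a distinct ticket, so
 * concurrent sweepers work on different partitions.  Only the caller
 * that claims the last partition of a rotation requests GC.
 */
static unsigned
sketch_sweep_once(SketchSweepState *s)
{
    unsigned    ticket = atomic_fetch_add(&s->hand, 1);
    unsigned    part = ticket % SKETCH_NPARTITIONS;

    /* ... a real sweep would scan partition "part" here ... */

    if (part == SKETCH_NPARTITIONS - 1)
        atomic_fetch_add(&s->gc_requests, 1);

    return part;
}
```

With eight partitions, 24 calls to sketch_sweep_once() produce three GC
requests, rather than 24 if the request were made on every sweep.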
2/ DSA only query text storage
To avoid the "performance cliff" that Lukas mentioned above when we
switch between DSA and disk for query text storage, the consensus is
to just store all query text in memory. Andres made the point that even
now, the transient memory used to load the query text for
garbage collection or for reading the query stats in
pg_stat_statements means that a user's machine must have enough memory
to handle this. So why not just keep all query text in memory? The
memory is capped by a new GUC, pg_stat_statements.query_text_memory
(default of 4MB, but up for discussion).
It is also possible for .max to allow more entries than
.query_text_memory can hold text for, so empty query text columns are
possible. If a user observes empty query columns and increases
.query_text_memory, the next time an entry is touched, it will
backfill the query text and normalize the string if it can. That last
part could be improved, and actually guaranteed, if we make
JumbleState available to all hooks (I did not work on this part, but
I'm open to discussion).
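The budget-and-backfill behavior can be modeled roughly like this. It
is a sketch under loud assumptions: malloc() stands in for DSA
allocation, the Sketch* types and functions are hypothetical, and
normalization of the query string is left out entirely.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the DSA area holding query texts. */
typedef struct SketchTextArea
{
    size_t      used;       /* bytes handed out so far */
    size_t      limit;      /* stands in for .query_text_memory */
} SketchTextArea;

typedef struct SketchStmt
{
    char       *query;      /* NULL when no text could be stored */
    long        calls;
} SketchStmt;

/* Copy the text into the area, or return NULL if the budget is spent. */
static char *
sketch_text_store(SketchTextArea *area, const char *text)
{
    size_t      need = strlen(text) + 1;
    char       *copy;

    if (area->used > area->limit || need > area->limit - area->used)
        return NULL;

    copy = malloc(need);
    if (copy == NULL)
        return NULL;

    memcpy(copy, text, need);
    area->used += need;
    return copy;
}

/*
 * Count an execution.  The entry is tracked even without text; if the
 * text is missing, try again now in case space has become available.
 */
static void
sketch_record_execution(SketchTextArea *area, SketchStmt *stmt,
                        const char *text)
{
    stmt->calls++;
    if (stmt->query == NULL)
        stmt->query = sketch_text_store(area, text);
}
```

An entry created while the area is full keeps counting calls with a
NULL query, and picks up its text on the first execution after the
limit has been raised.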
Some other comments from Lukas's earlier review:
> It appears you've moved the equivalent of the "if (!entry)" check
> into the pgss_store_query_text function, and we now unconditionally
> call generate_normalized_query.
Fixed.
v3 series is now 5 patches:
0001: pgstat: Introduce pg_stat_report_anytime()
Nothing changes from v2.
0002: pgstat: tolerate already-dropped entries in pgstat_drop_entry()
Required for the parallel eviction design. With multiple backends
sweeping different partitions concurrently, the same entry can be
targeted for drop more than once before garbage collection runs.
A skip_dropped flag makes this safe rather than throwing ERROR.
0003: dshash: add partition-scoped sequential scan
Adds dshash_seq_init_partition() to restrict a scan to a single
partition. This is the building block for per-partition clock-sweep;
a backend only locks and sweeps one partition at a time.
0004: pg_stat_statements: modernize entry storage with pgstat kind
The main patch. Replaces ShmemInitHash with dshash via DSM registry,
and replaces per-entry spinlock counter updates with a custom pgstat
kind that uses the core pgstat infrastructure.
Eviction changes from qsort-all-entries to clock-sweep with an atomic
rotating hand. Each entry carries a refcount (capped at 10) that
decays on sweep; entries reaching zero are evicted. Hot queries keep
their refcount topped up proportionally to access frequency.
pg_stat_statements.max becomes PGC_SIGHUP.
0005: pg_stat_statements: store query text in DSA instead of file
Moves query text from pgss_query_texts.stat into a DSA area via
GetNamedDSA. Adds pg_stat_statements.query_text_memory (PGC_SIGHUP,
default 4MB) controlling DSA size. Eliminates the GC machinery
entirely. When DSA is exhausted, entries are still tracked but query
text is stored as NULL. A backfill mechanism recovers text on
subsequent executions once space becomes available.
Benchmark:
Attached are the benchmark scripts I used for v3 (and will keep
using going forward), with the results in the benchmark_v3.txt file
comparing patch vs upstream.
The benchmark runs four workloads: "high churn", "light churn",
"multi stmt", and simple "select1". I am also tracking query retention
(hot/cold entry retention) to verify that the clock-sweep behaves as
expected.
The select1 result shows a 1.0% regression. This workload has no
contention, so it purely measures pgstat infrastructure overhead;
perf profiling shows pgstat_get_entry_ref() at the top. However, on
machines with higher core count the upstream spinlock on the counters
becomes a bottleneck, which is where the dshash design should win
back this overhead and maybe more. I still plan to benchmark this on
a larger machine.
You will notice that cold_calls in the patched churn case are much
lower (805 vs 4,458). This is because entries get evicted sooner
under per-partition sweep. Hot and cold query retention lines up
with current upstream (1000/1000 hot entries survive continuous churn).
The deallocs count is much higher in the patch (11.9M vs 38.5K in
high churn). Per-partition sweep fires frequently to keep the table
at target capacity, whereas upstream batches fewer, larger deallocations.
We can maybe look into reducing USAGE_DEALLOC_PERCENT to
increase retention of "colder" entries.
We also see much less LWLock contention in the patched churn case.
The top wait is PgStatsDSA (502 total) vs pg_stat_statements (7,757)
in upstream, a 15x reduction.
Looking forward to your feedback!
--
Sami Imseih
Amazon Web Services (AWS)
Machine:
CPUs: 16 (8 cores, 2 threads/core, 1 socket)
CPU model: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz
RAM: 29Gi
Build configurations:
release: buildtype=release, cassert=false, ndebug=true
Benchmark:
For each test: pg_stat_statements_reset(), then pgbench (see per-test command below).
While pgbench is running:
- sample pg_stat_activity wait events (WHERE state = 'active') every 1s
- query pg_stat_statements (entries, hot/cold calls) every 20s
hot_calls = sum(calls) FILTER (WHERE query LIKE '%hot%')
cold_calls = sum(calls) FILTER (WHERE query NOT LIKE '%hot%' AND query IS NOT NULL)
=== churn (release, simple) ===
pgbench -f bench_churn.sql -c 64 -j 16 -T 300 -M simple
+---------------+-------------------------+----------------------------------+----------+
|               | patch                   | upstream                         | delta    |
+---------------+-------------------------+----------------------------------+----------+
| TPS           | 206959                  | 165830                           | +24.8%   |
| entries       | 4968                    | 4947                             |          |
| hot_ent(avg)  | 1000                    | 1000                             |          |
| cold_ent(avg) | 3878                    | 3883                             |          |
| hot_cal(avg)  | 25601611                | 19872536                         |          |
| cold_cal(avg) | 805                     | 4458                             |          |
| deallocs      | 11935433                | 38501                            |          |
| top_wait      | LWLock:PgStatsDSA (502) | LWLock:pg_stat_statements (7757) |          |
+---------------+-------------------------+----------------------------------+----------+
=== light_churn (release, simple) ===
pgbench -f bench_light_churn.sql -c 64 -j 16 -T 300 -M simple
+---------------+--------------+---------------------------------+----------+
|               | patch        | upstream                        | delta    |
+---------------+--------------+---------------------------------+----------+
| TPS           | 242167       | 238139                          | +1.6%    |
| entries       | 4992         | 4839                            |          |
| hot_ent(avg)  | 1000         | 1000                            |          |
| cold_ent(avg) | 3988         | 3900                            |          |
| hot_cal(avg)  | 36228974     | 36942948                        |          |
| cold_cal(avg) | 9545         | 56178                           |          |
| deallocs      | 215079       | 864                             |          |
| top_wait      | none         | LWLock:pg_stat_statements (320) |          |
+---------------+--------------+---------------------------------+----------+
=== multi_stmt (release, simple) ===
pgbench -f bench_multi_stmt.sql -c 64 -j 16 -T 300 -M simple
+---------------+--------------+--------------+----------+
|               | patch        | upstream     | delta    |
+---------------+--------------+--------------+----------+
| TPS           | 18186        | 17815        | +2.0%    |
| entries       | 6            | 6            |          |
| deallocs      | 0            | 0            |          |
| top_wait      | none         | none         |          |
+---------------+--------------+--------------+----------+
=== select1 (release, simple) ===
pgbench -f bench_select1.sql -c 64 -j 16 -T 300 -M simple
+---------------+--------------+--------------+----------+
|               | patch        | upstream     | delta    |
+---------------+--------------+--------------+----------+
| TPS           | 286444       | 289616       | -1.0%    |
| entries       | 4            | 4            |          |
| deallocs      | 0            | 0            |          |
| top_wait      | none         | none         |          |
+---------------+--------------+--------------+----------+
Attachments:
[text/plain] benchmark_v3.txt (3.9K, 2-benchmark_v3.txt)
[application/sql] bench_select1.sql (10B, 3-bench_select1.sql)
[application/sql] bench_churn.sql (202B, 4-bench_churn.sql)
[application/sql] bench_light_churn.sql (200B, 5-bench_light_churn.sql)
[application/sql] bench_multi_stmt.sql (176B, 6-bench_multi_stmt.sql)
[application/x-patch] v3-0001-pgstat-Introduce-pg_stat_report_anytime-for-mid-t.patch (28.5K, 7-v3-0001-pgstat-Introduce-pg_stat_report_anytime-for-mid-t.patch)
From d5e060fdcc3afae447496f1634241c4403a20dce Mon Sep 17 00:00:00 2001
From: Sami Imseih <[email protected]>
Date: Sun, 10 May 2026 07:06:04 -0500
Subject: [PATCH v3 1/5] pgstat: Introduce pg_stat_report_anytime() for
mid-transaction stats flush
Add an API to flush pending stats that are safe to report inside a
transaction without waiting for transaction end. Relation write
counters (tuples inserted, updated, deleted) for tables modified in
the current transaction are excluded, since their final values depend
on commit/abort outcome.
The SQL function pg_stat_report_anytime(pid) flushes the target
backend's pending stats: if the PID matches the caller's own backend
it flushes immediately, otherwise it signals the target to flush at
its next CHECK_FOR_INTERRUPTS (for regular backends) or main-loop
iteration (for auxiliary processes). The C function
pgstat_report_anytime_stat() flushes pending stats in the calling
backend only.
---
doc/src/sgml/monitoring.sgml | 26 ++++
src/backend/postmaster/autovacuum.c | 3 +
src/backend/postmaster/checkpointer.c | 3 +
src/backend/postmaster/interrupt.c | 4 +
src/backend/postmaster/pgarch.c | 3 +
src/backend/postmaster/startup.c | 4 +
src/backend/postmaster/walsummarizer.c | 3 +
src/backend/storage/ipc/procsignal.c | 3 +
src/backend/tcop/postgres.c | 3 +
src/backend/utils/activity/pgstat.c | 61 +++++++++-
src/backend/utils/activity/pgstat_relation.c | 97 +++++++++------
src/backend/utils/adt/pgstatfuncs.c | 40 ++++++
src/backend/utils/init/globals.c | 1 +
src/include/catalog/pg_proc.dat | 6 +
src/include/miscadmin.h | 1 +
src/include/pgstat.h | 3 +
src/include/storage/procsignal.h | 2 +
src/test/regress/expected/stats.out | 122 +++++++++++++++++++
src/test/regress/sql/stats.sql | 81 ++++++++++++
19 files changed, 431 insertions(+), 35 deletions(-)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 08d5b824552..bb6c928e3e7 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -5607,6 +5607,32 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para></entry>
</row>
+ <row>
+ <entry role="func_table_entry"><para role="func_signature">
+ <indexterm>
+ <primary>pg_stat_report_anytime</primary>
+ </indexterm>
+ <function>pg_stat_report_anytime</function> ( <type>integer</type> )
+ <returnvalue>boolean</returnvalue>
+ </para>
+ <para>
+ Force flush of pending statistics to shared memory for the backend
+ with the specified process ID. Unlike normal statistics reporting,
+ this can be called from within a transaction. For relations modified
+ by <command>INSERT</command>, <command>UPDATE</command>, or
+ <command>DELETE</command> in the current transaction, only read
+ counters (scans, tuples fetched, blocks hit) are flushed
+ immediately; write counters (tuples inserted, updated, deleted)
+ are deferred until the transaction ends.
+ Returns <literal>true</literal> if the flush was successfully
+ triggered, <literal>false</literal> otherwise.
+ </para>
+ <para>
+ This function is restricted to superusers by default, but other users
+ can be granted EXECUTE to run the function.
+ </para></entry>
+ </row>
+
<row>
<entry role="func_table_entry"><para role="func_signature">
<indexterm>
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index a5a8db2ff88..c5fddf75dab 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -825,6 +825,9 @@ ProcessAutoVacLauncherInterrupts(void)
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
+ if (ReportAnytimeStatsPending)
+ ProcessReportAnytimeStatsInterrupt();
+
/* Process sinval catchup interrupts that happened while sleeping */
ProcessCatchupInterrupt();
}
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 087120db090..874cceb3970 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -694,6 +694,9 @@ ProcessCheckpointerInterrupts(void)
/* Perform logging of memory contexts of this process */
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
+
+ if (ReportAnytimeStatsPending)
+ ProcessReportAnytimeStatsInterrupt();
}
/*
diff --git a/src/backend/postmaster/interrupt.c b/src/backend/postmaster/interrupt.c
index a2c0ff012c5..4e09e93f8da 100644
--- a/src/backend/postmaster/interrupt.c
+++ b/src/backend/postmaster/interrupt.c
@@ -17,6 +17,7 @@
#include <unistd.h>
#include "miscadmin.h"
+#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "storage/ipc.h"
#include "storage/latch.h"
@@ -48,6 +49,9 @@ ProcessMainLoopInterrupts(void)
/* Perform logging of memory contexts of this process */
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
+
+ if (ReportAnytimeStatsPending)
+ ProcessReportAnytimeStatsInterrupt();
}
/*
diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c
index 0f207ac0356..d83a5fda862 100644
--- a/src/backend/postmaster/pgarch.c
+++ b/src/backend/postmaster/pgarch.c
@@ -870,6 +870,9 @@ ProcessPgArchInterrupts(void)
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
+ if (ReportAnytimeStatsPending)
+ ProcessReportAnytimeStatsInterrupt();
+
if (ConfigReloadPending)
{
char *archiveLib = pstrdup(XLogArchiveLibrary);
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index b46bac681fe..4a5534a8f9b 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -24,6 +24,7 @@
#include "access/xlogutils.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "postmaster/auxprocess.h"
#include "postmaster/startup.h"
#include "storage/ipc.h"
@@ -192,6 +193,9 @@ ProcessStartupProcInterrupts(void)
/* Perform logging of memory contexts of this process */
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
+
+ if (ReportAnytimeStatsPending)
+ ProcessReportAnytimeStatsInterrupt();
}
diff --git a/src/backend/postmaster/walsummarizer.c b/src/backend/postmaster/walsummarizer.c
index 4f12eaf2c85..b1239cbb07f 100644
--- a/src/backend/postmaster/walsummarizer.c
+++ b/src/backend/postmaster/walsummarizer.c
@@ -876,6 +876,9 @@ ProcessWalSummarizerInterrupts(void)
/* Perform logging of memory contexts of this process */
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
+
+ if (ReportAnytimeStatsPending)
+ ProcessReportAnytimeStatsInterrupt();
}
/*
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 264e4c22ca6..40023ac9888 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -711,6 +711,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_REPACK_MESSAGE))
HandleRepackMessageInterrupt();
+ if (CheckProcSignal(PROCSIG_REPORT_ANYTIME_STATS))
+ HandleReportAnytimeStatsInterrupt();
+
if (CheckProcSignal(PROCSIG_SLOTSYNC_MESSAGE))
HandleSlotSyncMessageInterrupt();
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index dbef734a93f..dbca372a3f1 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3609,6 +3609,9 @@ ProcessInterrupts(void)
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
+ if (ReportAnytimeStatsPending)
+ ProcessReportAnytimeStatsInterrupt();
+
if (ParallelApplyMessagePending)
ProcessParallelApplyMessages();
diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c
index b67da88c7dc..9b5d9bf09cb 100644
--- a/src/backend/utils/activity/pgstat.c
+++ b/src/backend/utils/activity/pgstat.c
@@ -106,6 +106,7 @@
#include "access/xact.h"
#include "lib/dshash.h"
+#include "miscadmin.h"
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -845,6 +846,57 @@ pgstat_force_next_flush(void)
pgStatForceNextFlush = true;
}
+/*
+ * Immediately flush all pending statistics entries to shared memory.
+ *
+ * Unlike pgstat_report_stat(), this can be called anytime, including
+ * within a transaction.
+ */
+void
+pgstat_report_anytime_stat(void)
+{
+ pgstat_flush_pending_entries(false);
+
+ for (PgStat_Kind kind = PGSTAT_KIND_MIN; kind <= PGSTAT_KIND_MAX; kind++)
+ {
+ const PgStat_KindInfo *kind_info = pgstat_get_kind_info(kind);
+
+ if (!kind_info || !kind_info->flush_static_cb)
+ continue;
+
+ kind_info->flush_static_cb(false);
+ }
+}
+
+/*
+ * HandleReportAnytimeStatsInterrupt
+ * Handle receipt of an interrupt requesting an anytime stats report.
+ *
+ * All the actual work is deferred to ProcessReportAnytimeStatsInterrupt(),
+ * because we cannot safely acquire locks inside the signal handler.
+ */
+void
+HandleReportAnytimeStatsInterrupt(void)
+{
+ InterruptPending = true;
+ ReportAnytimeStatsPending = true;
+ /* latch will be set by procsignal_sigusr1_handler */
+}
+
+/*
+ * ProcessReportAnytimeStatsInterrupt
+ * Report all pending statistics to shared memory.
+ *
+ * Called from ProcessInterrupts() when ReportAnytimeStatsPending is set.
+ */
+void
+ProcessReportAnytimeStatsInterrupt(void)
+{
+ ReportAnytimeStatsPending = false;
+
+ pgstat_report_anytime_stat();
+}
+
/*
* Only for use by pgstat_reset_counters()
*/
@@ -1414,7 +1466,14 @@ pgstat_flush_pending_entries(bool nowait)
/* flush the stats, if possible */
did_flush = kind_info->flush_pending_cb(entry_ref, nowait);
- Assert(did_flush || nowait);
+ /*
+ * When nowait is false we block for the lock, so the only reason a
+ * flush_pending_cb can legitimately return false is that the entry
+ * has active transaction state that must not be freed yet (e.g.
+ * relation stats with trans != NULL). That situation only arises
+ * mid-transaction, hence the IsTransactionOrTransactionBlock() check.
+ */
+ Assert(did_flush || nowait || IsTransactionOrTransactionBlock());
/* determine next entry, before deleting the pending entry */
if (dlist_has_next(&pgStatPending, cur))
diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c
index b2ca28f83ba..848687a9f7e 100644
--- a/src/backend/utils/activity/pgstat_relation.c
+++ b/src/backend/utils/activity/pgstat_relation.c
@@ -828,64 +828,76 @@ pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
/*
* Ignore entries that didn't accumulate any actual counts, such as
- * indexes that were opened by the planner but not used.
+ * indexes that were opened by the planner but not used. The entry cannot
+ * be freed if there is active transaction state, since
+ * AtEOXact_PgStat_Relations will still merge counters into it.
*/
if (pg_memory_is_all_zeros(&lstats->counts,
sizeof(struct PgStat_TableCounts)))
- return true;
+ return (lstats->trans == NULL);
if (!pgstat_lock_entry(entry_ref, nowait))
return false;
- /* add the values to the shared entry. */
+ /* Update counters that are always safe to flush. */
tabentry = &shtabstats->stats;
tabentry->numscans += lstats->counts.numscans;
if (lstats->counts.numscans)
{
- TimestampTz t = GetCurrentTransactionStopTimestamp();
+ TimestampTz t = IsTransactionOrTransactionBlock() ?
+ GetCurrentStatementStartTimestamp() :
+ GetCurrentTransactionStopTimestamp();
if (t > tabentry->lastscan)
tabentry->lastscan = t;
}
tabentry->tuples_returned += lstats->counts.tuples_returned;
tabentry->tuples_fetched += lstats->counts.tuples_fetched;
- tabentry->tuples_inserted += lstats->counts.tuples_inserted;
- tabentry->tuples_updated += lstats->counts.tuples_updated;
- tabentry->tuples_deleted += lstats->counts.tuples_deleted;
tabentry->tuples_hot_updated += lstats->counts.tuples_hot_updated;
tabentry->tuples_newpage_updated += lstats->counts.tuples_newpage_updated;
+ tabentry->blocks_fetched += lstats->counts.blocks_fetched;
+ tabentry->blocks_hit += lstats->counts.blocks_hit;
/*
- * If table was truncated/dropped, first reset the live/dead counters.
+ * Update counters that are only safe to flush outside of a transaction
+ * that has modified this relation.
*/
- if (lstats->counts.truncdropped)
+ if (lstats->trans == NULL)
{
- tabentry->live_tuples = 0;
- tabentry->dead_tuples = 0;
- tabentry->ins_since_vacuum = 0;
- }
+ tabentry->tuples_inserted += lstats->counts.tuples_inserted;
+ tabentry->tuples_updated += lstats->counts.tuples_updated;
+ tabentry->tuples_deleted += lstats->counts.tuples_deleted;
- tabentry->live_tuples += lstats->counts.delta_live_tuples;
- tabentry->dead_tuples += lstats->counts.delta_dead_tuples;
- tabentry->mod_since_analyze += lstats->counts.changed_tuples;
+ /*
+ * If table was truncated/dropped, first reset the live/dead counters.
+ */
+ if (lstats->counts.truncdropped)
+ {
+ tabentry->live_tuples = 0;
+ tabentry->dead_tuples = 0;
+ tabentry->ins_since_vacuum = 0;
+ }
- /*
- * Using tuples_inserted to update ins_since_vacuum does mean that we'll
- * track aborted inserts too. This isn't ideal, but otherwise probably
- * not worth adding an extra field for. It may just amount to autovacuums
- * triggering for inserts more often than they maybe should, which is
- * probably not going to be common enough to be too concerned about here.
- */
- tabentry->ins_since_vacuum += lstats->counts.tuples_inserted;
+ tabentry->live_tuples += lstats->counts.delta_live_tuples;
+ tabentry->dead_tuples += lstats->counts.delta_dead_tuples;
+ tabentry->mod_since_analyze += lstats->counts.changed_tuples;
- tabentry->blocks_fetched += lstats->counts.blocks_fetched;
- tabentry->blocks_hit += lstats->counts.blocks_hit;
+ /*
+ * Using tuples_inserted to update ins_since_vacuum does mean that
+ * we'll track aborted inserts too. This isn't ideal, but otherwise
+ * probably not worth adding an extra field for. It may just amount
+ * to autovacuums triggering for inserts more often than they maybe
+ * should, which is probably not going to be common enough to be too
+ * concerned about here.
+ */
+ tabentry->ins_since_vacuum += lstats->counts.tuples_inserted;
- /* Clamp live_tuples in case of negative delta_live_tuples */
- tabentry->live_tuples = Max(tabentry->live_tuples, 0);
- /* Likewise for dead_tuples */
- tabentry->dead_tuples = Max(tabentry->dead_tuples, 0);
+ /* Clamp live_tuples in case of negative delta_live_tuples */
+ tabentry->live_tuples = Max(tabentry->live_tuples, 0);
+ /* Likewise for dead_tuples */
+ tabentry->dead_tuples = Max(tabentry->dead_tuples, 0);
+ }
pgstat_unlock_entry(entry_ref);
@@ -893,13 +905,30 @@ pgstat_relation_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
dbentry = pgstat_prep_database_pending(dboid);
dbentry->tuples_returned += lstats->counts.tuples_returned;
dbentry->tuples_fetched += lstats->counts.tuples_fetched;
- dbentry->tuples_inserted += lstats->counts.tuples_inserted;
- dbentry->tuples_updated += lstats->counts.tuples_updated;
- dbentry->tuples_deleted += lstats->counts.tuples_deleted;
dbentry->blocks_fetched += lstats->counts.blocks_fetched;
dbentry->blocks_hit += lstats->counts.blocks_hit;
- return true;
+ if (lstats->trans == NULL)
+ {
+ dbentry->tuples_inserted += lstats->counts.tuples_inserted;
+ dbentry->tuples_updated += lstats->counts.tuples_updated;
+ dbentry->tuples_deleted += lstats->counts.tuples_deleted;
+ return true;
+ }
+
+ /*
+ * This is a partial, in-transaction flush. Zero out the counters we
+ * already flushed so they aren't double-counted on the next flush.
+ */
+ lstats->counts.numscans = 0;
+ lstats->counts.tuples_returned = 0;
+ lstats->counts.tuples_fetched = 0;
+ lstats->counts.tuples_hot_updated = 0;
+ lstats->counts.tuples_newpage_updated = 0;
+ lstats->counts.blocks_fetched = 0;
+ lstats->counts.blocks_hit = 0;
+
+ return false;
}
void
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6f9c9c72de5..eb22490dc2c 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -28,6 +28,7 @@
#include "replication/logicallauncher.h"
#include "storage/proc.h"
#include "storage/procarray.h"
+#include "storage/procsignal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
@@ -1929,6 +1930,45 @@ pg_stat_force_next_flush(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+/*
+ * Signal a backend to report all its pending statistics to shared memory.
+ * If the target is the current backend, the report happens immediately.
+ */
+Datum
+pg_stat_report_anytime(PG_FUNCTION_ARGS)
+{
+ int pid = PG_GETARG_INT32(0);
+ PGPROC *proc;
+ ProcNumber procNumber = INVALID_PROC_NUMBER;
+
+ if (pid == MyProcPid)
+ {
+ pgstat_report_anytime_stat();
+ PG_RETURN_BOOL(true);
+ }
+
+ proc = BackendPidGetProc(pid);
+ if (proc == NULL)
+ proc = AuxiliaryPidGetProc(pid);
+
+ if (proc == NULL)
+ {
+ ereport(WARNING,
+ (errmsg("PID %d is not a PostgreSQL server process", pid)));
+ PG_RETURN_BOOL(false);
+ }
+
+ procNumber = GetNumberFromPGProc(proc);
+ if (SendProcSignal(pid, PROCSIG_REPORT_ANYTIME_STATS, procNumber) < 0)
+ {
+ ereport(WARNING,
+ (errmsg("could not send signal to process %d: %m", pid)));
+ PG_RETURN_BOOL(false);
+ }
+
+ PG_RETURN_BOOL(true);
+}
+
/* Reset all counters for the current database */
Datum
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index bbd28d14d99..1b5b3d59c3c 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -39,6 +39,7 @@ volatile sig_atomic_t TransactionTimeoutPending = false;
volatile sig_atomic_t IdleSessionTimeoutPending = false;
volatile sig_atomic_t ProcSignalBarrierPending = false;
volatile sig_atomic_t LogMemoryContextPending = false;
+volatile sig_atomic_t ReportAnytimeStatsPending = false;
volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false;
volatile uint32 InterruptHoldoffCount = 0;
volatile uint32 QueryCancelHoldoffCount = 0;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index be157a5fbe9..406628025b1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6209,6 +6209,12 @@
proname => 'pg_stat_force_next_flush', proisstrict => 'f', provolatile => 'v',
proparallel => 'r', prorettype => 'void', proargtypes => '',
prosrc => 'pg_stat_force_next_flush' },
+{ oid => '9953',
+ descr => 'statistics: force flush of pending stats to shared memory, including from within a transaction',
+ proname => 'pg_stat_report_anytime', provolatile => 'v',
+ prorettype => 'bool', proargtypes => 'int4',
+ prosrc => 'pg_stat_report_anytime',
+ proacl => '{POSTGRES=X}' },
{ oid => '2274',
descr => 'statistics: reset collected statistics for current database',
proname => 'pg_stat_reset', proisstrict => 'f', provolatile => 'v',
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 7de0a115402..52e97b31b28 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -97,6 +97,7 @@ extern PGDLLIMPORT volatile sig_atomic_t TransactionTimeoutPending;
extern PGDLLIMPORT volatile sig_atomic_t IdleSessionTimeoutPending;
extern PGDLLIMPORT volatile sig_atomic_t ProcSignalBarrierPending;
extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending;
+extern PGDLLIMPORT volatile sig_atomic_t ReportAnytimeStatsPending;
extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending;
extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index dfa2e837638..87def3b08e2 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -552,6 +552,9 @@ extern void pgstat_initialize(void);
/* Functions called from backends */
extern long pgstat_report_stat(bool force);
extern void pgstat_force_next_flush(void);
+extern void pgstat_report_anytime_stat(void);
+extern void HandleReportAnytimeStatsInterrupt(void);
+extern void ProcessReportAnytimeStatsInterrupt(void);
extern void pgstat_reset_counters(void);
extern void pgstat_reset(PgStat_Kind kind, Oid dboid, uint64 objid);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index aaa158bfd66..a184d449eba 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -38,6 +38,8 @@ typedef enum
PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
PROCSIG_SLOTSYNC_MESSAGE, /* ask slot synchronization to stop */
PROCSIG_REPACK_MESSAGE, /* Message from repack worker */
+ PROCSIG_REPORT_ANYTIME_STATS, /* ask backend to report anytime
+ * statistics */
PROCSIG_RECOVERY_CONFLICT, /* backend is blocking recovery, check
* PGPROC->pendingRecoveryConflicts for the
* reason */
diff --git a/src/test/regress/expected/stats.out b/src/test/regress/expected/stats.out
index c551abb1178..66b683965a6 100644
--- a/src/test/regress/expected/stats.out
+++ b/src/test/regress/expected/stats.out
@@ -2040,4 +2040,126 @@ SELECT fastpath_exceeded > :fastpath_exceeded_before FROM pg_stat_lock WHERE loc
(1 row)
DROP TABLE part_test;
+--
+-- Test pg_stat_report_anytime
+--
+CREATE TABLE partial_flush(id int);
+INSERT INTO partial_flush VALUES (1), (2), (3);
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush
+--------------------------
+
+(1 row)
+
+-- Record counters before the explicit transaction
+SELECT seq_scan AS seq_scan_before,
+ seq_tup_read AS seq_tup_read_before,
+ n_tup_ins AS n_tup_ins_before,
+ n_tup_upd AS n_tup_upd_before
+ FROM pg_stat_user_tables WHERE relname = 'partial_flush' \gset
+BEGIN;
+SET LOCAL stats_fetch_consistency = none;
+-- Generate both transaction-safe and transaction-unsafe counters.
+SELECT count(*) FROM partial_flush;
+ count
+-------
+ 3
+(1 row)
+
+INSERT INTO partial_flush VALUES (4), (5);
+UPDATE partial_flush SET id = id WHERE id = 1;
+-- Flush mid-transaction
+SELECT pg_stat_report_anytime(pg_backend_pid());
+ pg_stat_report_anytime
+------------------------
+ t
+(1 row)
+
+-- Transaction-safe counters should be visible mid-transaction.
+-- Transaction-unsafe counters (ins, upd) should NOT be flushed yet,
+-- since their final values depend on whether the transaction commits.
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+ seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+ n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+ n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+ FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+ seq_scan_delta | seq_tup_read_delta | n_tup_ins_delta | n_tup_upd_delta
+----------------+--------------------+-----------------+-----------------
+ 2 | 8 | 0 | 0
+(1 row)
+
+-- Generate more transaction-safe activity to verify no double counting.
+SELECT count(*) FROM partial_flush;
+ count
+-------
+ 5
+(1 row)
+
+-- Flush again mid-transaction
+SELECT pg_stat_report_anytime(pg_backend_pid());
+ pg_stat_report_anytime
+------------------------
+ t
+(1 row)
+
+-- Should show cumulative totals, not double-counted.
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+ seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+ n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+ n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+ FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+ seq_scan_delta | seq_tup_read_delta | n_tup_ins_delta | n_tup_upd_delta
+----------------+--------------------+-----------------+-----------------
+ 3 | 13 | 0 | 0
+(1 row)
+
+COMMIT;
+-- After commit, all counters should be flushed.
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush
+--------------------------
+
+(1 row)
+
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+ seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+ n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+ n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+ FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+ seq_scan_delta | seq_tup_read_delta | n_tup_ins_delta | n_tup_upd_delta
+----------------+--------------------+-----------------+-----------------
+ 3 | 13 | 2 | 1
+(1 row)
+
+DROP TABLE partial_flush;
+-- Test that pg_stat_report_anytime also flushes non-relation stats.
+CREATE TABLE wal_flush_test(id int);
+SELECT pg_stat_force_next_flush();
+ pg_stat_force_next_flush
+--------------------------
+
+(1 row)
+
+SELECT wal_records AS wal_records_before
+ FROM pg_stat_get_backend_wal(pg_backend_pid()) \gset
+BEGIN;
+SET LOCAL stats_fetch_consistency = none;
+-- Generate WAL inside the transaction.
+INSERT INTO wal_flush_test SELECT generate_series(1, 10);
+-- Flush mid-transaction; WAL stats should become visible immediately.
+SELECT pg_stat_report_anytime(pg_backend_pid());
+ pg_stat_report_anytime
+------------------------
+ t
+(1 row)
+
+SELECT wal_records > :wal_records_before AS wal_flushed
+ FROM pg_stat_get_backend_wal(pg_backend_pid());
+ wal_flushed
+-------------
+ t
+(1 row)
+
+COMMIT;
+DROP TABLE wal_flush_test;
-- End of Stats Test
diff --git a/src/test/regress/sql/stats.sql b/src/test/regress/sql/stats.sql
index 610fd21fae4..c8bc0f22f27 100644
--- a/src/test/regress/sql/stats.sql
+++ b/src/test/regress/sql/stats.sql
@@ -1008,4 +1008,85 @@ SELECT fastpath_exceeded > :fastpath_exceeded_before FROM pg_stat_lock WHERE loc
DROP TABLE part_test;
+--
+-- Test pg_stat_report_anytime
+--
+CREATE TABLE partial_flush(id int);
+INSERT INTO partial_flush VALUES (1), (2), (3);
+SELECT pg_stat_force_next_flush();
+
+-- Record counters before the explicit transaction
+SELECT seq_scan AS seq_scan_before,
+ seq_tup_read AS seq_tup_read_before,
+ n_tup_ins AS n_tup_ins_before,
+ n_tup_upd AS n_tup_upd_before
+ FROM pg_stat_user_tables WHERE relname = 'partial_flush' \gset
+
+BEGIN;
+SET LOCAL stats_fetch_consistency = none;
+
+-- Generate both transaction-safe and transaction-unsafe counters.
+SELECT count(*) FROM partial_flush;
+INSERT INTO partial_flush VALUES (4), (5);
+UPDATE partial_flush SET id = id WHERE id = 1;
+
+-- Flush mid-transaction
+SELECT pg_stat_report_anytime(pg_backend_pid());
+
+-- Transaction-safe counters should be visible mid-transaction.
+-- Transaction-unsafe counters (ins, upd) should NOT be flushed yet,
+-- since their final values depend on whether the transaction commits.
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+ seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+ n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+ n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+ FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+
+-- Generate more transaction-safe activity to verify no double counting.
+SELECT count(*) FROM partial_flush;
+
+-- Flush again mid-transaction
+SELECT pg_stat_report_anytime(pg_backend_pid());
+
+-- Should show cumulative totals, not double-counted.
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+ seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+ n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+ n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+ FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+
+COMMIT;
+
+-- After commit, all counters should be flushed.
+SELECT pg_stat_force_next_flush();
+
+SELECT seq_scan - :seq_scan_before AS seq_scan_delta,
+ seq_tup_read - :seq_tup_read_before AS seq_tup_read_delta,
+ n_tup_ins - :n_tup_ins_before AS n_tup_ins_delta,
+ n_tup_upd - :n_tup_upd_before AS n_tup_upd_delta
+ FROM pg_stat_user_tables WHERE relname = 'partial_flush';
+
+DROP TABLE partial_flush;
+
+-- Test that pg_stat_report_anytime also flushes non-relation stats.
+CREATE TABLE wal_flush_test(id int);
+SELECT pg_stat_force_next_flush();
+SELECT wal_records AS wal_records_before
+ FROM pg_stat_get_backend_wal(pg_backend_pid()) \gset
+
+BEGIN;
+SET LOCAL stats_fetch_consistency = none;
+
+-- Generate WAL inside the transaction.
+INSERT INTO wal_flush_test SELECT generate_series(1, 10);
+
+-- Flush mid-transaction; WAL stats should become visible immediately.
+SELECT pg_stat_report_anytime(pg_backend_pid());
+
+SELECT wal_records > :wal_records_before AS wal_flushed
+ FROM pg_stat_get_backend_wal(pg_backend_pid());
+
+COMMIT;
+DROP TABLE wal_flush_test;
+
-- End of Stats Test
--
2.50.1 (Apple Git-155)
[application/x-patch] v3-0002-pgstat-tolerate-already-dropped-entries-in-pgstat.patch (7.4K, 8-v3-0002-pgstat-tolerate-already-dropped-entries-in-pgstat.patch)
From 181f051a79e565eaed25fcceddc1f2c90adaae1f Mon Sep 17 00:00:00 2001
From: Sami Imseih <[email protected]>
Date: Thu, 28 May 2026 12:02:21 +0000
Subject: [PATCH v3 2/5] pgstat: tolerate already-dropped entries in
pgstat_drop_entry()
There are valid cases in which pgstat_drop_entry() can be called
multiple times for the same entry before garbage collection occurs,
leading to an ERROR if the entry was already marked as dropped. A
caller should be able to tolerate such cases by passing
skip_dropped = true.
This will be needed for future work in which parallel backends
may attempt to drop an entry by hash key.
---
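For readers skimming the series, the intended drop semantics described in the commit message above can be sketched as a toy model. This is not PostgreSQL code: ToyEntry, ToyDropResult and toy_drop_entry() are hypothetical names invented for illustration, and the reference counting is deliberately simplified. It only assumes what the message states, namely that a second drop raises an ERROR unless the caller passes skip_dropped = true.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a shared stats entry; not a PostgreSQL struct. */
typedef struct ToyEntry
{
    bool        dropped;    /* already marked as dropped, awaiting GC */
    int         refcount;   /* references still held by other backends */
} ToyEntry;

typedef enum ToyDropResult
{
    TOY_DROP_FREED,         /* entry released, or nothing left to do */
    TOY_DROP_NOT_FREED,     /* marked dropped but still referenced */
    TOY_DROP_ERROR          /* double drop that the caller did not allow */
} ToyDropResult;

/*
 * Toy version of the drop logic: a repeated drop is treated as success
 * only when the caller opted in with skip_dropped.
 */
static ToyDropResult
toy_drop_entry(ToyEntry *entry, bool skip_dropped)
{
    if (entry == NULL)
        return TOY_DROP_FREED;      /* no such entry, nothing to free */

    if (entry->dropped)
        return skip_dropped ? TOY_DROP_FREED : TOY_DROP_ERROR;

    entry->dropped = true;
    return (entry->refcount == 0) ? TOY_DROP_FREED : TOY_DROP_NOT_FREED;
}
```

In this model, a caller that receives TOY_DROP_NOT_FREED would be the one expected to request a garbage-collection pass, mirroring how the existing callers react to a false return value.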
src/backend/utils/activity/pgstat.c | 2 +-
src/backend/utils/activity/pgstat_function.c | 2 +-
src/backend/utils/activity/pgstat_replslot.c | 2 +-
src/backend/utils/activity/pgstat_shmem.c | 12 ++++++++++--
src/backend/utils/activity/pgstat_xact.c | 8 ++++----
src/include/utils/pgstat_internal.h | 2 +-
.../test_custom_stats/test_custom_var_stats.c | 2 +-
7 files changed, 19 insertions(+), 11 deletions(-)
diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c
index 9b5d9bf09cb..e92dbd38e8d 100644
--- a/src/backend/utils/activity/pgstat.c
+++ b/src/backend/utils/activity/pgstat.c
@@ -651,7 +651,7 @@ pgstat_shutdown_hook(int code, Datum arg)
dlist_init(&pgStatPending);
/* drop the backend stats entry */
- if (!pgstat_drop_entry(PGSTAT_KIND_BACKEND, InvalidOid, MyProcNumber))
+ if (!pgstat_drop_entry(PGSTAT_KIND_BACKEND, InvalidOid, MyProcNumber, false))
pgstat_request_entry_refs_gc();
pgstat_detach_shmem();
diff --git a/src/backend/utils/activity/pgstat_function.c b/src/backend/utils/activity/pgstat_function.c
index d47d05e3d92..88ac6d08e69 100644
--- a/src/backend/utils/activity/pgstat_function.c
+++ b/src/backend/utils/activity/pgstat_function.c
@@ -113,7 +113,7 @@ pgstat_init_function_usage(FunctionCallInfo fcinfo,
if (!SearchSysCacheExists1(PROCOID, ObjectIdGetDatum(fcinfo->flinfo->fn_oid)))
{
pgstat_drop_entry(PGSTAT_KIND_FUNCTION, MyDatabaseId,
- fcinfo->flinfo->fn_oid);
+ fcinfo->flinfo->fn_oid, false);
ereport(ERROR, errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("function call to dropped function"));
}
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index 0d00dd5d93a..a32b70a0373 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -188,7 +188,7 @@ pgstat_drop_replslot(ReplicationSlot *slot)
Assert(LWLockHeldByMeInMode(ReplicationSlotAllocationLock, LW_EXCLUSIVE));
if (!pgstat_drop_entry(PGSTAT_KIND_REPLSLOT, InvalidOid,
- ReplicationSlotIndex(slot)))
+ ReplicationSlotIndex(slot), false))
pgstat_request_entry_refs_gc();
}
diff --git a/src/backend/utils/activity/pgstat_shmem.c b/src/backend/utils/activity/pgstat_shmem.c
index b8f354c818a..403e11a0da8 100644
--- a/src/backend/utils/activity/pgstat_shmem.c
+++ b/src/backend/utils/activity/pgstat_shmem.c
@@ -998,7 +998,8 @@ pgstat_drop_database_and_contents(Oid dboid)
* Drop a single stats entry.
*
* This routine returns false if the stats entry of the dropped object could
- * not be freed, true otherwise.
+ * not be freed, true otherwise. If skip_dropped is true, already-dropped
+ * entries are tolerated and treated as success.
*
* The callers of this function should call pgstat_request_entry_refs_gc()
* if the stats entry could not be freed, to ensure that this entry's memory
@@ -1006,7 +1007,7 @@ pgstat_drop_database_and_contents(Oid dboid)
* pgstat_gc_entry_refs().
*/
bool
-pgstat_drop_entry(PgStat_Kind kind, Oid dboid, uint64 objid)
+pgstat_drop_entry(PgStat_Kind kind, Oid dboid, uint64 objid, bool skip_dropped)
{
PgStat_HashKey key = {0};
PgStatShared_HashEntry *shent;
@@ -1031,6 +1032,13 @@ pgstat_drop_entry(PgStat_Kind kind, Oid dboid, uint64 objid)
shent = dshash_find(pgStatLocal.shared_hash, &key, true);
if (shent)
{
+ /* already dropped by another backend, nothing to do */
+ if (shent->dropped && skip_dropped)
+ {
+ dshash_release_lock(pgStatLocal.shared_hash, shent);
+ return true;
+ }
+
freed = pgstat_drop_entry_internal(shent, NULL);
/*
diff --git a/src/backend/utils/activity/pgstat_xact.c b/src/backend/utils/activity/pgstat_xact.c
index 5e2d69e6297..927b6268fc5 100644
--- a/src/backend/utils/activity/pgstat_xact.c
+++ b/src/backend/utils/activity/pgstat_xact.c
@@ -85,7 +85,7 @@ AtEOXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state, bool isCommit)
* Transaction that dropped an object committed. Drop the stats
* too.
*/
- if (!pgstat_drop_entry(it->kind, it->dboid, objid))
+ if (!pgstat_drop_entry(it->kind, it->dboid, objid, false))
not_freed_count++;
}
else if (!isCommit && pending->is_create)
@@ -94,7 +94,7 @@ AtEOXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state, bool isCommit)
* Transaction that created an object aborted. Drop the stats
* associated with the object.
*/
- if (!pgstat_drop_entry(it->kind, it->dboid, objid))
+ if (!pgstat_drop_entry(it->kind, it->dboid, objid, false))
not_freed_count++;
}
@@ -160,7 +160,7 @@ AtEOSubXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state,
* Subtransaction creating a new stats object aborted. Drop the
* stats object.
*/
- if (!pgstat_drop_entry(it->kind, it->dboid, objid))
+ if (!pgstat_drop_entry(it->kind, it->dboid, objid, false))
not_freed_count++;
pfree(pending);
}
@@ -323,7 +323,7 @@ pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items,
xl_xact_stats_item *it = &items[i];
uint64 objid = ((uint64) it->objid_hi) << 32 | it->objid_lo;
- if (!pgstat_drop_entry(it->kind, it->dboid, objid))
+ if (!pgstat_drop_entry(it->kind, it->dboid, objid, false))
not_freed_count++;
}
diff --git a/src/include/utils/pgstat_internal.h b/src/include/utils/pgstat_internal.h
index fe463faaf63..96f189e0dc3 100644
--- a/src/include/utils/pgstat_internal.h
+++ b/src/include/utils/pgstat_internal.h
@@ -807,7 +807,7 @@ extern PgStat_EntryRef *pgstat_get_entry_ref(PgStat_Kind kind, Oid dboid, uint64
extern bool pgstat_lock_entry(PgStat_EntryRef *entry_ref, bool nowait);
extern bool pgstat_lock_entry_shared(PgStat_EntryRef *entry_ref, bool nowait);
extern void pgstat_unlock_entry(PgStat_EntryRef *entry_ref);
-extern bool pgstat_drop_entry(PgStat_Kind kind, Oid dboid, uint64 objid);
+extern bool pgstat_drop_entry(PgStat_Kind kind, Oid dboid, uint64 objid, bool skip_dropped);
extern void pgstat_drop_all_entries(void);
extern void pgstat_drop_matching_entries(bool (*do_drop) (PgStatShared_HashEntry *, Datum),
Datum match_data);
diff --git a/src/test/modules/test_custom_stats/test_custom_var_stats.c b/src/test/modules/test_custom_stats/test_custom_var_stats.c
index 5c4871ed37c..863d6a52492 100644
--- a/src/test/modules/test_custom_stats/test_custom_var_stats.c
+++ b/src/test/modules/test_custom_stats/test_custom_var_stats.c
@@ -600,7 +600,7 @@ test_custom_stats_var_drop(PG_FUNCTION_ARGS)
/* Drop entry and request GC if the entry could not be freed */
if (!pgstat_drop_entry(PGSTAT_KIND_TEST_CUSTOM_VAR_STATS, InvalidOid,
- PGSTAT_CUSTOM_VAR_STATS_IDX(stat_name)))
+ PGSTAT_CUSTOM_VAR_STATS_IDX(stat_name), false))
pgstat_request_entry_refs_gc();
PG_RETURN_VOID();
--
2.50.1 (Apple Git-155)
[application/x-patch] v3-0003-dshash-add-partition-scoped-sequential-scan.patch (5.3K, 9-v3-0003-dshash-add-partition-scoped-sequential-scan.patch)
From bf08305059d33b65953a424243d452ac90f00786 Mon Sep 17 00:00:00 2001
From: Sami Imseih <[email protected]>
Date: Wed, 27 May 2026 10:02:55 -0500
Subject: [PATCH v3 3/5] dshash: add partition-scoped sequential scan
Add dshash_seq_init_partition(), which restricts a sequential scan to
a single partition. This allows multiple backends to scan a dshash
table in parallel by dividing partitions among them, which a future
commit will use to implement parallel clock-sweep eviction.
Also move the DSHASH_NUM_PARTITIONS macros to dshash.h so that callers
can determine the partition count.
Rename dshash_seq_status.nbuckets to endbucket, since the field
represents the exclusive upper-bound bucket index at which the scan
terminates, not a count. This is clearer now that partition-scoped
scans set it to the first bucket of the next partition rather than
the total number of buckets.
---
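As a rough illustration of the bucket range a partition-scoped scan covers, the sketch below computes the first bucket and the exclusive end bucket for one partition. It assumes that each partition owns a contiguous run of 2^(size_log2 - 7) buckets, which is my reading of the existing BUCKET_INDEX_FOR_PARTITION macro in dshash.c. The Toy* names are hypothetical and exist only for this example.

```c
#include <assert.h>
#include <stddef.h>

/* Mirrors DSHASH_NUM_PARTITIONS_LOG2 from the patch; the toy names are made up. */
#define TOY_NUM_PARTITIONS_LOG2 7
#define TOY_NUM_PARTITIONS (1 << TOY_NUM_PARTITIONS_LOG2)

typedef struct ToyBucketRange
{
    size_t      first;      /* first bucket owned by the partition */
    size_t      end;        /* exclusive upper bound, i.e. "endbucket" */
} ToyBucketRange;

/*
 * Compute the half-open bucket range [first, end) for one partition of a
 * table with 2^size_log2 buckets.
 */
static ToyBucketRange
toy_partition_range(int partition, int size_log2)
{
    ToyBucketRange r;
    int         splits = size_log2 - TOY_NUM_PARTITIONS_LOG2;

    assert(partition >= 0 && partition < TOY_NUM_PARTITIONS);
    assert(size_log2 >= TOY_NUM_PARTITIONS_LOG2);

    r.first = (size_t) partition << splits;
    r.end = (size_t) (partition + 1) << splits;
    return r;
}
```

With size_log2 = 9 there are 512 buckets, so every partition covers four of them and the last partition ends exactly at the total bucket count. That is why a single exclusive end index can serve both the whole-table scan and the partition-scoped one.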
src/backend/lib/dshash.c | 44 +++++++++++++++++++++++++++++-----------
src/include/lib/dshash.h | 14 ++++++++++++-
2 files changed, 45 insertions(+), 13 deletions(-)
diff --git a/src/backend/lib/dshash.c b/src/backend/lib/dshash.c
index 1999989c14f..01e67856f43 100644
--- a/src/backend/lib/dshash.c
+++ b/src/backend/lib/dshash.c
@@ -52,15 +52,6 @@ struct dshash_table_item
/* The user's entry object follows here. See ENTRY_FROM_ITEM(item). */
};
-/*
- * The number of partitions for locking purposes. This is set to match
- * NUM_BUFFER_PARTITIONS for now, on the basis that whatever's good enough for
- * the buffer pool must be good enough for any other purpose. This could
- * become a runtime parameter in future.
- */
-#define DSHASH_NUM_PARTITIONS_LOG2 7
-#define DSHASH_NUM_PARTITIONS (1 << DSHASH_NUM_PARTITIONS_LOG2)
-
/* A magic value used to identify our hash tables. */
#define DSHASH_MAGIC 0x75ff6a20
@@ -661,13 +652,42 @@ dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
{
status->hash_table = hash_table;
status->curbucket = 0;
- status->nbuckets = 0;
+ status->endbucket = 0;
status->curitem = NULL;
status->pnextitem = InvalidDsaPointer;
status->curpartition = -1;
status->exclusive = exclusive;
}
+/*
+ * Initialize a sequential scan restricted to a single partition.
+ *
+ * Only entries in the specified partition are visited. The caller must
+ * ensure that 0 <= partition < DSHASH_NUM_PARTITIONS.
+ */
+void
+dshash_seq_init_partition(dshash_seq_status *status, dshash_table *hash_table,
+ bool exclusive, int partition)
+{
+ Assert(partition >= 0 && partition < DSHASH_NUM_PARTITIONS);
+
+ status->hash_table = hash_table;
+ status->curitem = NULL;
+ status->pnextitem = InvalidDsaPointer;
+ status->exclusive = exclusive;
+
+ LWLockAcquire(PARTITION_LOCK(hash_table, partition),
+ exclusive ? LW_EXCLUSIVE : LW_SHARED);
+ ensure_valid_bucket_pointers(hash_table);
+
+ status->curpartition = partition;
+ status->endbucket =
+ BUCKET_INDEX_FOR_PARTITION(partition + 1, hash_table->size_log2);
+ /* Set to one before first bucket as the seq scan will ++curbucket */
+ status->curbucket =
+ BUCKET_INDEX_FOR_PARTITION(partition, hash_table->size_log2) - 1;
+}
+
/*
* Returns the next element.
*
@@ -701,7 +721,7 @@ dshash_seq_next(dshash_seq_status *status)
ensure_valid_bucket_pointers(status->hash_table);
- status->nbuckets =
+ status->endbucket =
NUM_BUCKETS(status->hash_table->control->size_log2);
next_item_pointer = status->hash_table->buckets[status->curbucket];
}
@@ -717,7 +737,7 @@ dshash_seq_next(dshash_seq_status *status)
{
int next_partition;
- if (++status->curbucket >= status->nbuckets)
+ if (++status->curbucket >= status->endbucket)
{
/* all buckets have been scanned. finish. */
return NULL;
diff --git a/src/include/lib/dshash.h b/src/include/lib/dshash.h
index 64b758b381b..655b025b996 100644
--- a/src/include/lib/dshash.h
+++ b/src/include/lib/dshash.h
@@ -73,7 +73,7 @@ typedef struct dshash_seq_status
{
dshash_table *hash_table; /* dshash table working on */
int curbucket; /* bucket number we are at */
- int nbuckets; /* total number of buckets in the dshash */
+ int endbucket; /* first bucket beyond scan range */
dshash_table_item *curitem; /* item we are currently at */
dsa_pointer pnextitem; /* dsa-pointer to the next item */
int curpartition; /* partition number we are at */
@@ -109,9 +109,21 @@ extern void dshash_release_lock(dshash_table *hash_table, void *entry);
#define dshash_find_or_insert(hash_table, key, found) \
dshash_find_or_insert_extended(hash_table, key, found, 0)
+/*
+ * The number of partitions for locking purposes. This is set to match
+ * NUM_BUFFER_PARTITIONS for now, on the basis that whatever's good enough for
+ * the buffer pool must be good enough for any other purpose. This could
+ * become a runtime parameter in future.
+ */
+#define DSHASH_NUM_PARTITIONS_LOG2 7
+#define DSHASH_NUM_PARTITIONS (1 << DSHASH_NUM_PARTITIONS_LOG2)
+
/* seq scan support */
extern void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table,
bool exclusive);
+extern void dshash_seq_init_partition(dshash_seq_status *status,
+ dshash_table *hash_table,
+ bool exclusive, int partition);
extern void *dshash_seq_next(dshash_seq_status *status);
extern void dshash_seq_term(dshash_seq_status *status);
extern void dshash_delete_current(dshash_seq_status *status);
--
2.50.1 (Apple Git-155)
[application/x-patch] v3-0005-pg_stat_statements-store-query-text-in-DSA-instea.patch (37.0K, 10-v3-0005-pg_stat_statements-store-query-text-in-DSA-instea.patch)
From 48cc1cff5b38dda738c07099aa325b833d5ec1c3 Mon Sep 17 00:00:00 2001
From: Sami Imseih <[email protected]>
Date: Fri, 29 May 2026 09:20:29 -0500
Subject: [PATCH v3 5/5] pg_stat_statements: store query text in DSA instead of
file
Replace the file-based query text storage (pgss_query_texts.stat) with
a DSA area obtained via GetNamedDSA. Each dshash entry now stores a
dsa_pointer to its query text instead of a file offset, eliminating
external file I/O and the associated garbage collection machinery.
Add a new GUC pg_stat_statements.query_text_memory (PGC_SIGHUP,
default 4MB) that controls the DSA size limit. When the DSA is
exhausted, new entries are still tracked with full counters but their
query text is stored as NULL.
Add backfill logic: when an existing entry with NULL text is accessed
again (e.g. after a limit increase + reload), the text is written into
the now-available DSA space, recovering from earlier exhaustion without
requiring a reset.
Add a query_text_size column to pg_stat_statements_info showing the
current total DSA size, helping operators monitor text memory usage.
Add a TAP test, 002_query_text_memory.pl, that exercises DSA exhaustion,
verifies that entries still track counters with NULL text, and confirms
that backfill works after raising the limit.
Rename 010_restart.pl to 001_restart.pl for naming consistency.
---
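The exhaustion and backfill behaviour described in the commit message above can be pictured with a small toy model. It uses malloc and a byte budget in place of a real DSA area, and every name in it (ToyTextArea, ToyStmtEntry, toy_text_alloc, toy_record_call) is hypothetical. It is only a sketch of the idea that counters keep accumulating while the text is NULL and that a later access fills the text in once space is available, not the patch's implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical byte budget standing in for the DSA size limit. */
typedef struct ToyTextArea
{
    size_t      limit;      /* budget in bytes, like query_text_memory */
    size_t      used;       /* bytes handed out so far */
} ToyTextArea;

/* Hypothetical tracked statement: counters plus optional text. */
typedef struct ToyStmtEntry
{
    char       *text;       /* NULL when the budget was exhausted */
    long        calls;
} ToyStmtEntry;

/* Copy the query text if it fits in the remaining budget, else return NULL. */
static char *
toy_text_alloc(ToyTextArea *area, const char *query)
{
    size_t      len = strlen(query) + 1;
    char       *copy;

    if (area->used > area->limit || len > area->limit - area->used)
        return NULL;

    copy = malloc(len);
    if (copy == NULL)
        return NULL;

    memcpy(copy, query, len);
    area->used += len;
    return copy;
}

/* Always count the call; store or backfill the text when possible. */
static void
toy_record_call(ToyTextArea *area, ToyStmtEntry *entry, const char *query)
{
    entry->calls++;
    if (entry->text == NULL)
        entry->text = toy_text_alloc(area, query);
}
```

In this model, raising area.limit plays the role of increasing pg_stat_statements.query_text_memory and reloading: the next call for an entry whose text is NULL stores it without requiring a reset.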
contrib/pg_stat_statements/meson.build | 1 +
.../pg_stat_statements--1.13--1.14.sql | 20 +
.../pg_stat_statements/pg_stat_statements.c | 695 ++++--------------
.../t/{010_restart.pl => 001_restart.pl} | 0
.../t/002_query_text_memory.pl | 124 ++++
5 files changed, 285 insertions(+), 555 deletions(-)
rename contrib/pg_stat_statements/t/{010_restart.pl => 001_restart.pl} (100%)
create mode 100644 contrib/pg_stat_statements/t/002_query_text_memory.pl
diff --git a/contrib/pg_stat_statements/meson.build b/contrib/pg_stat_statements/meson.build
index 7ffc8964494..a3920669541 100644
--- a/contrib/pg_stat_statements/meson.build
+++ b/contrib/pg_stat_statements/meson.build
@@ -71,6 +71,7 @@ tests += {
'tap': {
'tests': [
't/001_restart.pl',
+ 't/002_query_text_memory.pl',
],
},
}
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql b/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
index 7ed4c19eb5a..42c41823840 100644
--- a/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
@@ -79,3 +79,23 @@ GRANT SELECT ON pg_stat_statements TO PUBLIC;
/* Mark reset functions as PARALLEL RESTRICTED */
ALTER FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) PARALLEL RESTRICTED;
+
+/* Recreate pg_stat_statements_info with query_text_size column */
+ALTER EXTENSION pg_stat_statements DROP VIEW pg_stat_statements_info;
+ALTER EXTENSION pg_stat_statements DROP FUNCTION pg_stat_statements_info();
+DROP VIEW pg_stat_statements_info;
+DROP FUNCTION pg_stat_statements_info();
+
+CREATE FUNCTION pg_stat_statements_info(
+ OUT dealloc bigint,
+ OUT stats_reset timestamp with time zone,
+ OUT query_text_size bigint
+)
+RETURNS record
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT VOLATILE PARALLEL SAFE;
+
+CREATE VIEW pg_stat_statements_info AS
+ SELECT * FROM pg_stat_statements_info();
+
+GRANT SELECT ON pg_stat_statements_info TO PUBLIC;
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 2004cad91f7..c5fd8a08db4 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -5,9 +5,11 @@
* usage across a whole database cluster.
*
* Execution costs are totaled for each distinct source query, and kept in
- * a dshash table registered via DSM registry. (We attempt to keep no more
- * distinct queries than the configured limit, but because dynamic shared
- * memory is used, the count may briefly exceed it.)
+ * a dshash table registered via DSM registry. Query text is stored in a
+ * separate DSA area (also via DSM registry), eliminating the need for
+ * external file I/O. (We attempt to keep no more distinct queries than
+ * the configured limit, but because dynamic shared memory is used, the
+ * count may briefly exceed it.)
*
* Starting in Postgres 9.2, this module normalized query entries. As of
* Postgres 14, the normalization is done by the core if compute_query_id is
@@ -15,13 +17,10 @@
*
* To facilitate presenting entries to users, we create "representative" query
* strings in which constants are replaced with parameter symbols ($n), to
- * make it clearer what a normalized entry can represent. To avoid having
- * to truncate oversized query strings, we store these strings in a temporary
- * external query-texts file. Offsets into this file are kept in the dshash
- * entries.
+ * make it clearer what a normalized entry can represent.
*
* The dshash serves as the source-of-truth registry of tracked queries,
- * storing the key, file offsets to the query text, and an atomic refcount
+ * storing the key, a DSA pointer to the query text, and an atomic refcount
* used for clock-sweep eviction. Actual counters (calls, timing, buffers,
* etc.) are maintained via a custom pgstat kind using the pending/flush
* infrastructure, so backends accumulate stats locally and flush them
@@ -55,8 +54,6 @@
#include "postgres.h"
#include <math.h>
-#include <sys/stat.h>
-#include <unistd.h>
#include "access/htup_details.h"
#include "access/parallel.h"
@@ -73,12 +70,10 @@
#include "optimizer/planner.h"
#include "parser/analyze.h"
#include "storage/dsm_registry.h"
-#include "storage/fd.h"
-#include "storage/ipc.h"
-#include "storage/spin.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
+#include "utils/dsa.h"
#include "utils/guc.h"
#include "utils/numeric.h"
#include "utils/pgstat_internal.h"
@@ -90,11 +85,6 @@ PG_MODULE_MAGIC_EXT(
.version = PG_VERSION
);
-/*
- * Location of external query text file.
- */
-#define PGSS_TEXT_FILE PG_STAT_TMP_DIR "/pgss_query_texts.stat"
-
/* Custom pgstat kind ID */
#define PGSTAT_KIND_PGSS 25
@@ -225,10 +215,6 @@ typedef struct PgStatShared_Pgss
/*
* Entry in the dshash source-of-truth registry.
*
- * Note: in event of a failure in garbage collection of the query text file,
- * we reset query_offset to zero and query_len to -1. This will be seen as
- * an invalid state by qtext_fetch().
- *
* Note: Key must be first for dshash.
*/
typedef struct pgssEntry
@@ -236,7 +222,7 @@ typedef struct pgssEntry
pgssHashKey key;
pg_atomic_uint32 refcount; /* clock-sweep: decremented on sweep, evict at
* 0 */
- Size query_offset; /* query text offset in external file */
+ dsa_pointer query_text; /* DSA pointer to query text */
int query_len; /* # of valid bytes in query string, or -1 */
int encoding; /* query text encoding */
TimestampTz stats_since; /* timestamp of entry allocation */
@@ -253,10 +239,6 @@ typedef struct pgssSharedState
pg_atomic_uint32 sweep_partition; /* rotating hand: next partition to
* sweep */
TimestampTz stats_reset; /* timestamp with all stats reset */
- slock_t mutex; /* protects following fields only: */
- Size extent; /* current extent of query file */
- int n_writers; /* number of active writers to query file */
- int gc_count; /* query file garbage collection cycle count */
} pgssSharedState;
/* Backend-local pending entry */
@@ -284,6 +266,9 @@ static const dshash_parameters pgss_dsh_params = {
/* Global shared state */
static pgssSharedState *pgss_shared = NULL;
+/* Query text dsa area */
+static dsa_area *pgss_qtext_dsa = NULL;
+
/* source-of-truth dshash table */
static dshash_table *pgss_hash = NULL;
@@ -322,19 +307,13 @@ static bool pgss_track_utility = true; /* whether to track utility commands */
static bool pgss_track_planning = false; /* whether to track planning
* duration */
static bool pgss_save = true; /* whether to save stats across shutdown */
+static int pgss_query_text_memory = 4096; /* in KB */
#define pgss_enabled(level) \
(!IsParallelWorker() && \
(pgss_track == PGSS_TRACK_ALL || \
(pgss_track == PGSS_TRACK_TOP && (level) == 0)))
-#define record_gc_qtexts() \
- do { \
- SpinLockAcquire(&pgss_shared->mutex); \
- pgss_shared->gc_count++; \
- SpinLockRelease(&pgss_shared->mutex); \
- } while(0)
-
/*---- Function declarations ----*/
PG_FUNCTION_INFO_V1(pg_stat_statements_reset);
@@ -382,16 +361,9 @@ static void pgss_store(const char *query, int64 queryId,
PlannedStmtOrigin planOrigin);
static void entry_dealloc(void);
+static void pgss_assign_query_text_memory(int newval, void *extra);
static TimestampTz entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only);
static uint64 pgss_hash_key(pgssHashKey *key);
-
-static bool qtext_store(const char *query, int query_len,
- Size *query_offset, int *gc_count);
-static char *qtext_load_file(Size *buffer_size);
-static char *qtext_fetch(Size query_offset, int query_len,
- char *buffer, Size buffer_size);
-static bool need_gc_qtexts(void);
-static void gc_qtexts(void);
static char *generate_normalized_query(const JumbleState *jstate,
const char *query,
int query_loc, int *query_len_p);
@@ -407,7 +379,6 @@ static void pgss_to_serialized_data(const PgStat_HashKey *key,
static bool pgss_from_serialized_data(const PgStat_HashKey *key,
PgStatShared_Common *header,
FILE *statfile);
-static void pgss_finish(PgStat_StatsFileOp status);
/*--------------------------------------------------------------------------
* Custom pgstat kind definition
@@ -427,7 +398,6 @@ static const PgStat_KindInfo pgss_kind_info = {
.flush_pending_cb = pgss_flush_pending_cb,
.to_serialized_data = pgss_to_serialized_data,
.from_serialized_data = pgss_from_serialized_data,
- .finish = pgss_finish,
};
static uint64
@@ -446,10 +416,13 @@ pgss_init_shmem(void *ptr, void *arg)
pg_atomic_init_u64(&state->dealloc, 0);
pg_atomic_init_u32(&state->sweep_partition, 0);
state->stats_reset = GetCurrentTimestamp();
- SpinLockInit(&state->mutex);
- state->extent = 0;
- state->n_writers = 0;
- state->gc_count = 0;
+}
+
+static void
+pgss_assign_query_text_memory(int newval, void *extra)
+{
+ if (pgss_qtext_dsa)
+ dsa_set_size_limit(pgss_qtext_dsa, (size_t) newval * 1024);
}
static void
@@ -468,6 +441,12 @@ pgss_attach_shmem(void)
if (pgss_hash == NULL)
pgss_hash = GetNamedDSHash("pg_stat_statements", &pgss_dsh_params,
&found);
+
+ if (pgss_qtext_dsa == NULL)
+ {
+ pgss_qtext_dsa = GetNamedDSA("pg_stat_statements_qtext", &found);
+ dsa_set_size_limit(pgss_qtext_dsa, (size_t) pgss_query_text_memory * 1024);
+ }
}
/*
@@ -543,6 +522,19 @@ _PG_init(void)
NULL,
NULL);
+ DefineCustomIntVariable("pg_stat_statements.query_text_memory",
+ "Sets the memory limit for query text storage.",
+ NULL,
+ &pgss_query_text_memory,
+ 4096,
+ 256,
+ MAX_KILOBYTES,
+ PGC_SIGHUP,
+ GUC_UNIT_KB,
+ NULL,
+ pgss_assign_query_text_memory,
+ NULL);
+
MarkGUCPrefixReserved("pg_stat_statements");
/*
@@ -678,9 +670,6 @@ pgss_to_serialized_data(const PgStat_HashKey *key,
const PgStatShared_Common *header,
FILE *statfile)
{
- static char *qbuffer = NULL;
- static Size qbuffer_size = 0;
-
PgStatShared_Pgss *shpgss = (PgStatShared_Pgss *) header;
pgssEntry *entry;
bool found = false;
@@ -713,13 +702,9 @@ pgss_to_serialized_data(const PgStat_HashKey *key,
pgstat_write_chunk_s(statfile, &serialized);
- /* Load query text file once, reuse across all entries */
- if (!qbuffer)
- qbuffer = qtext_load_file(&qbuffer_size);
-
- if (serialized.query_len >= 0 && qbuffer)
- qtext = qtext_fetch(serialized.query_offset, serialized.query_len,
- qbuffer, qbuffer_size);
+ /* Resolve the query text from the DSA area */
+ if (DsaPointerIsValid(serialized.query_text) && serialized.query_len >= 0)
+ qtext = dsa_get_address(pgss_qtext_dsa, serialized.query_text);
if (qtext)
{
@@ -736,7 +721,7 @@ pgss_to_serialized_data(const PgStat_HashKey *key,
/*
* Deserialize auxiliary data: recreate the dshash entry and store query
- * text in the file.
+ * text in pgss_qtext_dsa.
*/
static bool
pgss_from_serialized_data(const PgStat_HashKey *key,
@@ -777,7 +762,7 @@ pgss_from_serialized_data(const PgStat_HashKey *key,
if (qtext_len >= 0)
{
char *qtext;
- Size query_offset;
+ dsa_pointer dp;
qtext = palloc(qtext_len + 1);
if (!pgstat_read_chunk(statfile, qtext, qtext_len + 1))
@@ -787,15 +772,16 @@ pgss_from_serialized_data(const PgStat_HashKey *key,
return false;
}
- /* Store in the text file */
- if (qtext_store(qtext, qtext_len, &query_offset, NULL))
+ dp = dsa_allocate_extended(pgss_qtext_dsa, qtext_len + 1, DSA_ALLOC_NO_OOM);
+ if (DsaPointerIsValid(dp))
{
- dsh_entry->query_offset = query_offset;
+ memcpy(dsa_get_address(pgss_qtext_dsa, dp), qtext, qtext_len + 1);
+ dsh_entry->query_text = dp;
dsh_entry->query_len = qtext_len;
}
else
{
- dsh_entry->query_offset = 0;
+ dsh_entry->query_text = InvalidDsaPointer;
dsh_entry->query_len = -1;
}
@@ -803,7 +789,7 @@ pgss_from_serialized_data(const PgStat_HashKey *key,
}
else
{
- dsh_entry->query_offset = 0;
+ dsh_entry->query_text = InvalidDsaPointer;
dsh_entry->query_len = -1;
}
@@ -826,31 +812,6 @@ pgss_from_serialized_data(const PgStat_HashKey *key,
return true;
}
-static void
-pgss_finish(PgStat_StatsFileOp status)
-{
- switch (status)
- {
- case STATS_WRITE:
-
- /*
- * Text has been serialized into the pgstat file; remove the
- * original.
- */
- unlink(PGSS_TEXT_FILE);
- break;
-
- case STATS_READ:
- /* Text file was rebuilt by from_serialized_data; keep it. */
- break;
-
- case STATS_DISCARD:
- /* Stats discarded; remove orphaned text file. */
- unlink(PGSS_TEXT_FILE);
- break;
- }
-}
-
/*--------------------------------------------------------------------------
* pgss_store: Record statistics for one statement execution.
*
@@ -912,7 +873,60 @@ pgss_store(const char *query, int64 queryId,
if (cur < PGSS_REF_CAP)
pg_atomic_compare_exchange_u32(&dsh_entry->refcount, &cur, cur + 1);
- dshash_release_lock(pgss_hash, dsh_entry);
+ /*
+ * If text was lost due to DSA exhaustion, try to backfill now that
+ * more memory may be available (e.g. after a limit increase +
+ * reload).
+ */
+ if (!DsaPointerIsValid(dsh_entry->query_text))
+ {
+ dsa_pointer dp;
+ const char *store_query;
+ int store_len = query_len;
+
+ /*
+ * Build the text to store first. When jstate is available (i.e.
+ * called from post_parse_analyze), we normalize; from the
+ * executor/utility hooks jstate is NULL, so we store the raw text.
+ *
+ * Then release the lock and re-acquire it exclusively for the
+ * backfill. The entry could be evicted and re-inserted in between;
+ * the re-check below handles that harmlessly.
+ */
+ if (jstate && jstate->clocations_count > 0)
+ norm_query = generate_normalized_query(jstate, query,
+ query_location,
+ &store_len);
+ store_query = norm_query ? norm_query : query;
+
+ dshash_release_lock(pgss_hash, dsh_entry);
+ dsh_entry = dshash_find(pgss_hash, &key, true);
+ if (dsh_entry && !DsaPointerIsValid(dsh_entry->query_text))
+ {
+ dp = dsa_allocate_extended(pgss_qtext_dsa, store_len + 1,
+ DSA_ALLOC_NO_OOM);
+ if (DsaPointerIsValid(dp))
+ {
+ char *dst = dsa_get_address(pgss_qtext_dsa, dp);
+
+ memcpy(dst, store_query, store_len);
+ dst[store_len] = '\0';
+ dsh_entry->query_text = dp;
+ dsh_entry->query_len = store_len;
+ dsh_entry->encoding = encoding;
+ }
+ }
+ if (dsh_entry)
+ dshash_release_lock(pgss_hash, dsh_entry);
+
+ if (norm_query)
+ {
+ pfree(norm_query);
+ norm_query = NULL;
+ }
+ }
+ else
+ dshash_release_lock(pgss_hash, dsh_entry);
/* If jstate is provided and kind is PGSS_INVALID, nothing more to do */
if (jstate && kind == PGSS_INVALID)
@@ -920,45 +934,39 @@ pgss_store(const char *query, int64 queryId,
}
else
{
- Size query_offset;
- int gc_count;
- bool stored;
- bool do_gc;
-
/* Slow path: insert with exclusive lock */
- if (jstate)
- {
- norm_query = generate_normalized_query(jstate, query,
- query_location,
- &query_len);
- }
-
- /* Store query text in the file */
- stored = qtext_store(norm_query ? norm_query : query, query_len,
- &query_offset, &gc_count);
-
- /*
- * Determine whether we need to garbage collect external query texts.
- */
- do_gc = need_gc_qtexts();
-
dsh_entry = dshash_find_or_insert(pgss_hash, &key, &found);
if (!found)
{
+ dsa_pointer dp;
+
pg_atomic_init_u32(&dsh_entry->refcount, 1);
dsh_entry->stats_since = GetCurrentTimestamp();
dsh_entry->minmax_stats_since = dsh_entry->stats_since;
- dsh_entry->encoding = encoding;
- if (stored)
+ if (jstate)
+ {
+ norm_query = generate_normalized_query(jstate, query,
+ query_location,
+ &query_len);
+ }
+
+ dp = dsa_allocate_extended(pgss_qtext_dsa, query_len + 1, DSA_ALLOC_NO_OOM);
+ if (DsaPointerIsValid(dp))
{
- dsh_entry->query_offset = query_offset;
+ char *dst = dsa_get_address(pgss_qtext_dsa, dp);
+
+ memcpy(dst, norm_query ? norm_query : query, query_len);
+ dst[query_len] = '\0';
+ dsh_entry->query_text = dp;
dsh_entry->query_len = query_len;
+ dsh_entry->encoding = encoding;
}
else
{
- dsh_entry->query_offset = 0;
+ dsh_entry->query_text = InvalidDsaPointer;
dsh_entry->query_len = -1;
+ dsh_entry->encoding = encoding;
}
dshash_release_lock(pgss_hash, dsh_entry);
@@ -966,10 +974,6 @@ pgss_store(const char *query, int64 queryId,
pg_atomic_fetch_add_u64(&pgss_shared->nentries, 1);
entry_dealloc();
- /* If needed, perform garbage collection */
- if (do_gc)
- gc_qtexts();
-
if (jstate && kind == PGSS_INVALID)
{
if (norm_query)
@@ -987,11 +991,7 @@ pgss_store(const char *query, int64 queryId,
dshash_release_lock(pgss_hash, dsh_entry);
if (jstate && kind == PGSS_INVALID)
- {
- if (norm_query)
- pfree(norm_query);
return;
- }
}
}
@@ -1594,8 +1594,6 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
pgssEntry *entry;
Oid userid = GetUserId();
bool is_allowed_role;
- char *qbuffer = NULL;
- Size qbuffer_size = 0;
is_allowed_role = has_privs_of_role(userid, ROLE_PG_READ_ALL_STATS);
@@ -1659,10 +1657,6 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
elog(ERROR, "incorrect number of output arguments");
}
- /* Load the query text file */
- if (showtext)
- qbuffer = qtext_load_file(&qbuffer_size);
-
dshash_seq_init(&status, pgss_hash, false);
while ((entry = dshash_seq_next(&status)) != NULL)
{
@@ -1704,13 +1698,9 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
if (showtext)
{
- char *qstr = qtext_fetch(entry->query_offset,
- entry->query_len,
- qbuffer,
- qbuffer_size);
-
- if (qstr)
+ if (DsaPointerIsValid(entry->query_text) && entry->query_len >= 0)
{
+ char *qstr = dsa_get_address(pgss_qtext_dsa, entry->query_text);
char *enc = pg_any_to_server(qstr, entry->query_len, entry->encoding);
values[i++] = CStringGetTextDatum(enc);
@@ -1850,13 +1840,10 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
}
dshash_seq_term(&status);
-
- if (qbuffer)
- pfree(qbuffer);
}
/* Number of output arguments (columns) for pg_stat_statements_info */
-#define PG_STAT_STATEMENTS_INFO_COLS 2
+#define PG_STAT_STATEMENTS_INFO_COLS 3
/*
* Return statistics of pg_stat_statements.
@@ -1875,6 +1862,7 @@ pg_stat_statements_info(PG_FUNCTION_ARGS)
values[0] = Int64GetDatum((int64) pg_atomic_read_u64(&pgss_shared->dealloc));
values[1] = TimestampTzGetDatum(pgss_shared->stats_reset);
+ values[2] = Int64GetDatum((int64) dsa_get_total_size(pgss_qtext_dsa));
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
@@ -1951,6 +1939,10 @@ entry_dealloc(void)
continue;
}
+ /* Free the DSA query text before evicting */
+ if (DsaPointerIsValid(entry->query_text))
+ dsa_free(pgss_qtext_dsa, entry->query_text);
+
/* Evict: drop from pgstat, then delete from dshash */
pgstat_drop_entry(PGSTAT_KIND_PGSS, entry->key.dbid,
pgss_hash_key(&entry->key), true);
@@ -2059,6 +2051,8 @@ entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
{
uint64 objid = pgss_hash_key(&key);
+ if (DsaPointerIsValid(entry->query_text))
+ dsa_free(pgss_qtext_dsa, entry->query_text);
dshash_delete_entry(pgss_hash, entry);
pg_atomic_fetch_sub_u64(&pgss_shared->nentries, 1);
num_remove++;
@@ -2073,6 +2067,8 @@ entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
{
uint64 objid = pgss_hash_key(&key);
+ if (DsaPointerIsValid(entry->query_text))
+ dsa_free(pgss_qtext_dsa, entry->query_text);
dshash_delete_entry(pgss_hash, entry);
pg_atomic_fetch_sub_u64(&pgss_shared->nentries, 1);
num_remove++;
@@ -2094,6 +2090,9 @@ entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
pgssHashKey ekey = entry->key;
uint64 objid = pgss_hash_key(&ekey);
+ if (DsaPointerIsValid(entry->query_text))
+ dsa_free(pgss_qtext_dsa, entry->query_text);
+
dshash_delete_current(&status);
pg_atomic_fetch_sub_u64(&pgss_shared->nentries, 1);
num_remove++;
@@ -2105,430 +2104,16 @@ entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
dshash_seq_term(&status);
}
- /* If all entries were removed, reset global statistics and text file */
+ /* If all entries were removed, reset global statistics */
if (num_remove > 0 && num_entries == num_remove)
{
- FILE *qfile;
-
pg_atomic_write_u64(&pgss_shared->dealloc, 0);
pgss_shared->stats_reset = stats_reset;
-
- /* Write new empty query file */
- qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
- if (qfile == NULL)
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not create file \"%s\": %m",
- PGSS_TEXT_FILE)));
- }
- else
- {
- if (ftruncate(fileno(qfile), 0) != 0)
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not truncate file \"%s\": %m",
- PGSS_TEXT_FILE)));
- FreeFile(qfile);
- }
-
- SpinLockAcquire(&pgss_shared->mutex);
- pgss_shared->extent = 0;
- SpinLockRelease(&pgss_shared->mutex);
- record_gc_qtexts();
}
return stats_reset;
}
-/*--------------------------------------------------------------------------
- * Query text file management
- *--------------------------------------------------------------------------
- */
-
-/*
- * Store query text in the external file.
- *
- * If successful, returns true, and stores the new entry's offset in the file
- * into *query_offset. Also, if gc_count isn't NULL, *gc_count is set to the
- * number of garbage collections that have occurred so far.
- *
- * On failure, returns false.
- */
-static bool
-qtext_store(const char *query, int query_len,
- Size *query_offset, int *gc_count)
-{
- Size off;
- int fd;
-
- /*
- * We use a spinlock to protect extent/n_writers/gc_count, so that
- * multiple processes may execute this function concurrently.
- */
- SpinLockAcquire(&pgss_shared->mutex);
- off = pgss_shared->extent;
- pgss_shared->extent += query_len + 1;
- pgss_shared->n_writers++;
- if (gc_count)
- *gc_count = pgss_shared->gc_count;
- SpinLockRelease(&pgss_shared->mutex);
-
- *query_offset = off;
-
- /*
- * Don't allow the file to grow larger than what qtext_load_file can
- * (theoretically) handle. This has been seen to be reachable on 32-bit
- * platforms.
- */
- if (unlikely(query_len >= MaxAllocHugeSize - off))
- {
- errno = EFBIG;
- fd = -1;
- goto error;
- }
-
- /* Now write the data into the successfully-reserved part of the file */
- fd = OpenTransientFile(PGSS_TEXT_FILE, O_RDWR | O_CREAT | PG_BINARY);
- if (fd < 0)
- goto error;
-
- if (pg_pwrite(fd, query, query_len, off) != query_len)
- goto error;
- if (pg_pwrite(fd, "\0", 1, off + query_len) != 1)
- goto error;
-
- CloseTransientFile(fd);
-
- /* Mark our write complete */
- SpinLockAcquire(&pgss_shared->mutex);
- pgss_shared->n_writers--;
- SpinLockRelease(&pgss_shared->mutex);
-
- return true;
-
-error:
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- PGSS_TEXT_FILE)));
-
- if (fd >= 0)
- CloseTransientFile(fd);
-
- /* Mark our write complete */
- SpinLockAcquire(&pgss_shared->mutex);
- pgss_shared->n_writers--;
- SpinLockRelease(&pgss_shared->mutex);
-
- return false;
-}
-
-/*
- * Read the external query text file into a palloc'd buffer.
- *
- * Returns NULL (without throwing an error) if unable to read, eg
- * file not there or insufficient memory.
- *
- * On success, the buffer size is also returned into *buffer_size.
- */
-static char *
-qtext_load_file(Size *buffer_size)
-{
- char *buf;
- int fd;
- struct stat stat;
- Size nread;
-
- fd = OpenTransientFile(PGSS_TEXT_FILE, O_RDONLY | PG_BINARY);
- if (fd < 0)
- {
- if (errno != ENOENT)
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m",
- PGSS_TEXT_FILE)));
- return NULL;
- }
-
- /* Get file length */
- if (fstat(fd, &stat))
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m",
- PGSS_TEXT_FILE)));
- CloseTransientFile(fd);
- return NULL;
- }
-
- /* Allocate buffer; beware that off_t might be wider than size_t */
- if (stat.st_size <= MaxAllocHugeSize)
- buf = (char *) palloc_extended(stat.st_size, MCXT_ALLOC_HUGE | MCXT_ALLOC_NO_OOM);
- else
- buf = NULL;
- if (buf == NULL)
- {
- ereport(LOG,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory"),
- errdetail("Could not allocate enough memory to read file \"%s\".",
- PGSS_TEXT_FILE)));
- CloseTransientFile(fd);
- return NULL;
- }
-
- /*
- * OK, slurp in the file. Read in 1GB segments to avoid issues on some
- * platforms.
- */
- nread = 0;
- while (nread < stat.st_size)
- {
- int toread = Min(1024 * 1024 * 1024, stat.st_size - nread);
-
- errno = 0;
- if (read(fd, buf + nread, toread) != toread)
- {
- if (errno)
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m",
- PGSS_TEXT_FILE)));
- pfree(buf);
- CloseTransientFile(fd);
- return NULL;
- }
- nread += toread;
- }
-
- if (CloseTransientFile(fd) != 0)
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not close file \"%s\": %m", PGSS_TEXT_FILE)));
-
- *buffer_size = nread;
- return buf;
-}
-
-/*
- * Locate a query text in the file image previously read by qtext_load_file().
- *
- * We validate the given offset/length, and return NULL if bogus. Otherwise,
- * the result points to a null-terminated string within the buffer.
- */
-static char *
-qtext_fetch(Size query_offset, int query_len,
- char *buffer, Size buffer_size)
-{
- /* File read failed? */
- if (buffer == NULL)
- return NULL;
- /* Bogus offset/length? */
- if (query_len < 0 ||
- query_offset + query_len >= buffer_size)
- return NULL;
- /* As a further sanity check, make sure there's a trailing null */
- if (buffer[query_offset + query_len] != '\0')
- return NULL;
- /* Looks OK */
- return buffer + query_offset;
-}
-
-/*
- * Do we need to garbage-collect the external query text file?
- *
- * We check whether the file has grown excessively relative to the number
- * of entries tracked.
- */
-static bool
-need_gc_qtexts(void)
-{
- Size extent;
- int nentries;
-
- /* Read shared extent pointer */
- SpinLockAcquire(&pgss_shared->mutex);
- extent = pgss_shared->extent;
- SpinLockRelease(&pgss_shared->mutex);
-
- nentries = (int) pg_atomic_read_u64(&pgss_shared->nentries);
-
- /*
- * Don't proceed if file does not exceed 512 bytes per possible entry.
- */
- if ((uint64) extent < (uint64) 512 * pgss_max)
- return false;
-
- /*
- * Don't proceed if file is less than about 50% bloat. We estimate mean
- * query length from the file size and entry count.
- */
- if (nentries > 0)
- {
- Size mean_query_len = extent / nentries;
-
- if ((uint64) extent < (uint64) mean_query_len * pgss_max * 2)
- return false;
- }
-
- return true;
-}
-
-/*
- * Garbage-collect orphaned query texts in external file.
- *
- * This rewrites the query text file, keeping only texts referenced by
- * current dshash entries, and updates their offsets accordingly.
- *
- * Note: unlike the upstream implementation which required LWLock exclusive,
- * this uses dshash partition-level exclusive locks via sequential scan with
- * exclusive=true. This is safe because we're updating entry offsets in-place
- * and the file is guaranteed not to grow during GC (no new writers can get
- * offsets into the old region after we've reset the extent).
- */
-static void
-gc_qtexts(void)
-{
- char *qbuffer;
- Size qbuffer_size;
- FILE *qfile = NULL;
- dshash_seq_status status;
- pgssEntry *entry;
- Size extent;
-
- if (!need_gc_qtexts())
- return;
-
- /*
- * Load the old texts file. If we fail (out of memory, for instance),
- * invalidate query texts.
- */
- qbuffer = qtext_load_file(&qbuffer_size);
- if (qbuffer == NULL)
- goto gc_fail;
-
- /*
- * We overwrite the query texts file in place, so as to reduce the risk of
- * an out-of-disk-space failure.
- */
- qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
- if (qfile == NULL)
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- PGSS_TEXT_FILE)));
- goto gc_fail;
- }
-
- extent = 0;
-
- dshash_seq_init(&status, pgss_hash, true);
- while ((entry = dshash_seq_next(&status)) != NULL)
- {
- int query_len = entry->query_len;
- char *qry = qtext_fetch(entry->query_offset,
- query_len,
- qbuffer,
- qbuffer_size);
-
- if (qry == NULL)
- {
- /* Trouble ... drop the text */
- entry->query_offset = 0;
- entry->query_len = -1;
- continue;
- }
-
- if (fwrite(qry, 1, query_len + 1, qfile) != query_len + 1)
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- PGSS_TEXT_FILE)));
- dshash_seq_term(&status);
- goto gc_fail;
- }
-
- entry->query_offset = extent;
- extent += query_len + 1;
- }
- dshash_seq_term(&status);
-
- /*
- * Truncate away any now-unused space.
- */
- if (ftruncate(fileno(qfile), extent) != 0)
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not truncate file \"%s\": %m",
- PGSS_TEXT_FILE)));
-
- if (FreeFile(qfile))
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- PGSS_TEXT_FILE)));
- qfile = NULL;
- goto gc_fail;
- }
-
- elog(DEBUG1, "pgss gc of queries file shrunk size from %zu to %zu",
- pgss_shared->extent, extent);
-
- /* Reset the shared extent pointer */
- SpinLockAcquire(&pgss_shared->mutex);
- pgss_shared->extent = extent;
- SpinLockRelease(&pgss_shared->mutex);
-
- pfree(qbuffer);
-
- record_gc_qtexts();
-
- return;
-
-gc_fail:
- /* clean up resources */
- if (qfile)
- FreeFile(qfile);
- if (qbuffer)
- pfree(qbuffer);
-
- /*
- * Since the contents of the external file are now uncertain, mark all
- * dshash entries as having invalid texts.
- */
- dshash_seq_init(&status, pgss_hash, true);
- while ((entry = dshash_seq_next(&status)) != NULL)
- {
- entry->query_offset = 0;
- entry->query_len = -1;
- }
- dshash_seq_term(&status);
-
- /*
- * Destroy the query text file and create a new, empty one
- */
- (void) unlink(PGSS_TEXT_FILE);
- qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
- if (qfile == NULL)
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not recreate file \"%s\": %m",
- PGSS_TEXT_FILE)));
- else
- FreeFile(qfile);
-
- /* Reset the shared extent pointer */
- SpinLockAcquire(&pgss_shared->mutex);
- pgss_shared->extent = 0;
- SpinLockRelease(&pgss_shared->mutex);
-
- record_gc_qtexts();
-}
-
/*
* Generate a normalized version of the query string that will be used to
* represent all similar queries.
diff --git a/contrib/pg_stat_statements/t/010_restart.pl b/contrib/pg_stat_statements/t/001_restart.pl
similarity index 100%
rename from contrib/pg_stat_statements/t/010_restart.pl
rename to contrib/pg_stat_statements/t/001_restart.pl
diff --git a/contrib/pg_stat_statements/t/002_query_text_memory.pl b/contrib/pg_stat_statements/t/002_query_text_memory.pl
new file mode 100644
index 00000000000..50b00153625
--- /dev/null
+++ b/contrib/pg_stat_statements/t/002_query_text_memory.pl
@@ -0,0 +1,124 @@
+# Copyright (c) 2008-2026, PostgreSQL Global Development Group
+
+# Tests for pg_stat_statements.query_text_memory behavior.
+# Verifies that when the query text DSA is exhausted:
+# - entries are still tracked with counters accumulating
+# - query text is NULL for entries that could not store text
+# - both showtext=true and showtext=false return all entries
+# - after raising the limit and re-executing, text is backfilled
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('main');
+$node->init;
+$node->append_conf('postgresql.conf', qq{
+shared_preload_libraries = 'pg_stat_statements'
+pg_stat_statements.query_text_memory = 2MB
+});
+$node->start;
+
+$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_statements');
+$node->safe_psql('postgres', 'SELECT pg_stat_statements_reset()');
+
+my $mem = $node->safe_psql('postgres',
+ "SHOW pg_stat_statements.query_text_memory");
+is($mem, '2MB', 'query_text_memory is 2MB');
+
+# Generate unique queries to exhaust the text DSA (set to 2MB).
+# Each CTE query with 200 integer constants is ~1000 bytes of text.
+my $cols = join(', ', map { "$_" } (1 .. 200));
+my $sql = '';
+for my $i (1 .. 2500)
+{
+ $sql .= "WITH t${i} AS (SELECT $cols) SELECT FROM t${i};\n";
+}
+$node->safe_psql('postgres', $sql);
+
+my $null_count = $node->safe_psql('postgres', q{
+SELECT count(*)
+FROM pg_stat_statements
+WHERE query IS NULL AND queryid IS NOT NULL
+});
+
+diag("$null_count entries without text after 2500 queries");
+
+cmp_ok($null_count, '>', 0,
+ "some entries have NULL query text after DSA exhaustion ($null_count)");
+
+# Entries without text still have calls > 0
+my $tracked = $node->safe_psql('postgres', q{
+SELECT count(*)
+FROM pg_stat_statements
+WHERE query IS NULL AND queryid IS NOT NULL AND calls > 0
+});
+cmp_ok($tracked, '>', 0,
+ "entries without text still track counters ($tracked)");
+
+# Entries with text also exist (early entries got text before exhaustion)
+my $with_text = $node->safe_psql('postgres', q{
+SELECT count(*)
+FROM pg_stat_statements
+WHERE query IS NOT NULL AND query LIKE 'WITH t%'
+});
+cmp_ok($with_text, '>', 0,
+ "some entries still have query text ($with_text)");
+
+# Both showtext=true and showtext=false should return all entries
+my $count_true = $node->safe_psql('postgres', q{
+SELECT count(*) FROM pg_stat_statements(true)
+});
+my $count_false = $node->safe_psql('postgres', q{
+SELECT count(*) FROM pg_stat_statements(false)
+});
+cmp_ok($count_true, '>=', 2500,
+ "showtext=true returns all entries ($count_true)");
+cmp_ok($count_false, '>=', 2500,
+ "showtext=false returns all entries ($count_false)");
+
+# Run a probe query with constants while DSA is exhausted.
+# Since nentries < pg_stat_statements.max, entry_dealloc won't evict or
+# free any DSA space, so this entry should remain with NULL text.
+$node->safe_psql('postgres', 'SELECT 11111 + 22222 + 33333');
+
+my $probe_before = $node->safe_psql('postgres', q{
+SELECT count(*) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2 + $3'
+});
+is($probe_before, '0',
+ 'probe query text is NULL while DSA is exhausted');
+
+# Phase 2: Raise limit and verify backfill.
+$node->safe_psql('postgres',
+ "ALTER SYSTEM SET pg_stat_statements.query_text_memory = '100MB'");
+$node->safe_psql('postgres', "SELECT pg_reload_conf()");
+
+# Re-run the probe query in a new connection to trigger backfill.
+# Normalization is not guaranteed for all backfills (e.g. when triggered
+# from ExecutorEnd where jstate is unavailable), but when the backfill
+# occurs via post_parse_analyze, jstate is available and the text should
+# be stored in normalized form.
+$node->safe_psql('postgres', 'SELECT 11111 + 22222 + 33333');
+
+my $norm_after = $node->safe_psql('postgres', q{
+SELECT query FROM pg_stat_statements WHERE query = 'SELECT $1 + $2 + $3'
+});
+is($norm_after, 'SELECT $1 + $2 + $3',
+ 'backfilled text is stored in normalized form');
+
+# Also re-run the bulk queries to trigger their backfill
+$node->safe_psql('postgres', $sql);
+
+my $null_after = $node->safe_psql('postgres', q{
+SELECT count(*)
+FROM pg_stat_statements
+WHERE query IS NULL AND queryid IS NOT NULL
+});
+diag("after backfill: $null_after entries without text");
+cmp_ok($null_after, '<', $null_count,
+ "backfill reduced NULL text entries ($null_count -> $null_after)");
+
+$node->stop;
+done_testing();
--
2.50.1 (Apple Git-155)
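The behaviour exercised by 002_query_text_memory.pl can be illustrated without any PostgreSQL APIs: counters keep accumulating when the text cannot be stored, and the text is backfilled on a later execution once the limit is raised. The following is a minimal single-process sketch, not code from the patch. TextArea, Entry, area_alloc, record and entry_free are hypothetical names, malloc stands in for dsa_allocate_extended with DSA_ALLOC_NO_OOM, and a NULL pointer plays the role of InvalidDsaPointer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a size-limited DSA area. */
typedef struct TextArea
{
	size_t		limit;		/* byte budget, like query_text_memory */
	size_t		used;		/* bytes currently handed out */
} TextArea;

/* Hypothetical stand-in for the text-related fields of pgssEntry. */
typedef struct Entry
{
	char	   *text;		/* NULL plays the role of InvalidDsaPointer */
	int			len;		/* -1 when no text is stored */
	long		calls;		/* counter that must survive text loss */
} Entry;

/* Allocate within the budget; return NULL rather than failing hard. */
static char *
area_alloc(TextArea *a, size_t n)
{
	char	   *p;

	assert(a != NULL);
	if (a->used > a->limit || n > a->limit - a->used)
		return NULL;
	p = malloc(n);
	if (p != NULL)
		a->used += n;
	return p;
}

/* Record one execution; store or backfill the text when space allows. */
static void
record(TextArea *a, Entry *e, const char *query)
{
	e->calls++;				/* counters never depend on the text */
	if (e->text == NULL)
	{
		size_t		len = strlen(query);
		char	   *dst = area_alloc(a, len + 1);

		if (dst != NULL)
		{
			memcpy(dst, query, len + 1);	/* copies the trailing NUL too */
			e->text = dst;
			e->len = (int) len;
		}
		else
			e->len = -1;	/* entry stays tracked, just without text */
	}
}

/* Return an entry's text to the area, as eviction or reset would. */
static void
entry_free(TextArea *a, Entry *e)
{
	if (e->text != NULL)
	{
		a->used -= (size_t) e->len + 1;
		free(e->text);
		e->text = NULL;
		e->len = -1;
	}
}
```

In this sketch the retry happens on every call that finds the entry without text, which keeps the fast path trivial when the text is already present.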
[application/x-patch] v3-0004-pg_stat_statements-modernize-entry-storage-with-p.patch (117.1K, 11-v3-0004-pg_stat_statements-modernize-entry-storage-with-p.patch)
From 715935c78ba9f7cd16806df12e19cc7d605a3a4c Mon Sep 17 00:00:00 2001
From: Sami Imseih <[email protected]>
Date: Fri, 29 May 2026 09:20:13 -0500
Subject: [PATCH v3 4/5] pg_stat_statements: modernize entry storage with
pgstat kind
Replace the shared-memory hash table (ShmemInitHash) with a dshash
table registered via the DSM registry, and replace per-entry spinlock
counter updates with a custom pgstat kind that uses the core pgstat
infrastructure.
Eviction switches from a sort-based scheme (qsort of all entries by
usage factor) to a clock-sweep with an atomic rotating hand. Each entry
carries a refcount (capped at PGSS_REF_CAP = 10) that is decremented on
each sweep; entries reaching zero are evicted. Accesses bump the
refcount toward the cap, so hot queries keep it topped up. Eviction is
guaranteed to make forward progress.
The pg_stat_statements.max GUC is changed from PGC_POSTMASTER to
PGC_SIGHUP so the limit can be adjusted without a restart.
Query text storage remains file-based (pgss_query_texts.stat) as in
the original implementation, with the same qtext_store / qtext_load_file /
gc_qtexts infrastructure adapted to work with the dshash entries.
The extension is bumped to version 1.14 with PARALLEL RESTRICTED
marking on the main function, since parallel workers cannot flush
stats accumulated by the leader via the anytime API.
---
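For readers new to the scheme, the clock-sweep eviction described in the commit message above can be sketched in plain C. This is a single-threaded illustration under stated assumptions, not code from the patch: Slot, Table, touch, sweep_evict_one and NSLOTS are hypothetical names, REF_CAP mirrors PGSS_REF_CAP, and a plain array with an integer hand stands in for the dshash partitions and the atomic rotating hand. It also assumes that an entry found at zero is evicted, while a nonzero entry is only decremented.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define REF_CAP 10			/* mirrors PGSS_REF_CAP from the commit message */
#define NSLOTS	4			/* hypothetical table size for the illustration */

typedef struct Slot
{
	bool		used;
	uint32_t	refcount;
	int			query_id;	/* hypothetical stand-in for the hash key */
} Slot;

typedef struct Table
{
	Slot		slots[NSLOTS];
	size_t		hand;		/* next slot the sweep will visit */
} Table;

/* Bump the refcount on access, saturating at REF_CAP. */
static void
touch(Slot *s)
{
	if (s->refcount < REF_CAP)
		s->refcount++;
}

/*
 * Advance the hand until one entry is evicted and return its slot index,
 * or -1 if the table holds no entries.  Because refcounts are capped,
 * NSLOTS * (REF_CAP + 1) steps are always enough to find a zero entry.
 */
static int
sweep_evict_one(Table *t)
{
	assert(t != NULL);

	for (size_t step = 0; step < (size_t) NSLOTS * (REF_CAP + 1); step++)
	{
		size_t		i = t->hand;
		Slot	   *s = &t->slots[i];

		t->hand = (t->hand + 1) % NSLOTS;

		if (!s->used)
			continue;
		if (s->refcount > 0)
		{
			s->refcount--;	/* decay; the entry survives this visit */
			continue;
		}
		s->used = false;	/* decayed to zero: evict */
		return (int) i;
	}
	return -1;
}
```

The cap is what bounds the loop in this sketch: no entry can accumulate more than REF_CAP credits, so a popular entry delays its own eviction but cannot stall the sweep indefinitely.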
.../expected/oldextversions.out | 67 +
contrib/pg_stat_statements/meson.build | 3 +-
.../pg_stat_statements--1.13--1.14.sql | 81 +
.../pg_stat_statements/pg_stat_statements.c | 2359 ++++++++---------
.../pg_stat_statements.conf | 1 +
.../pg_stat_statements.control | 2 +-
.../pg_stat_statements/sql/oldextversions.sql | 5 +
doc/src/sgml/pgstatstatements.sgml | 39 +-
8 files changed, 1229 insertions(+), 1328 deletions(-)
create mode 100644 contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
diff --git a/contrib/pg_stat_statements/expected/oldextversions.out b/contrib/pg_stat_statements/expected/oldextversions.out
index 726383a99d7..5d65eb6b521 100644
--- a/contrib/pg_stat_statements/expected/oldextversions.out
+++ b/contrib/pg_stat_statements/expected/oldextversions.out
@@ -474,4 +474,71 @@ SELECT count(*) > 0 AS has_data FROM pg_stat_statements;
t
(1 row)
+-- Functions marked PARALLEL RESTRICTED in 1.14
+AlTER EXTENSION pg_stat_statements UPDATE TO '1.14';
+\d pg_stat_statements
+ View "public.pg_stat_statements"
+ Column | Type | Collation | Nullable | Default
+----------------------------+--------------------------+-----------+----------+---------
+ userid | oid | | |
+ dbid | oid | | |
+ toplevel | boolean | | |
+ queryid | bigint | | |
+ query | text | | |
+ plans | bigint | | |
+ total_plan_time | double precision | | |
+ min_plan_time | double precision | | |
+ max_plan_time | double precision | | |
+ mean_plan_time | double precision | | |
+ stddev_plan_time | double precision | | |
+ calls | bigint | | |
+ total_exec_time | double precision | | |
+ min_exec_time | double precision | | |
+ max_exec_time | double precision | | |
+ mean_exec_time | double precision | | |
+ stddev_exec_time | double precision | | |
+ rows | bigint | | |
+ shared_blks_hit | bigint | | |
+ shared_blks_read | bigint | | |
+ shared_blks_dirtied | bigint | | |
+ shared_blks_written | bigint | | |
+ local_blks_hit | bigint | | |
+ local_blks_read | bigint | | |
+ local_blks_dirtied | bigint | | |
+ local_blks_written | bigint | | |
+ temp_blks_read | bigint | | |
+ temp_blks_written | bigint | | |
+ shared_blk_read_time | double precision | | |
+ shared_blk_write_time | double precision | | |
+ local_blk_read_time | double precision | | |
+ local_blk_write_time | double precision | | |
+ temp_blk_read_time | double precision | | |
+ temp_blk_write_time | double precision | | |
+ wal_records | bigint | | |
+ wal_fpi | bigint | | |
+ wal_bytes | numeric | | |
+ wal_buffers_full | bigint | | |
+ jit_functions | bigint | | |
+ jit_generation_time | double precision | | |
+ jit_inlining_count | bigint | | |
+ jit_inlining_time | double precision | | |
+ jit_optimization_count | bigint | | |
+ jit_optimization_time | double precision | | |
+ jit_emission_count | bigint | | |
+ jit_emission_time | double precision | | |
+ jit_deform_count | bigint | | |
+ jit_deform_time | double precision | | |
+ parallel_workers_to_launch | bigint | | |
+ parallel_workers_launched | bigint | | |
+ generic_plan_calls | bigint | | |
+ custom_plan_calls | bigint | | |
+ stats_since | timestamp with time zone | | |
+ minmax_stats_since | timestamp with time zone | | |
+
+SELECT count(*) > 0 AS has_data FROM pg_stat_statements;
+ has_data
+----------
+ t
+(1 row)
+
DROP EXTENSION pg_stat_statements;
diff --git a/contrib/pg_stat_statements/meson.build b/contrib/pg_stat_statements/meson.build
index 9d78cb88b7d..7ffc8964494 100644
--- a/contrib/pg_stat_statements/meson.build
+++ b/contrib/pg_stat_statements/meson.build
@@ -21,6 +21,7 @@ contrib_targets += pg_stat_statements
install_data(
'pg_stat_statements.control',
'pg_stat_statements--1.4.sql',
+ 'pg_stat_statements--1.13--1.14.sql',
'pg_stat_statements--1.12--1.13.sql',
'pg_stat_statements--1.11--1.12.sql',
'pg_stat_statements--1.10--1.11.sql',
@@ -69,7 +70,7 @@ tests += {
},
'tap': {
'tests': [
- 't/010_restart.pl',
+ 't/001_restart.pl',
],
},
}
diff --git a/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql b/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
new file mode 100644
index 00000000000..7ed4c19eb5a
--- /dev/null
+++ b/contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql
@@ -0,0 +1,81 @@
+/* contrib/pg_stat_statements/pg_stat_statements--1.13--1.14.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION pg_stat_statements UPDATE TO '1.14'" to load this file. \quit
+
+/* First we have to remove them from the extension */
+ALTER EXTENSION pg_stat_statements DROP VIEW pg_stat_statements;
+ALTER EXTENSION pg_stat_statements DROP FUNCTION pg_stat_statements(boolean);
+
+/* Then we can drop them */
+DROP VIEW pg_stat_statements;
+DROP FUNCTION pg_stat_statements(boolean);
+
+/* Now redefine with PARALLEL RESTRICTED */
+CREATE FUNCTION pg_stat_statements(IN showtext boolean,
+ OUT userid oid,
+ OUT dbid oid,
+ OUT toplevel bool,
+ OUT queryid bigint,
+ OUT query text,
+ OUT plans int8,
+ OUT total_plan_time float8,
+ OUT min_plan_time float8,
+ OUT max_plan_time float8,
+ OUT mean_plan_time float8,
+ OUT stddev_plan_time float8,
+ OUT calls int8,
+ OUT total_exec_time float8,
+ OUT min_exec_time float8,
+ OUT max_exec_time float8,
+ OUT mean_exec_time float8,
+ OUT stddev_exec_time float8,
+ OUT rows int8,
+ OUT shared_blks_hit int8,
+ OUT shared_blks_read int8,
+ OUT shared_blks_dirtied int8,
+ OUT shared_blks_written int8,
+ OUT local_blks_hit int8,
+ OUT local_blks_read int8,
+ OUT local_blks_dirtied int8,
+ OUT local_blks_written int8,
+ OUT temp_blks_read int8,
+ OUT temp_blks_written int8,
+ OUT shared_blk_read_time float8,
+ OUT shared_blk_write_time float8,
+ OUT local_blk_read_time float8,
+ OUT local_blk_write_time float8,
+ OUT temp_blk_read_time float8,
+ OUT temp_blk_write_time float8,
+ OUT wal_records int8,
+ OUT wal_fpi int8,
+ OUT wal_bytes numeric,
+ OUT wal_buffers_full int8,
+ OUT jit_functions int8,
+ OUT jit_generation_time float8,
+ OUT jit_inlining_count int8,
+ OUT jit_inlining_time float8,
+ OUT jit_optimization_count int8,
+ OUT jit_optimization_time float8,
+ OUT jit_emission_count int8,
+ OUT jit_emission_time float8,
+ OUT jit_deform_count int8,
+ OUT jit_deform_time float8,
+ OUT parallel_workers_to_launch int8,
+ OUT parallel_workers_launched int8,
+ OUT generic_plan_calls int8,
+ OUT custom_plan_calls int8,
+ OUT stats_since timestamp with time zone,
+ OUT minmax_stats_since timestamp with time zone
+)
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'pg_stat_statements_1_13'
+LANGUAGE C STRICT VOLATILE PARALLEL RESTRICTED;
+
+CREATE VIEW pg_stat_statements AS
+ SELECT * FROM pg_stat_statements(true);
+
+GRANT SELECT ON pg_stat_statements TO PUBLIC;
+
+/* Mark the reset function as PARALLEL RESTRICTED */
+ALTER FUNCTION pg_stat_statements_reset(Oid, Oid, bigint, boolean) PARALLEL RESTRICTED;
diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c
index 92315627916..2004cad91f7 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.c
+++ b/contrib/pg_stat_statements/pg_stat_statements.c
@@ -5,8 +5,9 @@
* usage across a whole database cluster.
*
* Execution costs are totaled for each distinct source query, and kept in
- * a shared hashtable. (We track only as many distinct queries as will fit
- * in the designated amount of shared memory.)
+ * a dshash table registered via the DSM registry. (We attempt to keep no more
+ * distinct queries than the configured limit, but because dynamic shared
+ * memory is used, the count may briefly exceed it.)
*
* Starting in Postgres 9.2, this module normalized query entries. As of
* Postgres 14, the normalization is done by the core if compute_query_id is
@@ -14,25 +15,35 @@
*
* To facilitate presenting entries to users, we create "representative" query
* strings in which constants are replaced with parameter symbols ($n), to
- * make it clearer what a normalized entry can represent. To save on shared
- * memory, and to avoid having to truncate oversized query strings, we store
- * these strings in a temporary external query-texts file. Offsets into this
- * file are kept in shared memory.
+ * make it clearer what a normalized entry can represent. To avoid having
+ * to truncate oversized query strings, we store these strings in a temporary
+ * external query-texts file. Offsets into this file are kept in the dshash
+ * entries.
*
- * Note about locking issues: to create or delete an entry in the shared
- * hashtable, one must hold pgss->lock exclusively. Modifying any field
- * in an entry except the counters requires the same. To look up an entry,
- * one must hold the lock shared. To read or update the counters within
- * an entry, one must hold the lock shared or exclusive (so the entry doesn't
- * disappear!) and also take the entry's mutex spinlock.
- * The shared state variable pgss->extent (the next free spot in the external
- * query-text file) should be accessed only while holding either the
- * pgss->mutex spinlock, or exclusive lock on pgss->lock. We use the mutex to
- * allow reserving file space while holding only shared lock on pgss->lock.
- * Rewriting the entire external query-text file, eg for garbage collection,
- * requires holding pgss->lock exclusively; this allows individual entries
- * in the file to be read or written while holding only shared lock.
+ * The dshash serves as the source-of-truth registry of tracked queries,
+ * storing the key, file offsets to the query text, and an atomic refcount
+ * used for clock-sweep eviction. Actual counters (calls, timing, buffers,
+ * etc.) are maintained via a custom pgstat kind using the pending/flush
+ * infrastructure, so backends accumulate stats locally and flush them
+ * without contending on shared state.
*
+ * Note about locking: to look up an existing entry, a backend takes a
+ * shared lock on the entry's dshash partition and bumps the refcount
+ * atomically. To insert a new entry, an exclusive partition lock is
+ * required but only on that single partition. When the entry count exceeds
+ * pg_stat_statements.max, a clock-sweep eviction pass claims a partition
+ * via an atomic rotating hand and sweeps it under exclusive lock, evicting
+ * entries whose refcount has reached zero.
+ *
+ * A backend that triggers eviction does not necessarily evict from the same
+ * partition it is inserting into. The point of the clock-sweep is to
+ * continually make progress on keeping the total entry count in check,
+ * spreading eviction work across all partitions over time. The refcount
+ * is capped at PGSS_REF_CAP, so any entry will reach zero after at most
+ * that many sweep passes regardless of how frequently it is accessed. No
+ * entry can become permanently immune to eviction. Furthermore, because
+ * the sweep holds the partition's exclusive lock, no backend can re-bump
+ * the refcount of an entry while it is being considered for eviction.
*
* Copyright (c) 2008-2026, PostgreSQL Global Development Group
*
@@ -50,24 +61,27 @@
#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/pg_authid.h"
+#include "common/hashfn.h"
#include "executor/instrument.h"
#include "funcapi.h"
#include "jit/jit.h"
+#include "lib/dshash.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/queryjumble.h"
#include "optimizer/planner.h"
#include "parser/analyze.h"
#include "pgstat.h"
+#include "storage/dsm_registry.h"
#include "storage/fd.h"
#include "storage/ipc.h"
-#include "storage/lwlock.h"
-#include "storage/shmem.h"
#include "storage/spin.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
-#include "utils/memutils.h"
+#include "utils/guc.h"
+#include "utils/numeric.h"
+#include "utils/pgstat_internal.h"
#include "utils/timestamp.h"
#include "utils/tuplestore.h"
@@ -76,29 +90,18 @@ PG_MODULE_MAGIC_EXT(
.version = PG_VERSION
);
-/* Location of permanent stats file (valid when database is shut down) */
-#define PGSS_DUMP_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pg_stat_statements.stat"
-
/*
* Location of external query text file.
*/
#define PGSS_TEXT_FILE PG_STAT_TMP_DIR "/pgss_query_texts.stat"
-/* Magic number identifying the stats file format */
-static const uint32 PGSS_FILE_HEADER = 0x20250731;
+/* Custom pgstat kind ID */
+#define PGSTAT_KIND_PGSS 25
-/* PostgreSQL major version number, changes in which invalidate all entries */
-static const uint32 PGSS_PG_MAJOR_VERSION = PG_VERSION_NUM / 100;
-
-/* XXX: Should USAGE_EXEC reflect execution time and/or buffer usage? */
-#define USAGE_EXEC(duration) (1.0)
-#define USAGE_INIT (1.0) /* including initial planning */
-#define ASSUMED_MEDIAN_INIT (10.0) /* initial assumed median usage */
-#define ASSUMED_LENGTH_INIT 1024 /* initial assumed mean query length */
-#define USAGE_DECREASE_FACTOR (0.99) /* decreased every entry_dealloc */
-#define STICKY_DECREASE_FACTOR (0.50) /* factor for sticky entries */
-#define USAGE_DEALLOC_PERCENT 5 /* free this % of entries at once */
-#define IS_STICKY(c) ((c.calls[PGSS_PLAN] + c.calls[PGSS_EXEC]) == 0)
+/* Clock-sweep settings */
+#define USAGE_DEALLOC_PERCENT 5 /* evict until total entries drop to this
+ * % below max */
+#define PGSS_REF_CAP 10 /* maximum refcount for clock-sweep */
/*
* Extension version number, for supporting older extension versions' objects
@@ -133,12 +136,9 @@ typedef enum pgssStoreKind
#define PGSS_NUMKIND (PGSS_EXEC + 1)
/*
- * Hashtable key that defines the identity of a hashtable entry. We separate
+ * Hashtable key that defines the identity of a tracked statement.
+ * This is used in the dshash (source-of-truth registry). We separate
* queries by user and by database even if they are otherwise identical.
- *
- * If you add a new key to this struct, make sure to teach pgss_store() to
- * zero the padding bytes. Otherwise, things will break, because pgss_hash is
- * created using HASH_BLOBS, and thus tag_hash is used to hash this.
*/
typedef struct pgssHashKey
{
@@ -149,9 +149,9 @@ typedef struct pgssHashKey
} pgssHashKey;
/*
- * The actual stats counters kept within pgssEntry.
+ * The actual stats counters kept within the custom pgstat kind.
*/
-typedef struct Counters
+typedef struct pgssCounters
{
int64 calls[PGSS_NUMKIND]; /* # of times planned/executed */
double total_time[PGSS_NUMKIND]; /* total planning/execution time,
@@ -212,65 +212,81 @@ typedef struct Counters
* launched */
int64 generic_plan_calls; /* number of calls using a generic plan */
int64 custom_plan_calls; /* number of calls using a custom plan */
-} Counters;
+} pgssCounters;
-/*
- * Global statistics for pg_stat_statements
- */
-typedef struct pgssGlobalStats
+/* Shared pgstat entry */
+typedef struct PgStatShared_Pgss
{
- int64 dealloc; /* # of times entries were deallocated */
- TimestampTz stats_reset; /* timestamp with all stats reset */
-} pgssGlobalStats;
+ PgStatShared_Common header;
+ pgssHashKey key;
+ pgssCounters counters;
+} PgStatShared_Pgss;
/*
- * Statistics per statement
+ * Entry in the dshash source-of-truth registry.
*
* Note: in event of a failure in garbage collection of the query text file,
* we reset query_offset to zero and query_len to -1. This will be seen as
* an invalid state by qtext_fetch().
+ *
+ * Note: Key must be first for dshash.
*/
typedef struct pgssEntry
{
- pgssHashKey key; /* hash key of entry - MUST BE FIRST */
- Counters counters; /* the statistics for this query */
+ pgssHashKey key;
+ pg_atomic_uint32 refcount; /* clock-sweep: decremented on sweep, evict at
+ * 0 */
Size query_offset; /* query text offset in external file */
int query_len; /* # of valid bytes in query string, or -1 */
int encoding; /* query text encoding */
TimestampTz stats_since; /* timestamp of entry allocation */
TimestampTz minmax_stats_since; /* timestamp of last min/max values reset */
- slock_t mutex; /* protects the counters only */
} pgssEntry;
/*
- * Global shared state
+ * Global shared state stored in DSM segment
*/
typedef struct pgssSharedState
{
- LWLockPadded lock; /* protects hashtable search/modification */
- double cur_median_usage; /* current median usage in hashtable */
- Size mean_query_len; /* current mean entry text length */
+ pg_atomic_uint64 nentries; /* current number of tracked entries */
+ pg_atomic_uint64 dealloc; /* total # of entries evicted */
+ pg_atomic_uint32 sweep_partition; /* rotating hand: next partition to
+ * sweep */
+ TimestampTz stats_reset; /* timestamp with all stats reset */
slock_t mutex; /* protects following fields only: */
Size extent; /* current extent of query file */
int n_writers; /* number of active writers to query file */
int gc_count; /* query file garbage collection cycle count */
- pgssGlobalStats stats; /* global statistics for pgss */
} pgssSharedState;
-/* Links to shared memory state */
-static pgssSharedState *pgss;
-static HTAB *pgss_hash;
+/* Backend-local pending entry */
+typedef struct PgStat_PgssPending
+{
+ pgssHashKey key;
+ pgssCounters counters;
+} PgStat_PgssPending;
-static void pgss_shmem_request(void *arg);
-static void pgss_shmem_init(void *arg);
+/*
+ * source-of-truth registry stored in dshash
+ */
-static const ShmemCallbacks pgss_shmem_callbacks = {
- .request_fn = pgss_shmem_request,
- .init_fn = pgss_shmem_init,
+static const dshash_parameters pgss_dsh_params = {
+ sizeof(pgssHashKey),
+ sizeof(pgssEntry),
+ dshash_memcmp,
+ dshash_memhash,
+ dshash_memcpy,
+ LWTRANCHE_FIRST_USER_DEFINED,
};
/*---- Local variables ----*/
+/* Global shared state */
+static pgssSharedState *pgss_shared = NULL;
+
+/* source-of-truth dshash table */
+static dshash_table *pgss_hash = NULL;
+
/* Current nesting depth of planner/ExecutorRun/ProcessUtility calls */
static int nesting_level = 0;
@@ -314,9 +330,9 @@ static bool pgss_save = true; /* whether to save stats across shutdown */
#define record_gc_qtexts() \
do { \
- SpinLockAcquire(&pgss->mutex); \
- pgss->gc_count++; \
- SpinLockRelease(&pgss->mutex); \
+ SpinLockAcquire(&pgss_shared->mutex); \
+ pgss_shared->gc_count++; \
+ SpinLockRelease(&pgss_shared->mutex); \
} while(0)
/*---- Function declarations ----*/
@@ -335,7 +351,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_1_13);
PG_FUNCTION_INFO_V1(pg_stat_statements);
PG_FUNCTION_INFO_V1(pg_stat_statements_info);
-static void pgss_shmem_shutdown(int code, Datum arg);
static void pgss_post_parse_analyze(ParseState *pstate, Query *query,
const JumbleState *jstate);
static PlannedStmt *pgss_planner(Query *parse,
@@ -365,12 +380,11 @@ static void pgss_store(const char *query, int64 queryId,
int parallel_workers_to_launch,
int parallel_workers_launched,
PlannedStmtOrigin planOrigin);
-static void pg_stat_statements_internal(FunctionCallInfo fcinfo,
- pgssVersion api_version,
- bool showtext);
-static pgssEntry *entry_alloc(pgssHashKey *key, Size query_offset, int query_len,
- int encoding, bool sticky);
+
static void entry_dealloc(void);
+static TimestampTz entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only);
+static uint64 pgss_hash_key(pgssHashKey *key);
+
static bool qtext_store(const char *query, int query_len,
Size *query_offset, int *gc_count);
static char *qtext_load_file(Size *buffer_size);
@@ -378,37 +392,99 @@ static char *qtext_fetch(Size query_offset, int query_len,
char *buffer, Size buffer_size);
static bool need_gc_qtexts(void);
static void gc_qtexts(void);
-static TimestampTz entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only);
static char *generate_normalized_query(const JumbleState *jstate,
const char *query,
int query_loc, int *query_len_p);
+static void pg_stat_statements_internal(FunctionCallInfo fcinfo,
+ pgssVersion api_version,
+ bool showtext);
+
+static bool pgss_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait);
+static void pgss_to_serialized_data(const PgStat_HashKey *key,
+ const PgStatShared_Common *header,
+ FILE *statfile);
+static bool pgss_from_serialized_data(const PgStat_HashKey *key,
+ PgStatShared_Common *header,
+ FILE *statfile);
+static void pgss_finish(PgStat_StatsFileOp status);
+
+/*--------------------------------------------------------------------------
+ * Custom pgstat kind definition
+ *--------------------------------------------------------------------------
+ */
+
+static const PgStat_KindInfo pgss_kind_info = {
+ .name = "pg_stat_statements",
+ .fixed_amount = false,
+ .write_to_file = true,
+ .track_entry_count = true,
+ .accessed_across_databases = true,
+ .shared_size = sizeof(PgStatShared_Pgss),
+ .shared_data_off = offsetof(PgStatShared_Pgss, counters),
+ .shared_data_len = sizeof(pgssCounters),
+ .pending_size = sizeof(PgStat_PgssPending),
+ .flush_pending_cb = pgss_flush_pending_cb,
+ .to_serialized_data = pgss_to_serialized_data,
+ .from_serialized_data = pgss_from_serialized_data,
+ .finish = pgss_finish,
+};
+
+static uint64
+pgss_hash_key(pgssHashKey *key)
+{
+ return hash_bytes_extended((const unsigned char *) key,
+ sizeof(pgssHashKey), 0);
+}
+
+static void
+pgss_init_shmem(void *ptr, void *arg)
+{
+ pgssSharedState *state = (pgssSharedState *) ptr;
+
+ pg_atomic_init_u64(&state->nentries, 0);
+ pg_atomic_init_u64(&state->dealloc, 0);
+ pg_atomic_init_u32(&state->sweep_partition, 0);
+ state->stats_reset = GetCurrentTimestamp();
+ SpinLockInit(&state->mutex);
+ state->extent = 0;
+ state->n_writers = 0;
+ state->gc_count = 0;
+}
+
+static void
+pgss_attach_shmem(void)
+{
+ bool found;
+
+ if (pgss_shared != NULL)
+ return;
+
+ pgss_shared = GetNamedDSMSegment("pg_stat_statements_state",
+ sizeof(pgssSharedState),
+ pgss_init_shmem,
+ &found, NULL);
+
+ if (pgss_hash == NULL)
+ pgss_hash = GetNamedDSHash("pg_stat_statements", &pgss_dsh_params,
+ &found);
+}
+
/*
* Module load callback
*/
void
_PG_init(void)
{
- /*
- * In order to create our shared memory area, we have to be loaded via
- * shared_preload_libraries. If not, fall out without hooking into any of
- * the main system. (We don't throw error here because it seems useful to
- * allow the pg_stat_statements functions to be created even when the
- * module isn't active. The functions must protect themselves against
- * being called then, however.)
- */
if (!process_shared_preload_libraries_in_progress)
return;
- /*
- * Inform the postmaster that we want to enable query_id calculation if
- * compute_query_id is set to auto.
- */
EnableQueryId();
- /*
- * Define (or redefine) custom GUC variables.
- */
+ /* Register custom pgstat kind */
+ pgstat_register_kind(PGSTAT_KIND_PGSS, &pgss_kind_info);
+
+ /* Define GUCs */
DefineCustomIntVariable("pg_stat_statements.max",
"Sets the maximum number of statements tracked by pg_stat_statements.",
NULL,
@@ -416,7 +492,7 @@ _PG_init(void)
5000,
100,
INT_MAX / 2,
- PGC_POSTMASTER,
+ PGC_SIGHUP,
0,
NULL,
NULL,
@@ -457,7 +533,7 @@ _PG_init(void)
NULL);
DefineCustomBoolVariable("pg_stat_statements.save",
- "Save pg_stat_statements statistics across server shutdowns.",
+ "Save pg_stat_statements data across server shutdowns.",
NULL,
&pgss_save,
true,
@@ -469,11 +545,6 @@ _PG_init(void)
MarkGUCPrefixReserved("pg_stat_statements");
- /*
- * Register our shared memory needs.
- */
- RegisterShmemCallbacks(&pgss_shmem_callbacks);
-
/*
* Install hooks.
*/
@@ -493,369 +564,568 @@ _PG_init(void)
ProcessUtility_hook = pgss_ProcessUtility;
}
-/*
- * shmem request callback: Request shared memory resources.
- *
- * This is called at postmaster startup. Note that the shared memory isn't
- * allocated here yet, this merely register our needs.
- *
- * In EXEC_BACKEND mode, this is also called in each backend, to re-attach to
- * the shared memory area that was already initialized.
- */
-static void
-pgss_shmem_request(void *arg)
-{
- ShmemRequestHash(.name = "pg_stat_statements hash",
- .nelems = pgss_max,
- .hash_info.keysize = sizeof(pgssHashKey),
- .hash_info.entrysize = sizeof(pgssEntry),
- .hash_flags = HASH_ELEM | HASH_BLOBS,
- .ptr = &pgss_hash,
- );
- ShmemRequestStruct(.name = "pg_stat_statements",
- .size = sizeof(pgssSharedState),
- .ptr = (void **) &pgss,
- );
-}
-
-/*
- * shmem init callback: Initialize our shared memory data structures at
- * postmaster startup.
- *
- * Load any pre-existing statistics from file. Also create and load the
- * query-texts file, which is expected to exist (even if empty) while the
- * module is enabled.
+/*--------------------------------------------------------------------------
+ * pgstat flush helpers: merge per-kind timing counters into shared memory
+ *--------------------------------------------------------------------------
*/
static void
-pgss_shmem_init(void *arg)
+pgss_flush_kind(pgssCounters *shared, pgssCounters *pending, pgssStoreKind kind)
{
- int tranche_id;
- FILE *file = NULL;
- FILE *qfile = NULL;
- uint32 header;
- int32 num;
- int32 pgver;
- int32 i;
- int buffer_size;
- char *buffer = NULL;
-
- /*
- * We already checked that we're loaded from shared_preload_libraries in
- * _PG_init(), so we should not get here after postmaster startup.
- */
- Assert(!IsUnderPostmaster);
-
- /*
- * Initialize the shmem area with no statistics.
- */
- tranche_id = LWLockNewTrancheId("pg_stat_statements");
- LWLockInitialize(&pgss->lock.lock, tranche_id);
- pgss->cur_median_usage = ASSUMED_MEDIAN_INIT;
- pgss->mean_query_len = ASSUMED_LENGTH_INIT;
- SpinLockInit(&pgss->mutex);
- pgss->extent = 0;
- pgss->n_writers = 0;
- pgss->gc_count = 0;
- pgss->stats.dealloc = 0;
- pgss->stats.stats_reset = GetCurrentTimestamp();
-
- /* The hash table must've also been initialized by now */
- Assert(pgss_hash != NULL);
-
- /*
- * Set up a shmem exit hook to dump the statistics to disk on postmaster
- * (or standalone backend) exit.
- */
- on_shmem_exit(pgss_shmem_shutdown, (Datum) 0);
+ int64 n_a,
+ n_b;
+ double delta;
- /*
- * Load any pre-existing statistics from file.
- *
- * Note: we don't bother with locks here, because there should be no other
- * processes running when this code is reached.
- */
+ n_a = shared->calls[kind];
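+ n_b = pending->calls[kind];
+
+ /* Nothing pending for this kind: don't let zeroed min/max reach shared */
+ if (n_b == 0)
+ return;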
- /* Unlink query text file possibly left over from crash */
- unlink(PGSS_TEXT_FILE);
-
- /* Allocate new query text temp file */
- qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
- if (qfile == NULL)
- goto write_error;
+ shared->calls[kind] += n_b;
+ shared->total_time[kind] += pending->total_time[kind];
- /*
- * If we were told not to load old statistics, we're done. (Note we do
- * not try to unlink any old dump file in this case. This seems a bit
- * questionable but it's the historical behavior.)
- */
- if (!pgss_save)
+ if (n_a == 0)
{
- FreeFile(qfile);
- return;
+ shared->min_time[kind] = pending->min_time[kind];
+ shared->max_time[kind] = pending->max_time[kind];
+ shared->mean_time[kind] = pending->mean_time[kind];
+ shared->sum_var_time[kind] = pending->sum_var_time[kind];
}
-
- /*
- * Attempt to load old statistics from the dump file.
- */
- file = AllocateFile(PGSS_DUMP_FILE, PG_BINARY_R);
- if (file == NULL)
+ else
{
- if (errno != ENOENT)
- goto read_error;
- /* No existing persisted stats file, so we're done */
- FreeFile(qfile);
- return;
+ if (pending->min_time[kind] < shared->min_time[kind])
+ shared->min_time[kind] = pending->min_time[kind];
+ if (pending->max_time[kind] > shared->max_time[kind])
+ shared->max_time[kind] = pending->max_time[kind];
+
+ /*
+ * Chan et al.'s parallel variance algorithm: combine two sets of
+ * (count, mean, sum_of_squared_deviations). This is the pairwise
+ * generalization of Welford's single-sample update described at
+ * <http://www.johndcook.com/blog/standard_deviation/>
+ */
+ delta = pending->mean_time[kind] - shared->mean_time[kind];
+ shared->sum_var_time[kind] +=
+ pending->sum_var_time[kind] +
+ delta * delta * (double) n_a * (double) n_b / (double) (n_a + n_b);
+ shared->mean_time[kind] =
+ shared->total_time[kind] / shared->calls[kind];
}
+}
- buffer_size = 2048;
- buffer = (char *) palloc(buffer_size);
+static bool
+pgss_flush_pending_cb(PgStat_EntryRef *entry_ref, bool nowait)
+{
+ PgStat_PgssPending *pending;
+ PgStatShared_Pgss *shared;
- if (fread(&header, sizeof(uint32), 1, file) != 1 ||
- fread(&pgver, sizeof(uint32), 1, file) != 1 ||
- fread(&num, sizeof(int32), 1, file) != 1)
- goto read_error;
+ pending = (PgStat_PgssPending *) entry_ref->pending;
+ shared = (PgStatShared_Pgss *) entry_ref->shared_stats;
- if (header != PGSS_FILE_HEADER ||
- pgver != PGSS_PG_MAJOR_VERSION)
- goto data_error;
+ if (!pgstat_lock_entry(entry_ref, nowait))
+ return false;
- for (i = 0; i < num; i++)
- {
- pgssEntry temp;
- pgssEntry *entry;
- Size query_offset;
+ shared->key = pending->key;
+
+ pgss_flush_kind(&shared->counters, &pending->counters, PGSS_EXEC);
+
+ if (pgss_track_planning && pending->counters.calls[PGSS_PLAN] > 0)
+ pgss_flush_kind(&shared->counters, &pending->counters, PGSS_PLAN);
+
+ shared->counters.rows += pending->counters.rows;
+ shared->counters.shared_blks_hit += pending->counters.shared_blks_hit;
+ shared->counters.shared_blks_read += pending->counters.shared_blks_read;
+ shared->counters.shared_blks_dirtied += pending->counters.shared_blks_dirtied;
+ shared->counters.shared_blks_written += pending->counters.shared_blks_written;
+ shared->counters.local_blks_hit += pending->counters.local_blks_hit;
+ shared->counters.local_blks_read += pending->counters.local_blks_read;
+ shared->counters.local_blks_dirtied += pending->counters.local_blks_dirtied;
+ shared->counters.local_blks_written += pending->counters.local_blks_written;
+ shared->counters.temp_blks_read += pending->counters.temp_blks_read;
+ shared->counters.temp_blks_written += pending->counters.temp_blks_written;
+ shared->counters.shared_blk_read_time += pending->counters.shared_blk_read_time;
+ shared->counters.shared_blk_write_time += pending->counters.shared_blk_write_time;
+ shared->counters.local_blk_read_time += pending->counters.local_blk_read_time;
+ shared->counters.local_blk_write_time += pending->counters.local_blk_write_time;
+ shared->counters.temp_blk_read_time += pending->counters.temp_blk_read_time;
+ shared->counters.temp_blk_write_time += pending->counters.temp_blk_write_time;
+ shared->counters.wal_records += pending->counters.wal_records;
+ shared->counters.wal_fpi += pending->counters.wal_fpi;
+ shared->counters.wal_bytes += pending->counters.wal_bytes;
+ shared->counters.wal_buffers_full += pending->counters.wal_buffers_full;
+ shared->counters.jit_functions += pending->counters.jit_functions;
+ shared->counters.jit_generation_time += pending->counters.jit_generation_time;
+ shared->counters.jit_inlining_count += pending->counters.jit_inlining_count;
+ shared->counters.jit_inlining_time += pending->counters.jit_inlining_time;
+ shared->counters.jit_optimization_count += pending->counters.jit_optimization_count;
+ shared->counters.jit_optimization_time += pending->counters.jit_optimization_time;
+ shared->counters.jit_emission_count += pending->counters.jit_emission_count;
+ shared->counters.jit_emission_time += pending->counters.jit_emission_time;
+ shared->counters.jit_deform_count += pending->counters.jit_deform_count;
+ shared->counters.jit_deform_time += pending->counters.jit_deform_time;
+ shared->counters.parallel_workers_to_launch += pending->counters.parallel_workers_to_launch;
+ shared->counters.parallel_workers_launched += pending->counters.parallel_workers_launched;
+ shared->counters.generic_plan_calls += pending->counters.generic_plan_calls;
+ shared->counters.custom_plan_calls += pending->counters.custom_plan_calls;
+
+ pgstat_unlock_entry(entry_ref);
- if (fread(&temp, sizeof(pgssEntry), 1, file) != 1)
- goto read_error;
+ return true;
+}
- /* Encoding is the only field we can easily sanity-check */
- if (!PG_VALID_BE_ENCODING(temp.encoding))
- goto data_error;
+/*
+ * Serialize dshash entry data + query text inline alongside each pgstat entry.
+ * On restart, from_serialized_data reconstructs both.
+ */
+static void
+pgss_to_serialized_data(const PgStat_HashKey *key,
+ const PgStatShared_Common *header,
+ FILE *statfile)
+{
+ static char *qbuffer = NULL;
+ static Size qbuffer_size = 0;
- /* Resize buffer as needed */
- if (temp.query_len >= buffer_size)
- {
- buffer_size = Max(buffer_size * 2, temp.query_len + 1);
- buffer = repalloc(buffer, buffer_size);
- }
+ PgStatShared_Pgss *shpgss = (PgStatShared_Pgss *) header;
+ pgssEntry *entry;
+ bool found = false;
+ pgssEntry serialized;
+ char *qtext = NULL;
+ int qtext_len = 0;
- if (fread(buffer, 1, temp.query_len + 1, file) != temp.query_len + 1)
- goto read_error;
+ if (!pgss_save)
+ {
+ pgstat_write_chunk_s(statfile, &found);
+ return;
+ }
- /* Should have a trailing null, but let's make sure */
- buffer[temp.query_len] = '\0';
+ pgss_attach_shmem();
- /* Skip loading "sticky" entries */
- if (IS_STICKY(temp.counters))
- continue;
+ memset(&serialized, 0, sizeof(pgssEntry));
- /* Store the query text */
- query_offset = pgss->extent;
- if (fwrite(buffer, 1, temp.query_len + 1, qfile) != temp.query_len + 1)
- goto write_error;
- pgss->extent += temp.query_len + 1;
-
- /* make the hashtable entry (discards old entries if too many) */
- entry = entry_alloc(&temp.key, query_offset, temp.query_len,
- temp.encoding,
- false);
-
- /* copy in the actual stats */
- entry->counters = temp.counters;
- entry->stats_since = temp.stats_since;
- entry->minmax_stats_since = temp.minmax_stats_since;
+ entry = dshash_find(pgss_hash, &shpgss->key, false);
+ if (entry)
+ {
+ serialized = *entry;
+ found = true;
+ dshash_release_lock(pgss_hash, entry);
}
- /* Read global statistics for pg_stat_statements */
- if (fread(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
- goto read_error;
+ pgstat_write_chunk_s(statfile, &found);
- pfree(buffer);
- FreeFile(file);
- FreeFile(qfile);
+ if (!found)
+ return;
- /*
- * Remove the persisted stats file so it's not included in
- * backups/replication standbys, etc. A new file will be written on next
- * shutdown.
- *
- * Note: it's okay if the PGSS_TEXT_FILE is included in a basebackup,
- * because we remove that file on startup; it acts inversely to
- * PGSS_DUMP_FILE, in that it is only supposed to be around when the
- * server is running, whereas PGSS_DUMP_FILE is only supposed to be around
- * when the server is not running. Leaving the file creates no danger of
- * a newly restored database having a spurious record of execution costs,
- * which is what we're really concerned about here.
- */
- unlink(PGSS_DUMP_FILE);
+ pgstat_write_chunk_s(statfile, &serialized);
- return;
+ /* Load query text file once, reuse across all entries */
+ if (!qbuffer)
+ qbuffer = qtext_load_file(&qbuffer_size);
-read_error:
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m",
- PGSS_DUMP_FILE)));
- goto fail;
-data_error:
- ereport(LOG,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("ignoring invalid data in file \"%s\"",
- PGSS_DUMP_FILE)));
- goto fail;
-write_error:
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- PGSS_TEXT_FILE)));
-fail:
- if (buffer)
- pfree(buffer);
- if (file)
- FreeFile(file);
- if (qfile)
- FreeFile(qfile);
- /* If possible, throw away the bogus file; ignore any error */
- unlink(PGSS_DUMP_FILE);
+ if (serialized.query_len >= 0 && qbuffer)
+ qtext = qtext_fetch(serialized.query_offset, serialized.query_len,
+ qbuffer, qbuffer_size);
- /*
- * Don't unlink PGSS_TEXT_FILE here; it should always be around while the
- * server is running with pg_stat_statements enabled
- */
+ if (qtext)
+ {
+ qtext_len = serialized.query_len;
+ pgstat_write_chunk_s(statfile, &qtext_len);
+ pgstat_write_chunk(statfile, qtext, qtext_len + 1);
+ }
+ else
+ {
+ qtext_len = -1;
+ pgstat_write_chunk_s(statfile, &qtext_len);
+ }
}
/*
- * shmem_shutdown hook: Dump statistics into file.
- *
- * Note: we don't bother with acquiring lock, because there should be no
- * other processes running when this is called.
+ * Deserialize auxiliary data: recreate the dshash entry and store the
+ * query text in the external query text file (PGSS_TEXT_FILE).
*/
-static void
-pgss_shmem_shutdown(int code, Datum arg)
+static bool
+pgss_from_serialized_data(const PgStat_HashKey *key,
+ PgStatShared_Common *header,
+ FILE *statfile)
{
- FILE *file;
- char *qbuffer = NULL;
- Size qbuffer_size = 0;
- HASH_SEQ_STATUS hash_seq;
- int32 num_entries;
- pgssEntry *entry;
-
- /* Don't try to dump during a crash. */
- if (code)
- return;
+ bool had_entry;
+ pgssEntry serialized;
+ pgssEntry *dsh_entry;
+ bool found;
+ int qtext_len;
- /* Safety check ... shouldn't get here unless shmem is set up. */
- if (!pgss || !pgss_hash)
- return;
+ if (!pgstat_read_chunk_s(statfile, &had_entry))
+ return false;
- /* Don't dump if told not to. */
- if (!pgss_save)
- return;
+ if (!had_entry)
+ {
+ pgstat_drop_entry(PGSTAT_KIND_PGSS, key->dboid, key->objid, false);
+ return true;
+ }
- file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W);
- if (file == NULL)
- goto error;
+ if (!pgstat_read_chunk_s(statfile, &serialized))
+ return false;
- if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1)
- goto error;
- if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1)
- goto error;
- num_entries = hash_get_num_entries(pgss_hash);
- if (fwrite(&num_entries, sizeof(int32), 1, file) != 1)
- goto error;
+ if (!pgstat_read_chunk_s(statfile, &qtext_len))
+ return false;
- qbuffer = qtext_load_file(&qbuffer_size);
- if (qbuffer == NULL)
- goto error;
+ pgss_attach_shmem();
- /*
- * When serializing to disk, we store query texts immediately after their
- * entry data. Any orphaned query texts are thereby excluded.
- */
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ dsh_entry = dshash_find_or_insert(pgss_hash, &serialized.key, &found);
+ if (!found)
{
- int len = entry->query_len;
- char *qstr = qtext_fetch(entry->query_offset, len,
- qbuffer, qbuffer_size);
+ pg_atomic_init_u32(&dsh_entry->refcount, pg_atomic_read_u32(&serialized.refcount));
+ dsh_entry->encoding = serialized.encoding;
+ dsh_entry->stats_since = serialized.stats_since;
+ dsh_entry->minmax_stats_since = serialized.minmax_stats_since;
+
+ if (qtext_len >= 0)
+ {
+ char *qtext;
+ Size query_offset;
+
+ qtext = palloc(qtext_len + 1);
+ if (!pgstat_read_chunk(statfile, qtext, qtext_len + 1))
+ {
+ dshash_release_lock(pgss_hash, dsh_entry);
+ pfree(qtext);
+ return false;
+ }
- if (qstr == NULL)
- continue; /* Ignore any entries with bogus texts */
+ /* Store in the text file */
+ if (qtext_store(qtext, qtext_len, &query_offset, NULL))
+ {
+ dsh_entry->query_offset = query_offset;
+ dsh_entry->query_len = qtext_len;
+ }
+ else
+ {
+ dsh_entry->query_offset = 0;
+ dsh_entry->query_len = -1;
+ }
- if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 ||
- fwrite(qstr, 1, len + 1, file) != len + 1)
+ pfree(qtext);
+ }
+ else
{
- /* note: we assume hash_seq_term won't change errno */
- hash_seq_term(&hash_seq);
- goto error;
+ dsh_entry->query_offset = 0;
+ dsh_entry->query_len = -1;
}
+
+ pg_atomic_fetch_add_u64(&pgss_shared->nentries, 1);
}
+ else
+ {
+ /* Entry already exists (race), skip the text */
+ if (qtext_len >= 0)
+ {
+ char *discard = palloc(qtext_len + 1);
- /* Dump global statistics for pg_stat_statements */
- if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1)
- goto error;
+ if (!pgstat_read_chunk(statfile, discard, qtext_len + 1))
+ {
+ pfree(discard);
+ dshash_release_lock(pgss_hash, dsh_entry);
+ return false;
+ }
+ pfree(discard);
+ }
+ }
- pfree(qbuffer);
- qbuffer = NULL;
+ dshash_release_lock(pgss_hash, dsh_entry);
- if (FreeFile(file))
- {
- file = NULL;
- goto error;
- }
+ return true;
+}
- /*
- * Rename file into place, so we atomically replace any old one.
- */
- (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG);
+static void
+pgss_finish(PgStat_StatsFileOp status)
+{
+ switch (status)
+ {
+ case STATS_WRITE:
- /* Unlink query-texts file; it's not needed while shutdown */
- unlink(PGSS_TEXT_FILE);
+ /*
+ * Text has been serialized into the pgstat file; remove the
+ * original.
+ */
+ unlink(PGSS_TEXT_FILE);
+ break;
- return;
+ case STATS_READ:
+ /* Text file was rebuilt by from_serialized_data; keep it. */
+ break;
-error:
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- PGSS_DUMP_FILE ".tmp")));
- if (qbuffer)
- pfree(qbuffer);
- if (file)
- FreeFile(file);
- unlink(PGSS_DUMP_FILE ".tmp");
- unlink(PGSS_TEXT_FILE);
+ case STATS_DISCARD:
+ /* Stats discarded; remove orphaned text file. */
+ unlink(PGSS_TEXT_FILE);
+ break;
+ }
}
-/*
- * Post-parse-analysis hook: mark query with a queryId
+/*--------------------------------------------------------------------------
+ * pgss_store: Record statistics for one statement execution.
+ *
+ * Creates dshash entry if new, then accumulates counters in the pending
+ * pgstat entry.
+ *--------------------------------------------------------------------------
*/
static void
-pgss_post_parse_analyze(ParseState *pstate, Query *query, const JumbleState *jstate)
+pgss_store(const char *query, int64 queryId,
+ int query_location, int query_len,
+ pgssStoreKind kind,
+ double total_time, uint64 rows,
+ const BufferUsage *bufusage,
+ const WalUsage *walusage,
+ const struct JitInstrumentation *jitusage,
+ const JumbleState *jstate,
+ int parallel_workers_to_launch,
+ int parallel_workers_launched,
+ PlannedStmtOrigin planOrigin)
{
- if (prev_post_parse_analyze_hook)
- prev_post_parse_analyze_hook(pstate, query, jstate);
+ pgssHashKey key;
+ pgssEntry *dsh_entry;
+ bool found;
+ uint64 objid;
+ PgStat_EntryRef *entry_ref;
+ PgStat_PgssPending *pending;
+ char *norm_query = NULL;
+ int encoding = GetDatabaseEncoding();
+
+ Assert(query != NULL);
- /* Safety check... */
- if (!pgss || !pgss_hash || !pgss_enabled(nesting_level))
+ if (queryId == INT64CONST(0))
return;
/*
- * If it's EXECUTE, clear the queryId so that stats will accumulate for
- * the underlying PREPARE. But don't do this if we're not tracking
- * utility statements, to avoid messing up another extension that might be
- * tracking them.
+ * Confine our attention to the relevant part of the string, if the query
+ * is a portion of a multi-statement source string, and update query
+ * location and length if needed.
*/
- if (query->utilityStmt)
+ query = CleanQuerytext(query, &query_location, &query_len);
+
+ /* Build key */
+ memset(&key, 0, sizeof(pgssHashKey));
+ key.userid = GetUserId();
+ key.dbid = MyDatabaseId;
+ key.queryid = queryId;
+ key.toplevel = (nesting_level == 0);
+
+ /* Create or update dshash registry entry */
+ pgss_attach_shmem();
+
+ /* Fast path: look up with shared lock only */
+ dsh_entry = dshash_find(pgss_hash, &key, false);
+ if (dsh_entry)
{
- if (pgss_track_utility && IsA(query->utilityStmt, ExecuteStmt))
- {
- query->queryId = INT64CONST(0);
+ /* Bump refcount under shared lock: one CAS attempt, not retried on failure */
+ uint32 cur = pg_atomic_read_u32(&dsh_entry->refcount);
+
+ if (cur < PGSS_REF_CAP)
+ pg_atomic_compare_exchange_u32(&dsh_entry->refcount, &cur, cur + 1);
+
+ dshash_release_lock(pgss_hash, dsh_entry);
+
+ /* If jstate is provided and kind is PGSS_INVALID, nothing more to do */
+ if (jstate && kind == PGSS_INVALID)
return;
- }
}
-
- /*
+ else
+ {
+ Size query_offset;
+ int gc_count;
+ bool stored;
+ bool do_gc;
+
+ /* Slow path: entry not found; normalize and store the text, then insert */
+ if (jstate)
+ {
+ norm_query = generate_normalized_query(jstate, query,
+ query_location,
+ &query_len);
+ }
+
+ /* Store query text in the file */
+ stored = qtext_store(norm_query ? norm_query : query, query_len,
+ &query_offset, &gc_count);
+
+ /*
+ * Determine whether we need to garbage collect external query texts.
+ */
+ do_gc = need_gc_qtexts();
+
+ dsh_entry = dshash_find_or_insert(pgss_hash, &key, &found);
+ if (!found)
+ {
+ pg_atomic_init_u32(&dsh_entry->refcount, 1);
+ dsh_entry->stats_since = GetCurrentTimestamp();
+ dsh_entry->minmax_stats_since = dsh_entry->stats_since;
+ dsh_entry->encoding = encoding;
+
+ if (stored)
+ {
+ dsh_entry->query_offset = query_offset;
+ dsh_entry->query_len = query_len;
+ }
+ else
+ {
+ dsh_entry->query_offset = 0;
+ dsh_entry->query_len = -1;
+ }
+
+ dshash_release_lock(pgss_hash, dsh_entry);
+
+ pg_atomic_fetch_add_u64(&pgss_shared->nentries, 1);
+ entry_dealloc();
+
+ /* If needed, perform garbage collection */
+ if (do_gc)
+ gc_qtexts();
+
+ if (jstate && kind == PGSS_INVALID)
+ {
+ if (norm_query)
+ pfree(norm_query);
+ return;
+ }
+ }
+ else
+ {
+ /* Another backend inserted it concurrently */
+ uint32 cur = pg_atomic_read_u32(&dsh_entry->refcount);
+
+ if (cur < PGSS_REF_CAP)
+ pg_atomic_compare_exchange_u32(&dsh_entry->refcount, &cur, cur + 1);
+ dshash_release_lock(pgss_hash, dsh_entry);
+
+ if (jstate && kind == PGSS_INVALID)
+ {
+ if (norm_query)
+ pfree(norm_query);
+ return;
+ }
+ }
+ }
+
+ if (norm_query)
+ pfree(norm_query);
+
+ /* Accumulate counters in pending pgstat entry */
+ Assert(kind == PGSS_PLAN || kind == PGSS_EXEC);
+
+ objid = pgss_hash_key(&key);
+
+ entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_PGSS, key.dbid,
+ objid, NULL);
+ pending = (PgStat_PgssPending *) entry_ref->pending;
+ pending->key = key;
+
+ pending->counters.calls[kind]++;
+ pending->counters.total_time[kind] += total_time;
+
+ if (pending->counters.calls[kind] == 1)
+ {
+ pending->counters.min_time[kind] = total_time;
+ pending->counters.max_time[kind] = total_time;
+ pending->counters.mean_time[kind] = total_time;
+ }
+ else
+ {
+ /*
+ * Welford's online algorithm for accumulating mean and sum of squared
+ * deviations. See <http://www.johndcook.com/blog/standard_deviation/>
+ */
+ double old_mean = pending->counters.mean_time[kind];
+
+ pending->counters.mean_time[kind] +=
+ (total_time - old_mean) / pending->counters.calls[kind];
+ pending->counters.sum_var_time[kind] +=
+ (total_time - old_mean) * (total_time - pending->counters.mean_time[kind]);
+
+ if (pending->counters.min_time[kind] > total_time)
+ pending->counters.min_time[kind] = total_time;
+ if (pending->counters.max_time[kind] < total_time)
+ pending->counters.max_time[kind] = total_time;
+ }
+
+ pending->counters.rows += rows;
+
+ if (bufusage)
+ {
+ pending->counters.shared_blks_hit += bufusage->shared_blks_hit;
+ pending->counters.shared_blks_read += bufusage->shared_blks_read;
+ pending->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
+ pending->counters.shared_blks_written += bufusage->shared_blks_written;
+ pending->counters.local_blks_hit += bufusage->local_blks_hit;
+ pending->counters.local_blks_read += bufusage->local_blks_read;
+ pending->counters.local_blks_dirtied += bufusage->local_blks_dirtied;
+ pending->counters.local_blks_written += bufusage->local_blks_written;
+ pending->counters.temp_blks_read += bufusage->temp_blks_read;
+ pending->counters.temp_blks_written += bufusage->temp_blks_written;
+ pending->counters.shared_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_read_time);
+ pending->counters.shared_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_write_time);
+ pending->counters.local_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_read_time);
+ pending->counters.local_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_write_time);
+ pending->counters.temp_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_read_time);
+ pending->counters.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time);
+ }
+
+ if (walusage)
+ {
+ pending->counters.wal_records += walusage->wal_records;
+ pending->counters.wal_fpi += walusage->wal_fpi;
+ pending->counters.wal_bytes += walusage->wal_bytes;
+ pending->counters.wal_buffers_full += walusage->wal_buffers_full;
+ }
+
+ if (jitusage)
+ {
+ pending->counters.jit_functions += jitusage->created_functions;
+ pending->counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter);
+
+ if (INSTR_TIME_GET_MILLISEC(jitusage->deform_counter))
+ pending->counters.jit_deform_count++;
+ pending->counters.jit_deform_time += INSTR_TIME_GET_MILLISEC(jitusage->deform_counter);
+
+ if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter))
+ pending->counters.jit_inlining_count++;
+ pending->counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter);
+
+ if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter))
+ pending->counters.jit_optimization_count++;
+ pending->counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter);
+
+ if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter))
+ pending->counters.jit_emission_count++;
+ pending->counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter);
+ }
+
+ pending->counters.parallel_workers_to_launch += parallel_workers_to_launch;
+ pending->counters.parallel_workers_launched += parallel_workers_launched;
+
+ if (planOrigin == PLAN_STMT_CACHE_GENERIC)
+ pending->counters.generic_plan_calls++;
+ else if (planOrigin == PLAN_STMT_CACHE_CUSTOM)
+ pending->counters.custom_plan_calls++;
+}
+
+/*--------------------------------------------------------------------------
+ * Hook implementations
+ *--------------------------------------------------------------------------
+ */
+
+static void
+pgss_post_parse_analyze(ParseState *pstate, Query *query,
+ const JumbleState *jstate)
+{
+ if (prev_post_parse_analyze_hook)
+ prev_post_parse_analyze_hook(pstate, query, jstate);
+
+ if (!pgss_enabled(nesting_level))
+ return;
+
+ /*
+ * Clear queryId for EXECUTE so stats accumulate under the PREPARE's
+ * queryId instead.
+ */
+ if (query->utilityStmt)
+ {
+ if (pgss_track_utility && IsA(query->utilityStmt, ExecuteStmt))
+ {
+ query->queryId = INT64CONST(0);
+ return;
+ }
+ }
+
+ /*
* If query jumbling were able to identify any ignorable constants, we
* immediately create a hash table entry for the query, so that we can
* record the normalized form of the query string. If there were no such
@@ -879,10 +1149,6 @@ pgss_post_parse_analyze(ParseState *pstate, Query *query, const JumbleState *jst
PLAN_STMT_UNKNOWN);
}
-/*
- * Planner hook: forward to regular planner, but measure planning time
- * if needed.
- */
static PlannedStmt *
pgss_planner(Query *parse,
const char *query_string,
@@ -908,13 +1174,7 @@ pgss_planner(Query *parse,
WalUsage walusage_start,
walusage;
- /* We need to track buffer usage as the planner can access them. */
bufusage_start = pgBufferUsage;
-
- /*
- * Similarly the planner could write some WAL records in some cases
- * (e.g. setting a hint bit with those being WAL-logged)
- */
walusage_start = pgWalUsage;
INSTR_TIME_SET_CURRENT(start);
@@ -937,11 +1197,9 @@ pgss_planner(Query *parse,
INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, start);
- /* calc differences of buffer counters. */
memset(&bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
- /* calc differences of WAL counters. */
memset(&walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start);
@@ -956,17 +1214,11 @@ pgss_planner(Query *parse,
&walusage,
NULL,
NULL,
- 0,
- 0,
+ 0, 0,
result->planOrigin);
}
else
{
- /*
- * Even though we're not tracking plan time for this statement, we
- * must still increment the nesting level, to ensure that functions
- * evaluated during planning are not seen as top-level calls.
- */
nesting_level++;
PG_TRY();
{
@@ -987,20 +1239,12 @@ pgss_planner(Query *parse,
return result;
}
-/*
- * ExecutorStart hook: start up tracking if needed
- */
static void
pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
{
- /*
- * If query has queryId zero, don't track it. This prevents double
- * counting of optimizable statements that are directly contained in
- * utility statements.
- */
- if (pgss_enabled(nesting_level) && queryDesc->plannedstmt->queryId != INT64CONST(0))
+ if (pgss_enabled(nesting_level) &&
+ queryDesc->plannedstmt->queryId != INT64CONST(0))
{
- /* Request all summary instrumentation, i.e. timing, buffers and WAL */
queryDesc->query_instr_options |= INSTRUMENT_ALL;
}
@@ -1010,9 +1254,6 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
standard_ExecutorStart(queryDesc, eflags);
}
-/*
- * ExecutorRun hook: all we need do is track nesting depth
- */
static void
pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
{
@@ -1031,9 +1272,6 @@ pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
PG_END_TRY();
}
-/*
- * ExecutorFinish hook: all we need do is track nesting depth
- */
static void
pgss_ExecutorFinish(QueryDesc *queryDesc)
{
@@ -1052,9 +1290,6 @@ pgss_ExecutorFinish(QueryDesc *queryDesc)
PG_END_TRY();
}
-/*
- * ExecutorEnd hook: store results if needed
- */
static void
pgss_ExecutorEnd(QueryDesc *queryDesc)
{
@@ -1085,9 +1320,6 @@ pgss_ExecutorEnd(QueryDesc *queryDesc)
standard_ExecutorEnd(queryDesc);
}
-/*
- * ProcessUtility hook
- */
static void
pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
bool readOnlyTree,
@@ -1102,36 +1334,9 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
PlannedStmtOrigin saved_planOrigin = pstmt->planOrigin;
bool enabled = pgss_track_utility && pgss_enabled(nesting_level);
- /*
- * Force utility statements to get queryId zero. We do this even in cases
- * where the statement contains an optimizable statement for which a
- * queryId could be derived (such as EXPLAIN or DECLARE CURSOR). For such
- * cases, runtime control will first go through ProcessUtility and then
- * the executor, and we don't want the executor hooks to do anything,
- * since we are already measuring the statement's costs at the utility
- * level.
- *
- * Note that this is only done if pg_stat_statements is enabled and
- * configured to track utility statements, in the unlikely possibility
- * that user configured another extension to handle utility statements
- * only.
- */
if (enabled)
pstmt->queryId = INT64CONST(0);
- /*
- * If it's an EXECUTE statement, we don't track it and don't increment the
- * nesting level. This allows the cycles to be charged to the underlying
- * PREPARE instead (by the Executor hooks), which is much more useful.
- *
- * We also don't track execution of PREPARE. If we did, we would get one
- * hash table entry for the PREPARE (with hash calculated from the query
- * string), and then a different one with the same query string (but hash
- * calculated from the query tree) would be used to accumulate costs of
- * ensuing EXECUTEs. This would be confusing. Since PREPARE doesn't
- * actually run the planner (only parse+rewrite), its costs are generally
- * pretty negligible and it seems okay to just ignore it.
- */
if (enabled &&
!IsA(parsetree, ExecuteStmt) &&
!IsA(parsetree, PrepareStmt))
@@ -1166,36 +1371,20 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
}
PG_END_TRY();
- /*
- * CAUTION: do not access the *pstmt data structure again below here.
- * If it was a ROLLBACK or similar, that data structure may have been
- * freed. We must copy everything we still need into local variables,
- * which we did above.
- *
- * For the same reason, we can't risk restoring pstmt->queryId to its
- * former value, which'd otherwise be a good idea.
- */
pstmt = NULL;
INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, start);
- /*
- * Track the total number of rows retrieved or affected by the utility
- * statements of COPY, FETCH, CREATE TABLE AS, CREATE MATERIALIZED
- * VIEW, REFRESH MATERIALIZED VIEW and SELECT INTO.
- */
rows = (qc && (qc->commandTag == CMDTAG_COPY ||
qc->commandTag == CMDTAG_FETCH ||
qc->commandTag == CMDTAG_SELECT ||
qc->commandTag == CMDTAG_REFRESH_MATERIALIZED_VIEW)) ?
qc->nprocessed : 0;
- /* calc differences of buffer counters. */
memset(&bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);
- /* calc differences of WAL counters. */
memset(&walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start);
@@ -1210,24 +1399,11 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
&walusage,
NULL,
NULL,
- 0,
- 0,
+ 0, 0,
saved_planOrigin);
}
else
{
- /*
- * Even though we're not tracking execution time for this statement,
- * we must still increment the nesting level, to ensure that functions
- * evaluated within it are not seen as top-level calls. But don't do
- * so for EXECUTE; that way, when control reaches pgss_planner or
- * pgss_ExecutorStart, we will treat the costs as top-level if
- * appropriate. Likewise, don't bump for PREPARE, so that parse
- * analysis will treat the statement as top-level if appropriate.
- *
- * To be absolutely certain we don't mess up the nesting level,
- * evaluate the bump_level condition just once.
- */
bool bump_level =
!IsA(parsetree, ExecuteStmt) &&
!IsA(parsetree, PrepareStmt);
@@ -1254,251 +1430,33 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
}
}
-/*
- * Store some statistics for a statement.
- *
- * If jstate is not NULL then we're trying to create an entry for which
- * we have no statistics as yet; we just want to record the normalized
- * query string. total_time, rows, bufusage and walusage are ignored in this
- * case.
- *
- * If kind is PGSS_PLAN or PGSS_EXEC, its value is used as the array position
- * for the arrays in the Counters field.
+/*--------------------------------------------------------------------------
+ * SQL-callable functions
+ *--------------------------------------------------------------------------
*/
-static void
-pgss_store(const char *query, int64 queryId,
- int query_location, int query_len,
- pgssStoreKind kind,
- double total_time, uint64 rows,
- const BufferUsage *bufusage,
- const WalUsage *walusage,
- const struct JitInstrumentation *jitusage,
- const JumbleState *jstate,
- int parallel_workers_to_launch,
- int parallel_workers_launched,
- PlannedStmtOrigin planOrigin)
-{
- pgssHashKey key;
- pgssEntry *entry;
- char *norm_query = NULL;
- int encoding = GetDatabaseEncoding();
-
- Assert(query != NULL);
-
- /* Safety check... */
- if (!pgss || !pgss_hash)
- return;
-
- /*
- * Nothing to do if compute_query_id isn't enabled and no other module
- * computed a query identifier.
- */
- if (queryId == INT64CONST(0))
- return;
-
- /*
- * Confine our attention to the relevant part of the string, if the query
- * is a portion of a multi-statement source string, and update query
- * location and length if needed.
- */
- query = CleanQuerytext(query, &query_location, &query_len);
-
- /* Set up key for hashtable search */
-
- /* clear padding */
- memset(&key, 0, sizeof(pgssHashKey));
-
- key.userid = GetUserId();
- key.dbid = MyDatabaseId;
- key.queryid = queryId;
- key.toplevel = (nesting_level == 0);
-
- /* Lookup the hash table entry with shared lock. */
- LWLockAcquire(&pgss->lock.lock, LW_SHARED);
-
- entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
- /* Create new entry, if not present */
- if (!entry)
- {
- Size query_offset;
- int gc_count;
- bool stored;
- bool do_gc;
-
- /*
- * Create a new, normalized query string if caller asked. We don't
- * need to hold the lock while doing this work. (Note: in any case,
- * it's possible that someone else creates a duplicate hashtable entry
- * in the interval where we don't hold the lock below. That case is
- * handled by entry_alloc.)
- */
- if (jstate)
- {
- LWLockRelease(&pgss->lock.lock);
- norm_query = generate_normalized_query(jstate, query,
- query_location,
- &query_len);
- LWLockAcquire(&pgss->lock.lock, LW_SHARED);
- }
- /* Append new query text to file with only shared lock held */
- stored = qtext_store(norm_query ? norm_query : query, query_len,
- &query_offset, &gc_count);
-
- /*
- * Determine whether we need to garbage collect external query texts
- * while the shared lock is still held. This micro-optimization
- * avoids taking the time to decide this while holding exclusive lock.
- */
- do_gc = need_gc_qtexts();
-
- /* Need exclusive lock to make a new hashtable entry - promote */
- LWLockRelease(&pgss->lock.lock);
- LWLockAcquire(&pgss->lock.lock, LW_EXCLUSIVE);
-
- /*
- * A garbage collection may have occurred while we weren't holding the
- * lock. In the unlikely event that this happens, the query text we
- * stored above will have been garbage collected, so write it again.
- * This should be infrequent enough that doing it while holding
- * exclusive lock isn't a performance problem.
- */
- if (!stored || pgss->gc_count != gc_count)
- stored = qtext_store(norm_query ? norm_query : query, query_len,
- &query_offset, NULL);
-
- /* If we failed to write to the text file, give up */
- if (!stored)
- goto done;
-
- /* OK to create a new hashtable entry */
- entry = entry_alloc(&key, query_offset, query_len, encoding,
- jstate != NULL);
-
- /* If needed, perform garbage collection while exclusive lock held */
- if (do_gc)
- gc_qtexts();
- }
-
- /* Increment the counts, except when jstate is not NULL */
- if (!jstate)
- {
- Assert(kind == PGSS_PLAN || kind == PGSS_EXEC);
-
- /*
- * Grab the spinlock while updating the counters (see comment about
- * locking rules at the head of the file)
- */
- SpinLockAcquire(&entry->mutex);
-
- /* "Unstick" entry if it was previously sticky */
- if (IS_STICKY(entry->counters))
- entry->counters.usage = USAGE_INIT;
-
- entry->counters.calls[kind] += 1;
- entry->counters.total_time[kind] += total_time;
-
- if (entry->counters.calls[kind] == 1)
- {
- entry->counters.min_time[kind] = total_time;
- entry->counters.max_time[kind] = total_time;
- entry->counters.mean_time[kind] = total_time;
- }
- else
- {
- /*
- * Welford's method for accurately computing variance. See
- * <http://www.johndcook.com/blog/standard_deviation/>
- */
- double old_mean = entry->counters.mean_time[kind];
-
- entry->counters.mean_time[kind] +=
- (total_time - old_mean) / entry->counters.calls[kind];
- entry->counters.sum_var_time[kind] +=
- (total_time - old_mean) * (total_time - entry->counters.mean_time[kind]);
-
- /*
- * Calculate min and max time. min = 0 and max = 0 means that the
- * min/max statistics were reset
- */
- if (entry->counters.min_time[kind] == 0
- && entry->counters.max_time[kind] == 0)
- {
- entry->counters.min_time[kind] = total_time;
- entry->counters.max_time[kind] = total_time;
- }
- else
- {
- if (entry->counters.min_time[kind] > total_time)
- entry->counters.min_time[kind] = total_time;
- if (entry->counters.max_time[kind] < total_time)
- entry->counters.max_time[kind] = total_time;
- }
- }
- entry->counters.rows += rows;
- entry->counters.shared_blks_hit += bufusage->shared_blks_hit;
- entry->counters.shared_blks_read += bufusage->shared_blks_read;
- entry->counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
- entry->counters.shared_blks_written += bufusage->shared_blks_written;
- entry->counters.local_blks_hit += bufusage->local_blks_hit;
- entry->counters.local_blks_read += bufusage->local_blks_read;
- entry->counters.local_blks_dirtied += bufusage->local_blks_dirtied;
- entry->counters.local_blks_written += bufusage->local_blks_written;
- entry->counters.temp_blks_read += bufusage->temp_blks_read;
- entry->counters.temp_blks_written += bufusage->temp_blks_written;
- entry->counters.shared_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_read_time);
- entry->counters.shared_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->shared_blk_write_time);
- entry->counters.local_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_read_time);
- entry->counters.local_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->local_blk_write_time);
- entry->counters.temp_blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_read_time);
- entry->counters.temp_blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->temp_blk_write_time);
- entry->counters.usage += USAGE_EXEC(total_time);
- entry->counters.wal_records += walusage->wal_records;
- entry->counters.wal_fpi += walusage->wal_fpi;
- entry->counters.wal_bytes += walusage->wal_bytes;
- entry->counters.wal_buffers_full += walusage->wal_buffers_full;
- if (jitusage)
- {
- entry->counters.jit_functions += jitusage->created_functions;
- entry->counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter);
-
- if (INSTR_TIME_GET_MILLISEC(jitusage->deform_counter))
- entry->counters.jit_deform_count++;
- entry->counters.jit_deform_time += INSTR_TIME_GET_MILLISEC(jitusage->deform_counter);
-
- if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter))
- entry->counters.jit_inlining_count++;
- entry->counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter);
-
- if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter))
- entry->counters.jit_optimization_count++;
- entry->counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter);
-
- if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter))
- entry->counters.jit_emission_count++;
- entry->counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter);
- }
-
- /* parallel worker counters */
- entry->counters.parallel_workers_to_launch += parallel_workers_to_launch;
- entry->counters.parallel_workers_launched += parallel_workers_launched;
-
- /* plan cache counters */
- if (planOrigin == PLAN_STMT_CACHE_GENERIC)
- entry->counters.generic_plan_calls++;
- else if (planOrigin == PLAN_STMT_CACHE_CUSTOM)
- entry->counters.custom_plan_calls++;
-
- SpinLockRelease(&entry->mutex);
- }
+/* Number of output arguments (columns) for various API versions */
+#define PG_STAT_STATEMENTS_COLS_V1_0 14
+#define PG_STAT_STATEMENTS_COLS_V1_1 18
+#define PG_STAT_STATEMENTS_COLS_V1_2 19
+#define PG_STAT_STATEMENTS_COLS_V1_3 23
+#define PG_STAT_STATEMENTS_COLS_V1_8 32
+#define PG_STAT_STATEMENTS_COLS_V1_9 33
+#define PG_STAT_STATEMENTS_COLS_V1_10 43
+#define PG_STAT_STATEMENTS_COLS_V1_11 49
+#define PG_STAT_STATEMENTS_COLS_V1_12 52
+#define PG_STAT_STATEMENTS_COLS_V1_13 54
+#define PG_STAT_STATEMENTS_COLS 54 /* maximum of above */
-done:
- LWLockRelease(&pgss->lock.lock);
+/*
+ * Reset statement statistics.
+ */
+Datum
+pg_stat_statements_reset(PG_FUNCTION_ARGS)
+{
+ entry_reset(0, 0, 0, false);
- /* We postpone this clean-up until we're out of the lock */
- if (norm_query)
- pfree(norm_query);
+ PG_RETURN_VOID();
}
/*
@@ -1536,123 +1494,80 @@ pg_stat_statements_reset_1_11(PG_FUNCTION_ARGS)
PG_RETURN_TIMESTAMPTZ(entry_reset(userid, dbid, queryid, minmax_only));
}
-/*
- * Reset statement statistics.
- */
-Datum
-pg_stat_statements_reset(PG_FUNCTION_ARGS)
-{
- entry_reset(0, 0, 0, false);
-
- PG_RETURN_VOID();
-}
-
-/* Number of output arguments (columns) for various API versions */
-#define PG_STAT_STATEMENTS_COLS_V1_0 14
-#define PG_STAT_STATEMENTS_COLS_V1_1 18
-#define PG_STAT_STATEMENTS_COLS_V1_2 19
-#define PG_STAT_STATEMENTS_COLS_V1_3 23
-#define PG_STAT_STATEMENTS_COLS_V1_8 32
-#define PG_STAT_STATEMENTS_COLS_V1_9 33
-#define PG_STAT_STATEMENTS_COLS_V1_10 43
-#define PG_STAT_STATEMENTS_COLS_V1_11 49
-#define PG_STAT_STATEMENTS_COLS_V1_12 52
-#define PG_STAT_STATEMENTS_COLS_V1_13 54
-#define PG_STAT_STATEMENTS_COLS 54 /* maximum of above */
-
-/*
- * Retrieve statement statistics.
- *
- * The SQL API of this function has changed multiple times, and will likely
- * do so again in future. To support the case where a newer version of this
- * loadable module is being used with an old SQL declaration of the function,
- * we continue to support the older API versions. For 1.2 and later, the
- * expected API version is identified by embedding it in the C name of the
- * function. Unfortunately we weren't bright enough to do that for 1.1.
- */
Datum
-pg_stat_statements_1_13(PG_FUNCTION_ARGS)
+pg_stat_statements_1_2(PG_FUNCTION_ARGS)
{
bool showtext = PG_GETARG_BOOL(0);
- pg_stat_statements_internal(fcinfo, PGSS_V1_13, showtext);
-
+ pg_stat_statements_internal(fcinfo, PGSS_V1_2, showtext);
return (Datum) 0;
}
Datum
-pg_stat_statements_1_12(PG_FUNCTION_ARGS)
+pg_stat_statements_1_3(PG_FUNCTION_ARGS)
{
bool showtext = PG_GETARG_BOOL(0);
- pg_stat_statements_internal(fcinfo, PGSS_V1_12, showtext);
-
+ pg_stat_statements_internal(fcinfo, PGSS_V1_3, showtext);
return (Datum) 0;
}
Datum
-pg_stat_statements_1_11(PG_FUNCTION_ARGS)
+pg_stat_statements_1_8(PG_FUNCTION_ARGS)
{
bool showtext = PG_GETARG_BOOL(0);
- pg_stat_statements_internal(fcinfo, PGSS_V1_11, showtext);
-
+ pg_stat_statements_internal(fcinfo, PGSS_V1_8, showtext);
return (Datum) 0;
}
Datum
-pg_stat_statements_1_10(PG_FUNCTION_ARGS)
+pg_stat_statements_1_9(PG_FUNCTION_ARGS)
{
bool showtext = PG_GETARG_BOOL(0);
- pg_stat_statements_internal(fcinfo, PGSS_V1_10, showtext);
-
+ pg_stat_statements_internal(fcinfo, PGSS_V1_9, showtext);
return (Datum) 0;
}
Datum
-pg_stat_statements_1_9(PG_FUNCTION_ARGS)
+pg_stat_statements_1_10(PG_FUNCTION_ARGS)
{
bool showtext = PG_GETARG_BOOL(0);
- pg_stat_statements_internal(fcinfo, PGSS_V1_9, showtext);
-
+ pg_stat_statements_internal(fcinfo, PGSS_V1_10, showtext);
return (Datum) 0;
}
Datum
-pg_stat_statements_1_8(PG_FUNCTION_ARGS)
+pg_stat_statements_1_11(PG_FUNCTION_ARGS)
{
bool showtext = PG_GETARG_BOOL(0);
- pg_stat_statements_internal(fcinfo, PGSS_V1_8, showtext);
-
+ pg_stat_statements_internal(fcinfo, PGSS_V1_11, showtext);
return (Datum) 0;
}
Datum
-pg_stat_statements_1_3(PG_FUNCTION_ARGS)
+pg_stat_statements_1_12(PG_FUNCTION_ARGS)
{
bool showtext = PG_GETARG_BOOL(0);
- pg_stat_statements_internal(fcinfo, PGSS_V1_3, showtext);
-
+ pg_stat_statements_internal(fcinfo, PGSS_V1_12, showtext);
return (Datum) 0;
}
Datum
-pg_stat_statements_1_2(PG_FUNCTION_ARGS)
+pg_stat_statements_1_13(PG_FUNCTION_ARGS)
{
- bool showtext = PG_GETARG_BOOL(0);
-
- pg_stat_statements_internal(fcinfo, PGSS_V1_2, showtext);
+ bool showtext = PG_GETARG_BOOL(0);
+ pg_stat_statements_internal(fcinfo, PGSS_V1_13, showtext);
return (Datum) 0;
}
/*
* Legacy entry point for pg_stat_statements() API versions 1.0 and 1.1.
- * This can be removed someday, perhaps.
*/
Datum
pg_stat_statements(PG_FUNCTION_ARGS)
@@ -1663,33 +1578,31 @@ pg_stat_statements(PG_FUNCTION_ARGS)
return (Datum) 0;
}
-/* Common code for all versions of pg_stat_statements() */
+/*
+ * pg_stat_statements_internal
+ *
+ * Scan the dshash for all tracked entries, fetch their counters from
+ * pgstat, and return the combined result set. Columns vary by api_version.
+ */
static void
pg_stat_statements_internal(FunctionCallInfo fcinfo,
pgssVersion api_version,
bool showtext)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ dshash_seq_status status;
+ pgssEntry *entry;
Oid userid = GetUserId();
- bool is_allowed_role = false;
+ bool is_allowed_role;
char *qbuffer = NULL;
Size qbuffer_size = 0;
- Size extent = 0;
- int gc_count = 0;
- HASH_SEQ_STATUS hash_seq;
- pgssEntry *entry;
- /*
- * Superusers or roles with the privileges of pg_read_all_stats members
- * are allowed
- */
is_allowed_role = has_privs_of_role(userid, ROLE_PG_READ_ALL_STATS);
- /* hash table must exist already */
- if (!pgss || !pgss_hash)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
+ pgss_attach_shmem();
+
+ /* Flush pending stats so we can read up-to-date counters */
+ pgstat_report_anytime_stat();
InitMaterializedSRF(fcinfo, 0);
@@ -1746,82 +1659,39 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
elog(ERROR, "incorrect number of output arguments");
}
- /*
- * We'd like to load the query text file (if needed) while not holding any
- * lock on pgss->lock. In the worst case we'll have to do this again
- * after we have the lock, but it's unlikely enough to make this a win
- * despite occasional duplicated work. We need to reload if anybody
- * writes to the file (either a retail qtext_store(), or a garbage
- * collection) between this point and where we've gotten shared lock. If
- * a qtext_store is actually in progress when we look, we might as well
- * skip the speculative load entirely.
- */
- if (showtext)
- {
- int n_writers;
-
- /* Take the mutex so we can examine variables */
- SpinLockAcquire(&pgss->mutex);
- extent = pgss->extent;
- n_writers = pgss->n_writers;
- gc_count = pgss->gc_count;
- SpinLockRelease(&pgss->mutex);
-
- /* No point in loading file now if there are active writers */
- if (n_writers == 0)
- qbuffer = qtext_load_file(&qbuffer_size);
- }
-
- /*
- * Get shared lock, load or reload the query text file if we must, and
- * iterate over the hashtable entries.
- *
- * With a large hash table, we might be holding the lock rather longer
- * than one could wish. However, this only blocks creation of new hash
- * table entries, and the larger the hash table the less likely that is to
- * be needed. So we can hope this is okay. Perhaps someday we'll decide
- * we need to partition the hash table to limit the time spent holding any
- * one lock.
- */
- LWLockAcquire(&pgss->lock.lock, LW_SHARED);
-
+ /* Load the query text file */
if (showtext)
- {
- /*
- * Here it is safe to examine extent and gc_count without taking the
- * mutex. Note that although other processes might change
- * pgss->extent just after we look at it, the strings they then write
- * into the file cannot yet be referenced in the hashtable, so we
- * don't care whether we see them or not.
- *
- * If qtext_load_file fails, we just press on; we'll return NULL for
- * every query text.
- */
- if (qbuffer == NULL ||
- pgss->extent != extent ||
- pgss->gc_count != gc_count)
- {
- if (qbuffer)
- pfree(qbuffer);
- qbuffer = qtext_load_file(&qbuffer_size);
- }
- }
+ qbuffer = qtext_load_file(&qbuffer_size);
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ dshash_seq_init(&status, pgss_hash, false);
+ while ((entry = dshash_seq_next(&status)) != NULL)
{
Datum values[PG_STAT_STATEMENTS_COLS];
bool nulls[PG_STAT_STATEMENTS_COLS];
int i = 0;
- Counters tmp;
+ uint64 objid;
+ pgssCounters *counters;
+ pgssCounters tmp;
double stddev;
- int64 queryid = entry->key.queryid;
- TimestampTz stats_since;
- TimestampTz minmax_stats_since;
+ bool may_free = false;
memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
+ /* Fetch counters from pgstat */
+ objid = pgss_hash_key((pgssHashKey *) &entry->key);
+ counters = (pgssCounters *)
+ pgstat_fetch_entry(PGSTAT_KIND_PGSS, entry->key.dbid, objid,
+ &may_free);
+
+ if (!counters)
+ continue;
+
+ tmp = *counters;
+ if (may_free)
+ pfree(counters);
+
+
values[i++] = ObjectIdGetDatum(entry->key.userid);
values[i++] = ObjectIdGetDatum(entry->key.dbid);
if (api_version >= PGSS_V1_9)
@@ -1830,7 +1700,7 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
if (is_allowed_role || entry->key.userid == userid)
{
if (api_version >= PGSS_V1_2)
- values[i++] = Int64GetDatumFast(queryid);
+ values[i++] = Int64GetDatumFast(entry->key.queryid);
if (showtext)
{
@@ -1841,62 +1711,30 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
if (qstr)
{
- char *enc;
-
- enc = pg_any_to_server(qstr,
- entry->query_len,
- entry->encoding);
+ char *enc = pg_any_to_server(qstr, entry->query_len, entry->encoding);
values[i++] = CStringGetTextDatum(enc);
-
if (enc != qstr)
pfree(enc);
}
else
- {
- /* Just return a null if we fail to find the text */
nulls[i++] = true;
- }
}
else
- {
- /* Query text not requested */
nulls[i++] = true;
- }
}
else
{
- /* Don't show queryid */
if (api_version >= PGSS_V1_2)
nulls[i++] = true;
- /*
- * Don't show query text, but hint as to the reason for not doing
- * so if it was requested
- */
if (showtext)
values[i++] = CStringGetTextDatum("<insufficient privilege>");
else
nulls[i++] = true;
}
- /* copy counters to a local variable to keep locking time short */
- SpinLockAcquire(&entry->mutex);
- tmp = entry->counters;
- SpinLockRelease(&entry->mutex);
-
- /*
- * The spinlock is not required when reading these two as they are
- * always updated when holding pgss->lock exclusively.
- */
- stats_since = entry->stats_since;
- minmax_stats_since = entry->minmax_stats_since;
-
- /* Skip entry if unexecuted (ie, it's a pending "sticky" entry) */
- if (IS_STICKY(tmp))
- continue;
-
- /* Note that we rely on PGSS_PLAN being 0 and PGSS_EXEC being 1. */
+ /* Note: PGSS_PLAN is 0, PGSS_EXEC is 1 */
for (int kind = 0; kind < PGSS_NUMKIND; kind++)
{
if (kind == PGSS_EXEC || api_version >= PGSS_V1_8)
@@ -1912,12 +1750,6 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
values[i++] = Float8GetDatumFast(tmp.max_time[kind]);
values[i++] = Float8GetDatumFast(tmp.mean_time[kind]);
- /*
- * Note we are calculating the population variance here, not
- * the sample variance, as we have data for the whole
- * population, so Bessel's correction is not used, and we
- * don't divide by tmp.calls - 1.
- */
if (tmp.calls[kind] > 1)
stddev = sqrt(tmp.sum_var_time[kind] / tmp.calls[kind]);
else
@@ -1925,6 +1757,7 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
values[i++] = Float8GetDatumFast(stddev);
}
}
+
values[i++] = Int64GetDatumFast(tmp.rows);
values[i++] = Int64GetDatumFast(tmp.shared_blks_hit);
values[i++] = Int64GetDatumFast(tmp.shared_blks_read);
@@ -1962,8 +1795,6 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
values[i++] = Int64GetDatumFast(tmp.wal_fpi);
snprintf(buf, sizeof buf, UINT64_FORMAT, tmp.wal_bytes);
-
- /* Convert to numeric. */
wal_bytes = DirectFunctionCall3(numeric_in,
CStringGetDatum(buf),
ObjectIdGetDatum(0),
@@ -1971,9 +1802,7 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
values[i++] = wal_bytes;
}
if (api_version >= PGSS_V1_12)
- {
values[i++] = Int64GetDatumFast(tmp.wal_buffers_full);
- }
if (api_version >= PGSS_V1_10)
{
values[i++] = Int64GetDatumFast(tmp.jit_functions);
@@ -2002,8 +1831,8 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
}
if (api_version >= PGSS_V1_11)
{
- values[i++] = TimestampTzGetDatum(stats_since);
- values[i++] = TimestampTzGetDatum(minmax_stats_since);
+ values[i++] = TimestampTzGetDatum(entry->stats_since);
+ values[i++] = TimestampTzGetDatum(entry->minmax_stats_since);
}
Assert(i == (api_version == PGSS_V1_0 ? PG_STAT_STATEMENTS_COLS_V1_0 :
@@ -2020,8 +1849,7 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
}
-
- LWLockRelease(&pgss->lock.lock);
+ dshash_seq_term(&status);
if (qbuffer)
pfree(qbuffer);
@@ -2036,196 +1864,296 @@ pg_stat_statements_internal(FunctionCallInfo fcinfo,
Datum
pg_stat_statements_info(PG_FUNCTION_ARGS)
{
- pgssGlobalStats stats;
TupleDesc tupdesc;
Datum values[PG_STAT_STATEMENTS_INFO_COLS] = {0};
bool nulls[PG_STAT_STATEMENTS_INFO_COLS] = {0};
- if (!pgss || !pgss_hash)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
-
- /* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
- /* Read global statistics for pg_stat_statements */
- SpinLockAcquire(&pgss->mutex);
- stats = pgss->stats;
- SpinLockRelease(&pgss->mutex);
+ pgss_attach_shmem();
- values[0] = Int64GetDatum(stats.dealloc);
- values[1] = TimestampTzGetDatum(stats.stats_reset);
+ values[0] = Int64GetDatum((int64) pg_atomic_read_u64(&pgss_shared->dealloc));
+ values[1] = TimestampTzGetDatum(pgss_shared->stats_reset);
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
/*
- * Allocate a new hashtable entry.
- * caller must hold an exclusive lock on pgss->lock
+ * Evict entries using clock-sweep with reference count decay.
*
- * "query" need not be null-terminated; we rely on query_len instead
+ * Multiple backends can sweep concurrently, each claiming the next partition
+ * via atomic increment of sweep_partition (rotating hand). Each backend
+ * sweeps one partition per invocation: decrementing refcounts and evicting
+ * entries that reach zero via dshash_delete_current. Entries still above 0
+ * survive. Hot queries that execute frequently keep their refcount topped
+ * up (capped at PGSS_REF_CAP), giving them proportional protection.
*
- * If "sticky" is true, make the new entry artificially sticky so that it will
- * probably still be there when the query finishes execution. We do this by
- * giving it a median usage value rather than the normal value. (Strictly
- * speaking, query strings are normalized on a best effort basis, though it
- * would be difficult to demonstrate this even under artificial conditions.)
+ * The hand advances across invocations so each sweep picks up where the last
+ * one left off, providing clock-sweep behavior. Within a partition,
+ * eviction stops at (100 - USAGE_DEALLOC_PERCENT)% of pgss_max entries.
*
- * Note: despite needing exclusive lock, it's not an error for the target
- * entry to already exist. This is because pgss_store releases and
- * reacquires lock after failing to find a match; so someone else could
- * have made the entry while we waited to get exclusive lock.
+ * Note: the partition swept is determined by the rotating hand, not by the
+ * key just inserted. Eviction is a continuous background pressure mechanism
+ * to keep total entries near pgss_max, not a targeted operation to make room
+ * for the current entry specifically.
+ *
+ * Guaranteed eviction: any entry that stops being accessed will reach 0
+ * after at most PGSS_REF_CAP sweeps.
*/
-static pgssEntry *
-entry_alloc(pgssHashKey *key, Size query_offset, int query_len, int encoding,
- bool sticky)
+static void
+entry_dealloc(void)
{
+ dshash_seq_status status;
pgssEntry *entry;
- bool found;
+ int nentries;
+ int evicted = 0;
+ uint32 my_partition;
+ uint32 sweep_val;
+ uint32 cur;
+
+ nentries = (int) pg_atomic_read_u64(&pgss_shared->nentries);
+ if (nentries <= pgss_max)
+ return;
- /* Make space if needed */
- while (hash_get_num_entries(pgss_hash) >= pgss_max)
- entry_dealloc();
+ /*
+ * Claim the next partition by atomically advancing the hand. Multiple
+ * backends can sweep concurrently, each on a different partition.
+ */
+ sweep_val = pg_atomic_fetch_add_u32(&pgss_shared->sweep_partition, 1);
+ my_partition = sweep_val % DSHASH_NUM_PARTITIONS;
- /* Find or create an entry with desired hash code */
- entry = (pgssEntry *) hash_search(pgss_hash, key, HASH_ENTER, &found);
+ /* Re-check: another backend may have already resolved the overshoot */
+ if ((int) pg_atomic_read_u64(&pgss_shared->nentries) <= pgss_max)
+ return;
- if (!found)
+ /*
+ * Sweep one partition: decrement each entry's refcount, evict entries
+ * that reach zero. Stop at (100 - USAGE_DEALLOC_PERCENT)% of pgss_max.
+ */
+ dshash_seq_init_partition(&status, pgss_hash, true, my_partition);
+ while ((entry = dshash_seq_next(&status)) != NULL)
{
- /* New entry, initialize it */
-
- /* reset the statistics */
- memset(&entry->counters, 0, sizeof(Counters));
- /* set the appropriate initial usage count */
- entry->counters.usage = sticky ? pgss->cur_median_usage : USAGE_INIT;
- /* re-initialize the mutex each time ... we assume no one using it */
- SpinLockInit(&entry->mutex);
- /* ... and don't forget the query text metadata */
- Assert(query_len >= 0);
- entry->query_offset = query_offset;
- entry->query_len = query_len;
- entry->encoding = encoding;
- entry->stats_since = GetCurrentTimestamp();
- entry->minmax_stats_since = entry->stats_since;
- }
+ if ((int) pg_atomic_read_u64(&pgss_shared->nentries) <= pgss_max * (100 - USAGE_DEALLOC_PERCENT) / 100)
+ break;
- return entry;
-}
+ /*
+ * Decrement refcount; evict if it reaches zero. Guard against
+ * underflow — a prior sweep may have already decremented to zero
+ * without evicting (because nentries dropped below threshold). No CAS
+ * needed: we hold the partition lock exclusively.
+ */
+ cur = pg_atomic_read_u32(&entry->refcount);
+ if (cur > 0)
+ {
+ pg_atomic_write_u32(&entry->refcount, cur - 1);
+ if (cur - 1 > 0)
+ continue;
+ }
-/*
- * qsort comparator for sorting into increasing usage order
- */
-static int
-entry_cmp(const void *lhs, const void *rhs)
-{
- double l_usage = (*(pgssEntry *const *) lhs)->counters.usage;
- double r_usage = (*(pgssEntry *const *) rhs)->counters.usage;
+ /* Evict: drop from pgstat, then delete from dshash */
+ pgstat_drop_entry(PGSTAT_KIND_PGSS, entry->key.dbid,
+ pgss_hash_key(&entry->key), true);
- if (l_usage < r_usage)
- return -1;
- else if (l_usage > r_usage)
- return +1;
- else
- return 0;
+ dshash_delete_current(&status);
+ pg_atomic_fetch_sub_u64(&pgss_shared->nentries, 1);
+ evicted++;
+ }
+ dshash_seq_term(&status);
+
+ if (evicted > 0)
+ {
+ pg_atomic_fetch_add_u64(&pgss_shared->dealloc, evicted);
+
+ /* Request pgstat entry ref GC once per full rotation */
+ if (my_partition == DSHASH_NUM_PARTITIONS - 1)
+ pgstat_request_entry_refs_gc();
+ }
}
/*
- * Deallocate least-used entries.
- *
- * Caller must hold an exclusive lock on pgss->lock.
+ * Reset entries corresponding to parameters passed.
+ * Iterates the dshash to find matching entries, deletes them from the
+ * dshash, and drops their pgstat entries.
*/
-static void
-entry_dealloc(void)
+static TimestampTz
+entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
{
- HASH_SEQ_STATUS hash_seq;
- pgssEntry **entries;
+ dshash_seq_status status;
pgssEntry *entry;
- int nvictims;
- int i;
- Size tottextlen;
- int nvalidtexts;
+ TimestampTz stats_reset;
+ int64 num_entries;
+ int64 num_remove = 0;
- /*
- * Sort entries by usage and deallocate USAGE_DEALLOC_PERCENT of them.
- * While we're scanning the table, apply the decay factor to the usage
- * values, and update the mean query length.
- *
- * Note that the mean query length is almost immediately obsolete, since
- * we compute it before not after discarding the least-used entries.
- * Hopefully, that doesn't affect the mean too much; it doesn't seem worth
- * making two passes to get a more current result. Likewise, the new
- * cur_median_usage includes the entries we're about to zap.
- */
+ pgss_attach_shmem();
- entries = palloc(hash_get_num_entries(pgss_hash) * sizeof(pgssEntry *));
+ /* Flush pending stats before reset so we don't re-create dropped entries */
+ pgstat_report_anytime_stat();
- i = 0;
- tottextlen = 0;
- nvalidtexts = 0;
+ stats_reset = GetCurrentTimestamp();
+ num_entries = (int64) pg_atomic_read_u64(&pgss_shared->nentries);
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ if (minmax_only)
{
- entries[i++] = entry;
- /* "Sticky" entries get a different usage decay rate. */
- if (IS_STICKY(entry->counters))
- entry->counters.usage *= STICKY_DECREASE_FACTOR;
- else
- entry->counters.usage *= USAGE_DECREASE_FACTOR;
- /* In the mean length computation, ignore dropped texts. */
- if (entry->query_len >= 0)
+ /*
+ * Reset only min/max timing values and update minmax_stats_since.
+ * Iterate matching entries in the dshash, reset the corresponding
+ * pgstat counters' min/max fields.
+ */
+ dshash_seq_init(&status, pgss_hash, true);
+ while ((entry = dshash_seq_next(&status)) != NULL)
{
- tottextlen += entry->query_len + 1;
- nvalidtexts++;
+ if ((!userid || entry->key.userid == userid) &&
+ (!dbid || entry->key.dbid == dbid) &&
+ (!queryid || entry->key.queryid == queryid))
+ {
+ uint64 objid = pgss_hash_key(&entry->key);
+ PgStat_EntryRef *entry_ref;
+
+ entry->minmax_stats_since = stats_reset;
+
+ entry_ref = pgstat_get_entry_ref(PGSTAT_KIND_PGSS,
+ entry->key.dbid, objid,
+ false, NULL);
+ if (entry_ref)
+ {
+ PgStatShared_Pgss *shared;
+
+ shared = (PgStatShared_Pgss *) entry_ref->shared_stats;
+ if (!pgstat_lock_entry(entry_ref, false))
+ continue;
+
+ for (int kind = 0; kind < PGSS_NUMKIND; kind++)
+ {
+ shared->counters.min_time[kind] = 0;
+ shared->counters.max_time[kind] = 0;
+ shared->counters.mean_time[kind] = 0;
+ shared->counters.sum_var_time[kind] = 0;
+ }
+
+ pgstat_unlock_entry(entry_ref);
+ }
+ }
}
+ dshash_seq_term(&status);
+
+ return stats_reset;
}
- /* Sort into increasing order by usage */
- qsort(entries, i, sizeof(pgssEntry *), entry_cmp);
+ if (userid != 0 && dbid != 0 && queryid != INT64CONST(0))
+ {
+ /*
+ * Fast path: specific entry identified by all three key components.
+ * Try both toplevel=true and toplevel=false.
+ */
+ pgssHashKey key;
- /* Record the (approximate) median usage */
- if (i > 0)
- pgss->cur_median_usage = entries[i / 2]->counters.usage;
- /* Record the mean query length */
- if (nvalidtexts > 0)
- pgss->mean_query_len = tottextlen / nvalidtexts;
- else
- pgss->mean_query_len = ASSUMED_LENGTH_INIT;
+ memset(&key, 0, sizeof(pgssHashKey));
+ key.userid = userid;
+ key.dbid = dbid;
+ key.queryid = queryid;
+
+ key.toplevel = false;
+ entry = dshash_find(pgss_hash, &key, true);
+ if (entry)
+ {
+ uint64 objid = pgss_hash_key(&key);
+
+ dshash_delete_entry(pgss_hash, entry);
+ pg_atomic_fetch_sub_u64(&pgss_shared->nentries, 1);
+ num_remove++;
- /* Now zap an appropriate fraction of lowest-usage entries */
- nvictims = Max(10, i * USAGE_DEALLOC_PERCENT / 100);
- nvictims = Min(nvictims, i);
+ if (!pgstat_drop_entry(PGSTAT_KIND_PGSS, key.dbid, objid, true))
+ pgstat_request_entry_refs_gc();
+ }
+
+ key.toplevel = true;
+ entry = dshash_find(pgss_hash, &key, true);
+ if (entry)
+ {
+ uint64 objid = pgss_hash_key(&key);
- for (i = 0; i < nvictims; i++)
+ dshash_delete_entry(pgss_hash, entry);
+ pg_atomic_fetch_sub_u64(&pgss_shared->nentries, 1);
+ num_remove++;
+
+ if (!pgstat_drop_entry(PGSTAT_KIND_PGSS, key.dbid, objid, true))
+ pgstat_request_entry_refs_gc();
+ }
+ }
+ else
{
- hash_search(pgss_hash, &entries[i]->key, HASH_REMOVE, NULL);
+ /* Iterate all entries and remove those matching the filter */
+ dshash_seq_init(&status, pgss_hash, true);
+ while ((entry = dshash_seq_next(&status)) != NULL)
+ {
+ if ((!userid || entry->key.userid == userid) &&
+ (!dbid || entry->key.dbid == dbid) &&
+ (!queryid || entry->key.queryid == queryid))
+ {
+ pgssHashKey ekey = entry->key;
+ uint64 objid = pgss_hash_key(&ekey);
+
+ dshash_delete_current(&status);
+ pg_atomic_fetch_sub_u64(&pgss_shared->nentries, 1);
+ num_remove++;
+
+ if (!pgstat_drop_entry(PGSTAT_KIND_PGSS, ekey.dbid, objid, true))
+ pgstat_request_entry_refs_gc();
+ }
+ }
+ dshash_seq_term(&status);
}
- pfree(entries);
+ /* If all entries were removed, reset global statistics and text file */
+ if (num_remove > 0 && num_entries == num_remove)
+ {
+ FILE *qfile;
+
+ pg_atomic_write_u64(&pgss_shared->dealloc, 0);
+ pgss_shared->stats_reset = stats_reset;
+
+ /* Write new empty query file */
+ qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
+ if (qfile == NULL)
+ {
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m",
+ PGSS_TEXT_FILE)));
+ }
+ else
+ {
+ if (ftruncate(fileno(qfile), 0) != 0)
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not truncate file \"%s\": %m",
+ PGSS_TEXT_FILE)));
+ FreeFile(qfile);
+ }
+
+ SpinLockAcquire(&pgss_shared->mutex);
+ pgss_shared->extent = 0;
+ SpinLockRelease(&pgss_shared->mutex);
+ record_gc_qtexts();
+ }
- /* Increment the number of times entries are deallocated */
- SpinLockAcquire(&pgss->mutex);
- pgss->stats.dealloc += 1;
- SpinLockRelease(&pgss->mutex);
+ return stats_reset;
}
+/*--------------------------------------------------------------------------
+ * Query text file management
+ *--------------------------------------------------------------------------
+ */
+
/*
- * Given a query string (not necessarily null-terminated), allocate a new
- * entry in the external query text file and store the string there.
+ * Store query text in the external file.
*
* If successful, returns true, and stores the new entry's offset in the file
* into *query_offset. Also, if gc_count isn't NULL, *gc_count is set to the
* number of garbage collections that have occurred so far.
*
* On failure, returns false.
- *
- * At least a shared lock on pgss->lock must be held by the caller, so as
- * to prevent a concurrent garbage collection. Share-lock-holding callers
- * should pass a gc_count pointer to obtain the number of garbage collections,
- * so that they can recheck the count after obtaining exclusive lock to
- * detect whether a garbage collection occurred (and removed this entry).
*/
static bool
qtext_store(const char *query, int query_len,
@@ -2238,13 +2166,13 @@ qtext_store(const char *query, int query_len,
* We use a spinlock to protect extent/n_writers/gc_count, so that
* multiple processes may execute this function concurrently.
*/
- SpinLockAcquire(&pgss->mutex);
- off = pgss->extent;
- pgss->extent += query_len + 1;
- pgss->n_writers++;
+ SpinLockAcquire(&pgss_shared->mutex);
+ off = pgss_shared->extent;
+ pgss_shared->extent += query_len + 1;
+ pgss_shared->n_writers++;
if (gc_count)
- *gc_count = pgss->gc_count;
- SpinLockRelease(&pgss->mutex);
+ *gc_count = pgss_shared->gc_count;
+ SpinLockRelease(&pgss_shared->mutex);
*query_offset = off;
@@ -2255,7 +2183,7 @@ qtext_store(const char *query, int query_len,
*/
if (unlikely(query_len >= MaxAllocHugeSize - off))
{
- errno = EFBIG; /* not quite right, but it'll do */
+ errno = EFBIG;
fd = -1;
goto error;
}
@@ -2273,9 +2201,9 @@ qtext_store(const char *query, int query_len,
CloseTransientFile(fd);
/* Mark our write complete */
- SpinLockAcquire(&pgss->mutex);
- pgss->n_writers--;
- SpinLockRelease(&pgss->mutex);
+ SpinLockAcquire(&pgss_shared->mutex);
+ pgss_shared->n_writers--;
+ SpinLockRelease(&pgss_shared->mutex);
return true;
@@ -2289,9 +2217,9 @@ error:
CloseTransientFile(fd);
/* Mark our write complete */
- SpinLockAcquire(&pgss->mutex);
- pgss->n_writers--;
- SpinLockRelease(&pgss->mutex);
+ SpinLockAcquire(&pgss_shared->mutex);
+ pgss_shared->n_writers--;
+ SpinLockRelease(&pgss_shared->mutex);
return false;
}
@@ -2303,9 +2231,6 @@ error:
* file not there or insufficient memory.
*
* On success, the buffer size is also returned into *buffer_size.
- *
- * This can be called without any lock on pgss->lock, but in that case
- * the caller is responsible for verifying that the result is sane.
*/
static char *
qtext_load_file(Size *buffer_size)
@@ -2354,22 +2279,14 @@ qtext_load_file(Size *buffer_size)
}
/*
- * OK, slurp in the file. Windows fails if we try to read more than
- * INT_MAX bytes at once, and other platforms might not like that either,
- * so read a very large file in 1GB segments.
+ * OK, slurp in the file. Read in 1GB segments, since Windows (and perhaps
+ * other platforms) fails on reads of more than INT_MAX bytes at once.
*/
nread = 0;
while (nread < stat.st_size)
{
int toread = Min(1024 * 1024 * 1024, stat.st_size - nread);
- /*
- * If we get a short read and errno doesn't get set, the reason is
- * probably that garbage collection truncated the file since we did
- * the fstat(), so we don't log a complaint --- but we don't return
- * the data, either, since it's most likely corrupt due to concurrent
- * writes from garbage collection.
- */
errno = 0;
if (read(fd, buf + nread, toread) != toread)
{
@@ -2421,37 +2338,39 @@ qtext_fetch(Size query_offset, int query_len,
/*
* Do we need to garbage-collect the external query text file?
*
- * Caller should hold at least a shared lock on pgss->lock.
+ * We check whether the file has grown excessively relative to the number
+ * of entries tracked.
*/
static bool
need_gc_qtexts(void)
{
Size extent;
+ int nentries;
/* Read shared extent pointer */
- SpinLockAcquire(&pgss->mutex);
- extent = pgss->extent;
- SpinLockRelease(&pgss->mutex);
+ SpinLockAcquire(&pgss_shared->mutex);
+ extent = pgss_shared->extent;
+ SpinLockRelease(&pgss_shared->mutex);
+
+ nentries = (int) pg_atomic_read_u64(&pgss_shared->nentries);
/*
* Don't proceed if file does not exceed 512 bytes per possible entry.
- *
- * Here and in the next test, 32-bit machines have overflow hazards if
- * pgss_max and/or mean_query_len are large. Force the multiplications
- * and comparisons to be done in uint64 arithmetic to forestall trouble.
*/
if ((uint64) extent < (uint64) 512 * pgss_max)
return false;
/*
- * Don't proceed if file is less than about 50% bloat. Nothing can or
- * should be done in the event of unusually large query texts accounting
- * for file's large size. We go to the trouble of maintaining the mean
- * query length in order to prevent garbage collection from thrashing
- * uselessly.
+ * Don't proceed if file is less than about 50% bloat. We estimate mean
+ * query length from the file size and entry count.
*/
- if ((uint64) extent < (uint64) pgss->mean_query_len * pgss_max * 2)
- return false;
+ if (nentries > 0)
+ {
+ Size mean_query_len = extent / nentries;
+
+ if ((uint64) extent < (uint64) mean_query_len * pgss_max * 2)
+ return false;
+ }
return true;
}
@@ -2459,18 +2378,14 @@ need_gc_qtexts(void)
/*
* Garbage-collect orphaned query texts in external file.
*
- * This won't be called often in the typical case, since it's likely that
- * there won't be too much churn, and besides, a similar compaction process
- * occurs when serializing to disk at shutdown or as part of resetting.
- * Despite this, it seems prudent to plan for the edge case where the file
- * becomes unreasonably large, with no other method of compaction likely to
- * occur in the foreseeable future.
+ * This rewrites the query text file, keeping only texts referenced by
+ * current dshash entries, and updates their offsets accordingly.
*
- * The caller must hold an exclusive lock on pgss->lock.
- *
- * At the first sign of trouble we unlink the query text file to get a clean
- * slate (although existing statistics are retained), rather than risk
- * thrashing by allowing the same problem case to recur indefinitely.
+ * Note: unlike the upstream implementation, which required an exclusive
+ * LWLock, this uses dshash partition-level exclusive locks via a sequential
+ * scan with exclusive=true. This is safe because we're updating entry offsets
+ * in place and the file is guaranteed not to grow during GC (no new writers
+ * can get offsets into the old region after we've reset the extent).
*/
static void
gc_qtexts(void)
@@ -2478,25 +2393,16 @@ gc_qtexts(void)
char *qbuffer;
Size qbuffer_size;
FILE *qfile = NULL;
- HASH_SEQ_STATUS hash_seq;
+ dshash_seq_status status;
pgssEntry *entry;
Size extent;
- int nentries;
- /*
- * When called from pgss_store, some other session might have proceeded
- * with garbage collection in the no-lock-held interim of lock strength
- * escalation. Check once more that this is actually necessary.
- */
if (!need_gc_qtexts())
return;
/*
* Load the old texts file. If we fail (out of memory, for instance),
- * invalidate query texts. Hopefully this is rare. It might seem better
- * to leave things alone on an OOM failure, but the problem is that the
- * file is only going to get bigger; hoping for a future non-OOM result is
- * risky and can easily lead to complete denial of service.
+ * invalidate query texts.
*/
qbuffer = qtext_load_file(&qbuffer_size);
if (qbuffer == NULL)
@@ -2504,9 +2410,7 @@ gc_qtexts(void)
/*
* We overwrite the query texts file in place, so as to reduce the risk of
- * an out-of-disk-space failure. Since the file is guaranteed not to get
- * larger, this should always work on traditional filesystems; though we
- * could still lose on copy-on-write filesystems.
+ * an out-of-disk-space failure.
*/
qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
if (qfile == NULL)
@@ -2519,10 +2423,9 @@ gc_qtexts(void)
}
extent = 0;
- nentries = 0;
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ dshash_seq_init(&status, pgss_hash, true);
+ while ((entry = dshash_seq_next(&status)) != NULL)
{
int query_len = entry->query_len;
char *qry = qtext_fetch(entry->query_offset,
@@ -2535,7 +2438,6 @@ gc_qtexts(void)
/* Trouble ... drop the text */
entry->query_offset = 0;
entry->query_len = -1;
- /* entry will not be counted in mean query length computation */
continue;
}
@@ -2545,18 +2447,17 @@ gc_qtexts(void)
(errcode_for_file_access(),
errmsg("could not write file \"%s\": %m",
PGSS_TEXT_FILE)));
- hash_seq_term(&hash_seq);
+ dshash_seq_term(&status);
goto gc_fail;
}
entry->query_offset = extent;
extent += query_len + 1;
- nentries++;
}
+ dshash_seq_term(&status);
/*
- * Truncate away any now-unused space. If this fails for some odd reason,
- * we log it, but there's no need to fail.
+ * Truncate away any now-unused space.
*/
if (ftruncate(fileno(qfile), extent) != 0)
ereport(LOG,
@@ -2575,29 +2476,15 @@ gc_qtexts(void)
}
elog(DEBUG1, "pgss gc of queries file shrunk size from %zu to %zu",
- pgss->extent, extent);
+ pgss_shared->extent, extent);
/* Reset the shared extent pointer */
- pgss->extent = extent;
-
- /*
- * Also update the mean query length, to be sure that need_gc_qtexts()
- * won't still think we have a problem.
- */
- if (nentries > 0)
- pgss->mean_query_len = extent / nentries;
- else
- pgss->mean_query_len = ASSUMED_LENGTH_INIT;
+ SpinLockAcquire(&pgss_shared->mutex);
+ pgss_shared->extent = extent;
+ SpinLockRelease(&pgss_shared->mutex);
pfree(qbuffer);
- /*
- * OK, count a garbage collection cycle. (Note: even though we have
- * exclusive lock on pgss->lock, we must take pgss->mutex for this, since
- * other processes may examine gc_count while holding only the mutex.
- * Also, we have to advance the count *after* we've rewritten the file,
- * else other processes might not realize they read a stale file.)
- */
record_gc_qtexts();
return;
@@ -2611,14 +2498,15 @@ gc_fail:
/*
* Since the contents of the external file are now uncertain, mark all
- * hashtable entries as having invalid texts.
+ * dshash entries as having invalid texts.
*/
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ dshash_seq_init(&status, pgss_hash, true);
+ while ((entry = dshash_seq_next(&status)) != NULL)
{
entry->query_offset = 0;
entry->query_len = -1;
}
+ dshash_seq_term(&status);
/*
* Destroy the query text file and create a new, empty one
@@ -2634,160 +2522,11 @@ gc_fail:
FreeFile(qfile);
/* Reset the shared extent pointer */
- pgss->extent = 0;
-
- /* Reset mean_query_len to match the new state */
- pgss->mean_query_len = ASSUMED_LENGTH_INIT;
-
- /*
- * Bump the GC count even though we failed.
- *
- * This is needed to make concurrent readers of file without any lock on
- * pgss->lock notice existence of new version of file. Once readers
- * subsequently observe a change in GC count with pgss->lock held, that
- * forces a safe reopen of file. Writers also require that we bump here,
- * of course. (As required by locking protocol, readers and writers don't
- * trust earlier file contents until gc_count is found unchanged after
- * pgss->lock acquired in shared or exclusive mode respectively.)
- */
- record_gc_qtexts();
-}
-
-#define SINGLE_ENTRY_RESET(e) \
-if (e) { \
- if (minmax_only) { \
- /* When requested reset only min/max statistics of an entry */ \
- for (int kind = 0; kind < PGSS_NUMKIND; kind++) \
- { \
- e->counters.max_time[kind] = 0; \
- e->counters.min_time[kind] = 0; \
- } \
- e->minmax_stats_since = stats_reset; \
- } \
- else \
- { \
- /* Remove the key otherwise */ \
- hash_search(pgss_hash, &e->key, HASH_REMOVE, NULL); \
- num_remove++; \
- } \
-}
-
-/*
- * Reset entries corresponding to parameters passed.
- */
-static TimestampTz
-entry_reset(Oid userid, Oid dbid, int64 queryid, bool minmax_only)
-{
- HASH_SEQ_STATUS hash_seq;
- pgssEntry *entry;
- FILE *qfile;
- int64 num_entries;
- int64 num_remove = 0;
- pgssHashKey key;
- TimestampTz stats_reset;
-
- if (!pgss || !pgss_hash)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("pg_stat_statements must be loaded via \"shared_preload_libraries\"")));
-
- LWLockAcquire(&pgss->lock.lock, LW_EXCLUSIVE);
- num_entries = hash_get_num_entries(pgss_hash);
-
- stats_reset = GetCurrentTimestamp();
-
- if (userid != 0 && dbid != 0 && queryid != INT64CONST(0))
- {
- /* If all the parameters are available, use the fast path. */
- memset(&key, 0, sizeof(pgssHashKey));
- key.userid = userid;
- key.dbid = dbid;
- key.queryid = queryid;
-
- /*
- * Reset the entry if it exists, starting with the non-top-level
- * entry.
- */
- key.toplevel = false;
- entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
- SINGLE_ENTRY_RESET(entry);
-
- /* Also reset the top-level entry if it exists. */
- key.toplevel = true;
- entry = (pgssEntry *) hash_search(pgss_hash, &key, HASH_FIND, NULL);
-
- SINGLE_ENTRY_RESET(entry);
- }
- else if (userid != 0 || dbid != 0 || queryid != INT64CONST(0))
- {
- /* Reset entries corresponding to valid parameters. */
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
- {
- if ((!userid || entry->key.userid == userid) &&
- (!dbid || entry->key.dbid == dbid) &&
- (!queryid || entry->key.queryid == queryid))
- {
- SINGLE_ENTRY_RESET(entry);
- }
- }
- }
- else
- {
- /* Reset all entries. */
- hash_seq_init(&hash_seq, pgss_hash);
- while ((entry = hash_seq_search(&hash_seq)) != NULL)
- {
- SINGLE_ENTRY_RESET(entry);
- }
- }
-
- /* All entries are removed? */
- if (num_entries != num_remove)
- goto release_lock;
-
- /*
- * Reset global statistics for pg_stat_statements since all entries are
- * removed.
- */
- SpinLockAcquire(&pgss->mutex);
- pgss->stats.dealloc = 0;
- pgss->stats.stats_reset = stats_reset;
- SpinLockRelease(&pgss->mutex);
-
- /*
- * Write new empty query file, perhaps even creating a new one to recover
- * if the file was missing.
- */
- qfile = AllocateFile(PGSS_TEXT_FILE, PG_BINARY_W);
- if (qfile == NULL)
- {
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not create file \"%s\": %m",
- PGSS_TEXT_FILE)));
- goto done;
- }
-
- /* If ftruncate fails, log it, but it's not a fatal problem */
- if (ftruncate(fileno(qfile), 0) != 0)
- ereport(LOG,
- (errcode_for_file_access(),
- errmsg("could not truncate file \"%s\": %m",
- PGSS_TEXT_FILE)));
+ SpinLockAcquire(&pgss_shared->mutex);
+ pgss_shared->extent = 0;
+ SpinLockRelease(&pgss_shared->mutex);
- FreeFile(qfile);
-
-done:
- pgss->extent = 0;
- /* This counts as a query text garbage collection for our purposes */
record_gc_qtexts();
-
-release_lock:
- LWLockRelease(&pgss->lock.lock);
-
- return stats_reset;
}
/*
diff --git a/contrib/pg_stat_statements/pg_stat_statements.conf b/contrib/pg_stat_statements/pg_stat_statements.conf
index 0e900d7119b..21a10c41d09 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.conf
+++ b/contrib/pg_stat_statements/pg_stat_statements.conf
@@ -1,2 +1,3 @@
shared_preload_libraries = 'pg_stat_statements'
max_prepared_transactions = 5
+max_parallel_workers_per_gather = 0
diff --git a/contrib/pg_stat_statements/pg_stat_statements.control b/contrib/pg_stat_statements/pg_stat_statements.control
index 2eee0ceffa8..61ae41efc14 100644
--- a/contrib/pg_stat_statements/pg_stat_statements.control
+++ b/contrib/pg_stat_statements/pg_stat_statements.control
@@ -1,5 +1,5 @@
# pg_stat_statements extension
comment = 'track planning and execution statistics of all SQL statements executed'
-default_version = '1.13'
+default_version = '1.14'
module_pathname = '$libdir/pg_stat_statements'
relocatable = true
diff --git a/contrib/pg_stat_statements/sql/oldextversions.sql b/contrib/pg_stat_statements/sql/oldextversions.sql
index e416efe9ffb..b2090be6267 100644
--- a/contrib/pg_stat_statements/sql/oldextversions.sql
+++ b/contrib/pg_stat_statements/sql/oldextversions.sql
@@ -68,4 +68,9 @@ AlTER EXTENSION pg_stat_statements UPDATE TO '1.13';
\d pg_stat_statements
SELECT count(*) > 0 AS has_data FROM pg_stat_statements;
+-- Functions marked PARALLEL RESTRICTED in 1.14
+AlTER EXTENSION pg_stat_statements UPDATE TO '1.14';
+\d pg_stat_statements
+SELECT count(*) > 0 AS has_data FROM pg_stat_statements;
+
DROP EXTENSION pg_stat_statements;
diff --git a/doc/src/sgml/pgstatstatements.sgml b/doc/src/sgml/pgstatstatements.sgml
index d753de5836e..8470250ff66 100644
--- a/doc/src/sgml/pgstatstatements.sgml
+++ b/doc/src/sgml/pgstatstatements.sgml
@@ -16,12 +16,13 @@
<para>
The module must be loaded by adding <literal>pg_stat_statements</literal> to
<xref linkend="guc-shared-preload-libraries"/> in
- <filename>postgresql.conf</filename>, because it requires additional shared memory.
- This means that a server restart is needed to add or remove the module.
- In addition, query identifier calculation must be enabled in order for the
- module to be active, which is done automatically if <xref linkend="guc-compute-query-id"/>
- is set to <literal>auto</literal> or <literal>on</literal>, or any third-party
- module that calculates query identifiers is loaded.
+ <filename>postgresql.conf</filename>, because it must register hooks and a
+ custom statistics kind at server start. This means that a server restart is
+ needed to add or remove the module. In addition, query identifier calculation
+ must be enabled in order for the module to be active, which is done automatically
+ if <xref linkend="guc-compute-query-id"/> is set to <literal>auto</literal> or
+ <literal>on</literal>, or any third-party module that calculates query identifiers
+ is loaded.
</para>
<para>
@@ -794,10 +795,12 @@ calls | 2
<structfield>dealloc</structfield> <type>bigint</type>
</para>
<para>
- Total number of times <structname>pg_stat_statements</structname>
- entries about the least-executed statements were deallocated
- because more distinct statements than
- <varname>pg_stat_statements.max</varname> were observed
+ Total number of <structname>pg_stat_statements</structname>
+ entries evicted because more distinct statements than
+ <varname>pg_stat_statements.max</varname> were observed.
+ A high value relative to <varname>pg_stat_statements.max</varname>
+ indicates significant query churn and suggests that
+ <varname>pg_stat_statements.max</varname> should be increased.
</para></entry>
</row>
<row>
@@ -910,12 +913,15 @@ calls | 2
<varname>pg_stat_statements.max</varname> is the maximum number of
statements tracked by the module (i.e., the maximum number of rows
in the <structname>pg_stat_statements</structname> view). If more distinct
- statements than that are observed, information about the least-executed
+ statements than that are observed, information about the least-recently-used
statements is discarded. The number of times such information was
discarded can be seen in the
<structname>pg_stat_statements_info</structname> view.
+ This is a soft limit; the actual number of tracked statements may
+ briefly exceed it until eviction reclaims space.
The default value is 5000.
- This parameter can only be set at server start.
+ This parameter can only be set in the <filename>postgresql.conf</filename>
+ file or on the server command line.
</para>
</listitem>
</varlistentry>
@@ -1007,10 +1013,11 @@ calls | 2
</variablelist>
<para>
- The module requires additional shared memory proportional to
- <varname>pg_stat_statements.max</varname>. Note that this
- memory is consumed whenever the module is loaded, even if
- <varname>pg_stat_statements.track</varname> is set to <literal>none</literal>.
+ The module uses dynamic shared memory that grows as statements are
+ tracked, up to the limit set by
+ <varname>pg_stat_statements.max</varname>. Note that this memory is
+ not reclaimed when entries are deallocated; it is reused for new
+ entries but the overall shared memory footprint does not shrink.
</para>
<para>
--
2.50.1 (Apple Git-155)
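
For readers skimming the gc_qtexts() hunk, the compaction step can be sketched outside of PostgreSQL roughly as follows. This is only an illustration under stated assumptions, not the patch's code: TextRef and compact_texts() are hypothetical names, the "file" is a plain in-memory buffer, and no dshash or locking is modelled. It shows the same idea as the loop in the patch: walk the live entries, copy each referenced text to the next free position, rewrite the entry's offset, and mark entries whose reference does not validate as having no text.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for pgssEntry: only the fields the GC touches. */
typedef struct
{
    size_t query_offset;   /* byte offset of the text in the buffer */
    int    query_len;      /* length without the NUL; -1 means "no text" */
} TextRef;

/*
 * Copy every valid text from 'old' (old_size bytes) into 'out', packed back
 * to back with its NUL terminator, and rewrite each entry's offset.
 * 'out' must be a separate buffer of at least old_size bytes.
 * Returns the new extent, i.e. the number of bytes used in 'out'.
 */
static size_t
compact_texts(const char *old, size_t old_size,
              TextRef *refs, size_t nrefs, char *out)
{
    size_t extent = 0;

    assert(out != old);    /* the sketch does not compact in place */

    for (size_t i = 0; i < nrefs; i++)
    {
        TextRef *r = &refs[i];

        /* Reject references that fall outside the buffer or lack a NUL. */
        if (r->query_len < 0 ||
            r->query_offset + (size_t) r->query_len >= old_size ||
            old[r->query_offset + r->query_len] != '\0')
        {
            /* Trouble: drop the text, keep the entry. */
            r->query_offset = 0;
            r->query_len = -1;
            continue;
        }

        memcpy(out + extent, old + r->query_offset,
               (size_t) r->query_len + 1);
        r->query_offset = extent;
        extent += (size_t) r->query_len + 1;
    }

    return extent;
}
```

The returned extent plays the role of the value stored into pgss_shared->extent after the scan. Texts that no entry references are simply never copied, which is where the space saving comes from.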