public inbox for [email protected] help / color / mirror / Atom feed
Re: Comments on Custom RMGRs 12+ messages / 6 participants [nested] [flat]
* Re: Comments on Custom RMGRs @ 2024-03-21 12:47 Danil Anisimow <[email protected]> 0 siblings, 2 replies; 12+ messages in thread From: Danil Anisimow @ 2024-03-21 12:47 UTC (permalink / raw) To: Jeff Davis <[email protected]>; +Cc: Andres Freund <[email protected]>; pgsql-hackers On Fri, Mar 1, 2024 at 2:06 AM Jeff Davis <[email protected]> wrote: > Added to March CF. > > I don't have an immediate use case in mind for this, so please drive > that part of the discussion. I can't promise this for 17, but if the > patch is simple enough and a quick consensus develops, then it's > possible. [pgss_001.v1.patch] adds a custom resource manager to the pg_stat_statements extension. The proposed patch is not a complete solution for pgss and may not work correctly with replication. The 020_crash.pl test demonstrates server interruption by killing a backend. Without the rm_checkpoint hook, the server restores only the pgss stats that were written to WAL after the last CHECKPOINT; data added to WAL before the checkpoint is not restored. The rm_checkpoint hook allows saving shared memory data to disk at each checkpoint. However, for pg_stat_statements, it matters when the checkpoint occurred. When the server shuts down, pgss deletes the temporary file of query texts. In other cases, this is unacceptable. To provide this capability, a flags parameter was added to the rm_checkpoint hook. The changes are presented in [rmgr_003.v2.patch]. 
-- Regards, Daniil Anisimov Postgres Professional: http://postgrespro.com Attachments: [text/x-patch] rmgr_003.v2.patch (9.0K, 3-rmgr_003.v2.patch) download | inline diff: diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 3e2f1d4a23..ad0a1d5134 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -44,8 +44,8 @@ /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ - { name, redo, desc, identify, startup, cleanup, mask, decode }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ + { name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint }, RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" @@ -83,6 +83,22 @@ RmgrCleanup(void) } } +/* + * Checkpoint all resource managers. + */ +void +RmgrCheckpoint(int flags) +{ + for (int rmid = 0; rmid <= RM_MAX_ID; rmid++) + { + if (!RmgrIdExists(rmid)) + continue; + + if (RmgrTable[rmid].rm_checkpoint != NULL) + RmgrTable[rmid].rm_checkpoint(flags); + } +} + /* * Emit ERROR when we encounter a record with an RmgrId we don't * recognize. diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 20a5f86209..d21bf8ae24 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7357,6 +7357,9 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointSUBTRANS(); CheckPointMultiXact(); CheckPointPredicate(); + + RmgrCheckpoint(flags); + CheckPointBuffers(flags); /* Perform all queued up fsyncs */ diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 22f7351fdc..11ae1e7af4 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of the built-in resource manager names, to make error * messages a bit nicer. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ name, static const char *const RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6b8c17bb4c..2bb5ba8c9f 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -32,7 +32,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ { name, desc, identify}, static const RmgrDescData RmgrDescTable[RM_N_BUILTIN_IDS] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index 3b6a497e1b..34ddc0210c 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 78e6b908c6..0b03cc69be 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -24,26 +24,26 @@ * Changes to this list possibly need an XLOG_PAGE_MAGIC bump. 
*/ -/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) 
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint */ +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode, NULL) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode, NULL) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode, NULL) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode, NULL) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, 
hash_mask, NULL, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode, NULL) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index b88b24f0c1..220f45b0ae 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -356,11 +356,13 @@ typedef struct RmgrData void (*rm_mask) (char *pagedata, BlockNumber blkno); void (*rm_decode) (struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf); + void (*rm_checkpoint) (int flags); } RmgrData; extern PGDLLIMPORT RmgrData RmgrTable[]; extern void RmgrStartup(void); extern void RmgrCleanup(void); +extern void RmgrCheckpoint(int flags); extern void RmgrNotFound(RmgrId rmid); extern void RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr); [text/x-patch] pgss_001.v1.patch (17.7K, 4-pgss_001.v1.patch) download | inline diff: diff --git a/contrib/pg_stat_statements/expected/wal.out b/contrib/pg_stat_statements/expected/wal.out index 
34a2bf5b03..4b2220a96b 100644 --- a/contrib/pg_stat_statements/expected/wal.out +++ b/contrib/pg_stat_statements/expected/wal.out @@ -17,7 +17,7 @@ FROM pg_stat_statements ORDER BY query COLLATE "C"; --------------------------------------------------------------+-------+------+---------------------+-----------------------+--------------------- DELETE FROM pgss_wal_tab WHERE a > $1 | 1 | 1 | t | t | t INSERT INTO pgss_wal_tab VALUES(generate_series($1, $2), $3) | 1 | 10 | t | t | t - SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | f | f | f + SELECT pg_stat_statements_reset() IS NOT NULL AS t | 1 | 1 | t | t | t SET pg_stat_statements.track_utility = FALSE | 1 | 0 | f | f | t UPDATE pgss_wal_tab SET b = $1 WHERE a > $2 | 1 | 3 | t | t | t (5 rows) diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c index 67cec865ba..d0220fd9eb 100644 --- a/contrib/pg_stat_statements/pg_stat_statements.c +++ b/contrib/pg_stat_statements/pg_stat_statements.c @@ -74,6 +74,10 @@ #include "utils/memutils.h" #include "utils/timestamp.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xloginsert.h" + PG_MODULE_MAGIC; /* Location of permanent stats file (valid when database is shut down) */ @@ -323,7 +327,6 @@ PG_FUNCTION_INFO_V1(pg_stat_statements_info); static void pgss_shmem_request(void); static void pgss_shmem_startup(void); -static void pgss_shmem_shutdown(int code, Datum arg); static void pgss_post_parse_analyze(ParseState *pstate, Query *query, JumbleState *jstate); static PlannedStmt *pgss_planner(Query *parse, @@ -370,6 +373,50 @@ static void fill_in_constant_lengths(JumbleState *jstate, const char *query, int query_loc); static int comp_location(const void *a, const void *b); +/* RMGR API */ +#define CUSTOMRMGR_ID RM_EXPERIMENTAL_ID +#define CUSTOMRMGR_NAME "pgss_rmgr" + +static void rmgr_redo(XLogReaderState *record); +static void rmgr_desc(StringInfo buf, XLogReaderState 
*record); +static const char *rmgr_identify(uint8 info); +static void rmgr_checkpoint(int flags); + +/* WAL record definitions */ +#define PGSS_XLOG_INSERT 0x00 +#define PGSS_XLOG_RESET 0x10 + +/* The necessary fields from pgssEntry */ +typedef struct pgssXLogInsert +{ + uint32 header; + pgssHashKey key; + Counters counters; + int encoding; + TimestampTz stats_since; + TimestampTz minmax_stats_since; + int query_len; + char qtext[FLEXIBLE_ARRAY_MEMBER]; +} pgssXLogInsert; + +/* The params of entry_reset() function */ +typedef struct pgssXLogReset +{ + uint32 header; + Oid userid; + uint64 queryid; + Oid dbid; + bool minmax_only; +} pgssXLogReset; + +/* RMGR data */ +const RmgrData pgss_rmgr = { + .rm_name = CUSTOMRMGR_NAME, + .rm_redo = rmgr_redo, + .rm_checkpoint = rmgr_checkpoint, + .rm_identify = rmgr_identify, + .rm_desc = rmgr_desc +}; /* * Module load callback @@ -457,6 +504,8 @@ _PG_init(void) MarkGUCPrefixReserved("pg_stat_statements"); + RegisterCustomRmgr(CUSTOMRMGR_ID, &pgss_rmgr); + /* * Install hooks. */ @@ -556,9 +605,12 @@ pgss_shmem_startup(void) /* * If we're in the postmaster (or a standalone backend...), set up a shmem * exit hook to dump the statistics to disk. + * + * Now we do it at CHECKPOINT. + * + *if (!IsUnderPostmaster) + * on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); */ - if (!IsUnderPostmaster) - on_shmem_exit(pgss_shmem_shutdown, (Datum) 0); /* * Done if some other process already completed our initialization. @@ -720,108 +772,6 @@ fail: */ } -/* - * shmem_shutdown hook: Dump statistics into file. - * - * Note: we don't bother with acquiring lock, because there should be no - * other processes running when this is called. - */ -static void -pgss_shmem_shutdown(int code, Datum arg) -{ - FILE *file; - char *qbuffer = NULL; - Size qbuffer_size = 0; - HASH_SEQ_STATUS hash_seq; - int32 num_entries; - pgssEntry *entry; - - /* Don't try to dump during a crash. */ - if (code) - return; - - /* Safety check ... 
shouldn't get here unless shmem is set up. */ - if (!pgss || !pgss_hash) - return; - - /* Don't dump if told not to. */ - if (!pgss_save) - return; - - file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W); - if (file == NULL) - goto error; - - if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1) - goto error; - if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1) - goto error; - num_entries = hash_get_num_entries(pgss_hash); - if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) - goto error; - - qbuffer = qtext_load_file(&qbuffer_size); - if (qbuffer == NULL) - goto error; - - /* - * When serializing to disk, we store query texts immediately after their - * entry data. Any orphaned query texts are thereby excluded. - */ - hash_seq_init(&hash_seq, pgss_hash); - while ((entry = hash_seq_search(&hash_seq)) != NULL) - { - int len = entry->query_len; - char *qstr = qtext_fetch(entry->query_offset, len, - qbuffer, qbuffer_size); - - if (qstr == NULL) - continue; /* Ignore any entries with bogus texts */ - - if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 || - fwrite(qstr, 1, len + 1, file) != len + 1) - { - /* note: we assume hash_seq_term won't change errno */ - hash_seq_term(&hash_seq); - goto error; - } - } - - /* Dump global statistics for pg_stat_statements */ - if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1) - goto error; - - free(qbuffer); - qbuffer = NULL; - - if (FreeFile(file)) - { - file = NULL; - goto error; - } - - /* - * Rename file into place, so we atomically replace any old one. 
- */ - (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG); - - /* Unlink query-texts file; it's not needed while shutdown */ - unlink(PGSS_TEXT_FILE); - - return; - -error: - ereport(LOG, - (errcode_for_file_access(), - errmsg("could not write file \"%s\": %m", - PGSS_DUMP_FILE ".tmp"))); - free(qbuffer); - if (file) - FreeFile(file); - unlink(PGSS_DUMP_FILE ".tmp"); - unlink(PGSS_TEXT_FILE); -} - /* * Post-parse-analysis hook: mark query with a queryId */ @@ -1284,6 +1234,7 @@ pgss_store(const char *query, uint64 queryId, pgssEntry *entry; char *norm_query = NULL; int encoding = GetDatabaseEncoding(); + bool qtext_stored = false; Assert(query != NULL); @@ -1325,7 +1276,6 @@ pgss_store(const char *query, uint64 queryId, { Size query_offset; int gc_count; - bool stored; bool do_gc; /* @@ -1345,7 +1295,7 @@ pgss_store(const char *query, uint64 queryId, } /* Append new query text to file with only shared lock held */ - stored = qtext_store(norm_query ? norm_query : query, query_len, + qtext_stored = qtext_store(norm_query ? norm_query : query, query_len, &query_offset, &gc_count); /* @@ -1366,12 +1316,12 @@ pgss_store(const char *query, uint64 queryId, * This should be infrequent enough that doing it while holding * exclusive lock isn't a performance problem. */ - if (!stored || pgss->gc_count != gc_count) - stored = qtext_store(norm_query ? norm_query : query, query_len, + if (!qtext_stored || pgss->gc_count != gc_count) + qtext_stored = qtext_store(norm_query ? 
norm_query : query, query_len, &query_offset, NULL); /* If we failed to write to the text file, give up */ - if (!stored) + if (!qtext_stored) goto done; /* OK to create a new hashtable entry */ @@ -1486,6 +1436,38 @@ pgss_store(const char *query, uint64 queryId, SpinLockRelease(&e->mutex); } + /* Write entry to XLOG */ + if (pgss_save && !RecoveryInProgress()) + { + pgssXLogInsert *xlog_entry; + XLogRecPtr ptr; + + xlog_entry = palloc(sizeof(pgssXLogInsert)); + xlog_entry->header = PGSS_FILE_HEADER; + xlog_entry->key = entry->key; + xlog_entry->counters = entry->counters; + xlog_entry->encoding = entry->encoding; + xlog_entry->stats_since = entry->stats_since; + xlog_entry->minmax_stats_since = entry->minmax_stats_since; + + XLogBeginInsert(); + XLogRegisterData((char *) xlog_entry, offsetof(pgssXLogInsert, qtext)); + + /* Write the query text if need */ + if (qtext_stored) + { + xlog_entry->query_len = entry->query_len; + XLogRegisterData(norm_query ? norm_query : (char *) query, query_len); + } + else + xlog_entry->query_len = 0; + + XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT); + ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_INSERT); + XLogFlush(ptr); + pfree(xlog_entry); + } + done: LWLockRelease(pgss->lock); @@ -2676,6 +2658,27 @@ entry_reset(Oid userid, Oid dbid, uint64 queryid, bool minmax_only) LWLockAcquire(pgss->lock, LW_EXCLUSIVE); num_entries = hash_get_num_entries(pgss_hash); + /* Write entry to XLOG */ + if (pgss_save && !RecoveryInProgress()) + { + pgssXLogReset *xlrec; + XLogRecPtr ptr; + + xlrec = palloc0(sizeof(pgssXLogReset)); + xlrec->header = PGSS_FILE_HEADER; + xlrec->userid = userid; + xlrec->dbid = dbid; + xlrec->queryid = queryid; + xlrec->minmax_only = minmax_only; + + XLogBeginInsert(); + XLogRegisterData((char *) xlrec, sizeof(pgssXLogReset)); + XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT); + ptr = XLogInsert(CUSTOMRMGR_ID, PGSS_XLOG_RESET); + XLogFlush(ptr); + pfree(xlrec); + } + stats_reset = GetCurrentTimestamp(); if (userid != 0 && dbid != 0 
&& queryid != UINT64CONST(0)) @@ -3010,3 +3013,236 @@ comp_location(const void *a, const void *b) return pg_cmp_s32(l, r); } + +static void +rmgr_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + /* + * Because we did not restore records from storage, + * we also do not restore records from WAL. + */ + if (!pgss_save) + return; + + if (info == PGSS_XLOG_INSERT) + { + pgssXLogInsert *xlrec = (pgssXLogInsert *) XLogRecGetData(record); + pgssEntry *entry; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + /* Safety check... */ + if (!pgss || !pgss_hash) + return; + + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + entry = (pgssEntry *) hash_search(pgss_hash, &xlrec->key, HASH_FIND, NULL); + + /* Create new entry, if not present */ + if (!entry) + { + Size query_offset; + bool stored; + char *query; + + Assert(xlrec->query_len > 0); + + query = (char *) xlrec->qtext; + + /* Append new query text to file */ + stored = qtext_store(query, xlrec->query_len, &query_offset, NULL); + + /* If we failed to write to the text file, give up */ + if (!stored) + { + LWLockRelease(pgss->lock); + return; + } + + /* OK to create a new hashtable entry */ + entry = entry_alloc(&xlrec->key, query_offset, xlrec->query_len, + xlrec->encoding, false); + + /* If needed, perform garbage collection */ + gc_qtexts(); + } + + /* Copy the necessary data from XLog record */ + entry->counters = xlrec->counters; + + entry->encoding = xlrec->encoding; + entry->stats_since = xlrec->stats_since; + entry->minmax_stats_since = xlrec->minmax_stats_since; + + LWLockRelease(pgss->lock); + } + else if (info == PGSS_XLOG_RESET) + { + pgssXLogReset *xlrec = (pgssXLogReset *) XLogRecGetData(record); + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + entry_reset(xlrec->userid, xlrec->dbid, xlrec->queryid, xlrec->minmax_only); + } +} + +static 
void +rmgr_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == PGSS_XLOG_INSERT) + { + pgssXLogInsert *xlrec = (pgssXLogInsert *) rec; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT + ", toplevel: %d", + xlrec->key.userid, xlrec->key.dbid, xlrec->key.queryid, + (int) xlrec->key.toplevel); + } + else if (info == PGSS_XLOG_RESET) + { + pgssXLogReset *xlrec = (pgssXLogReset *) rec; + + if (xlrec->header != PGSS_FILE_HEADER) + { + elog(WARNING, "Skip the inconsistent WAL record"); + return; + } + + appendStringInfo(buf, "userid: %u, dbid: %u, queryid: " UINT64_FORMAT + ", minmax_only: %d", + xlrec->userid, xlrec->dbid, xlrec->queryid, + (int) xlrec->minmax_only); + } +} + +static const char * +rmgr_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_INSERT) + return "INSERT"; + if ((info & ~XLR_INFO_MASK) == PGSS_XLOG_RESET) + return "RESET"; + + return NULL; +} + +static void +rmgr_checkpoint(int flags) +{ + FILE *file; + char *qbuffer = NULL; + Size qbuffer_size = 0; + HASH_SEQ_STATUS hash_seq; + int32 num_entries; + pgssEntry *entry; + + /* Safety check ... shouldn't get here unless shmem is set up. */ + if (!pgss || !pgss_hash) + return; + + /* Don't dump if told not to. */ + if (!pgss_save) + return; + + /* XXX: Can there be concurrent CHECKPOINTs? 
*/ + LWLockAcquire(pgss->lock, LW_EXCLUSIVE); + + file = AllocateFile(PGSS_DUMP_FILE ".tmp", PG_BINARY_W); + if (file == NULL) + goto error; + + if (fwrite(&PGSS_FILE_HEADER, sizeof(uint32), 1, file) != 1) + goto error; + if (fwrite(&PGSS_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1) + goto error; + num_entries = hash_get_num_entries(pgss_hash); + if (fwrite(&num_entries, sizeof(int32), 1, file) != 1) + goto error; + + qbuffer = qtext_load_file(&qbuffer_size); + if (qbuffer == NULL) + goto error; + + /* + * When serializing to disk, we store query texts immediately after their + * entry data. Any orphaned query texts are thereby excluded. + */ + hash_seq_init(&hash_seq, pgss_hash); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + int len = entry->query_len; + char *qstr = qtext_fetch(entry->query_offset, len, + qbuffer, qbuffer_size); + + if (qstr == NULL) + continue; /* Ignore any entries with bogus texts */ + + if (fwrite(entry, sizeof(pgssEntry), 1, file) != 1 || + fwrite(qstr, 1, len + 1, file) != len + 1) + { + /* note: we assume hash_seq_term won't change errno */ + hash_seq_term(&hash_seq); + goto error; + } + } + + /* Dump global statistics for pg_stat_statements */ + if (fwrite(&pgss->stats, sizeof(pgssGlobalStats), 1, file) != 1) + goto error; + + free(qbuffer); + qbuffer = NULL; + + if (FreeFile(file)) + { + file = NULL; + goto error; + } + + /* + * Rename file into place, so we atomically replace any old one. 
+ */ + (void) durable_rename(PGSS_DUMP_FILE ".tmp", PGSS_DUMP_FILE, LOG); + + /* Unlink query-texts file; it's not needed while shutdown */ + if (flags & CHECKPOINT_IS_SHUTDOWN) + unlink(PGSS_TEXT_FILE); + + LWLockRelease(pgss->lock); + return; + +error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + PGSS_DUMP_FILE ".tmp"))); + free(qbuffer); + if (file) + FreeFile(file); + unlink(PGSS_DUMP_FILE ".tmp"); + + if (flags & CHECKPOINT_IS_SHUTDOWN) + unlink(PGSS_TEXT_FILE); + + LWLockRelease(pgss->lock); +} diff --git a/contrib/pg_stat_statements/t/020_crash.pl b/contrib/pg_stat_statements/t/020_crash.pl new file mode 100644 index 0000000000..f33bb43d7d --- /dev/null +++ b/contrib/pg_stat_statements/t/020_crash.pl @@ -0,0 +1,80 @@ +# Copyright (c) 2023-2024, PostgreSQL Global Development Group + +# Tests for checking that pg_stat_statements contents are preserved +# across restarts. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('main'); +$node->init; +$node->append_conf('postgresql.conf', + q[ + shared_preload_libraries = 'pg_stat_statements' + restart_after_crash = 1 + ]); +$node->start; + +$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_statements'); + +# Without the CHECKPOINT hook, we won't see this query in pg_stat_statements +# after a server crash. +$node->safe_psql('postgres', 'CREATE TABLE t1 (a int)'); + +$node->safe_psql('postgres', 'CHECKPOINT'); +$node->safe_psql('postgres', 'SELECT a FROM t1'); + +is( $node->safe_psql( + 'postgres', + "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query" + ), + "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1", + 'pg_stat_statements populated'); + + +# Perform a server shutdown by killing the backend. 
+my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); + +my ($killme_stdin, $killme_stdout, $killme_stderr) = ('', '', ''); +my $killme = IPC::Run::start( + [ + 'psql', '-X', '-qAt', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d', + $node->connstr('postgres') + ], + '<', + \$killme_stdin, + '>', + \$killme_stdout, + '2>', + \$killme_stderr, + $psql_timeout); + +$killme_stdin .= "SELECT pg_backend_pid();\n"; +ok( pump_until( + $killme, $psql_timeout, \$killme_stdout, qr/[[:digit:]]+[\r\n]$/m), + 'acquired pid for SIGQUIT'); +my $pid = $killme_stdout; +chomp($pid); + +my $ret = PostgreSQL::Test::Utils::system_log('pg_ctl', 'kill', 'QUIT', $pid); +is($ret, 0, "killed process with SIGQUIT"); + +$killme->finish; + +# Wait till server restarts +is($node->poll_query_until('postgres', undef, ''), + "1", "reconnected after SIGQUIT"); + +is( $node->safe_psql( + 'postgres', + "SELECT query FROM pg_stat_statements WHERE query NOT LIKE '%pg_stat_statements%' ORDER BY query" + ), + "CHECKPOINT\nCREATE TABLE t1 (a int)\nSELECT a FROM t1\nSELECT pg_backend_pid()", + 'pg_stat_statements data kept across the server crash'); + +$node->stop; + +done_testing(); ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-03-21 19:02 Jeff Davis <[email protected]> parent: Danil Anisimow <[email protected]> 1 sibling, 1 reply; 12+ messages in thread From: Jeff Davis @ 2024-03-21 19:02 UTC (permalink / raw) To: Danil Anisimow <[email protected]>; +Cc: Andres Freund <[email protected]>; pgsql-hackers On Thu, 2024-03-21 at 19:47 +0700, Danil Anisimow wrote: > [pgss_001.v1.patch] adds a custom resource manager to the > pg_stat_statements extension. Did you consider moving the logic for loading the initial contents from disk from pgss_shmem_startup to .rmgr_startup? > The rm_checkpoint hook allows saving shared memory data to disk at > each checkpoint. However, for pg_stat_statements, it matters when the > checkpoint occurred. When the server shuts down, pgss deletes the > temporary file of query texts. In other cases, this is unacceptable. > To provide this capability, a flags parameter was added to the > rm_checkpoint hook. The changes are presented in [rmgr_003.v2.patch]. Overall this seems fairly reasonable to me. I think this will work for similar extensions, where the data being stored is independent from the buffers. My biggest concern is that it might not be quite right for a table AM that has complex state that needs action to be taken at a slightly different time, e.g. right after CheckPointBuffers(). Then again, the rmgr is a low-level API, and any extension using it should be prepared to adapt to changes. If it works for pgss, then we know it works for at least one thing, and we can always improve it later. For instance, we might call the hook several times and pass it a "phase" argument. Regards, Jeff Davis ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-03-21 19:07 Jeff Davis <[email protected]> parent: Danil Anisimow <[email protected]> 1 sibling, 0 replies; 12+ messages in thread From: Jeff Davis @ 2024-03-21 19:07 UTC (permalink / raw) To: Danil Anisimow <[email protected]>; +Cc: Andres Freund <[email protected]>; pgsql-hackers On Thu, 2024-03-21 at 19:47 +0700, Danil Anisimow wrote: > The proposed patch is not a complete solution for pgss and may not > work correctly with replication. Also, what is the desired behavior during replication? Should queries on the primary be represented in pgss on the replica? If the answer is yes, should they be differentiated somehow so that you can know where the slow queries are running? Regards, Jeff Davis ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-03-29 11:20 Danil Anisimow <[email protected]> parent: Jeff Davis <[email protected]> 0 siblings, 1 reply; 12+ messages in thread From: Danil Anisimow @ 2024-03-29 11:20 UTC (permalink / raw) To: Jeff Davis <[email protected]>; +Cc: Andres Freund <[email protected]>; pgsql-hackers On Fri, Mar 22, 2024 at 2:02 AM Jeff Davis <[email protected]> wrote: > On Thu, 2024-03-21 at 19:47 +0700, Danil Anisimow wrote: > > [pgss_001.v1.patch] adds a custom resource manager to the > > pg_stat_statements extension. > > Did you consider moving the logic for loading the initial contents from > disk from pgss_shmem_startup to .rmgr_startup? I tried it, but .rmgr_startup is not called if the system was shut down cleanly. > My biggest concern is that it might not be quite right for a table AM > that has complex state that needs action to be taken at a slightly > different time, e.g. right after CheckPointBuffers(). > Then again, the rmgr is a low-level API, and any extension using it > should be prepared to adapt to changes. If it works for pgss, then we > know it works for at least one thing, and we can always improve it > later. For instance, we might call the hook several times and pass it a > "phase" argument. In [rmgr_003.v3.patch] I added a phase argument to RmgrCheckpoint(). Currently it is only called in two places: before and after CheckPointBuffers(). 
-- Regards, Daniil Anisimov Postgres Professional: http://postgrespro.com Attachments: [text/x-patch] rmgr_003.v3.patch (9.9K, 3-rmgr_003.v3.patch) download | inline diff: diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 3e2f1d4a23..5a1fbe8379 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -44,8 +44,8 @@ /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ - { name, redo, desc, identify, startup, cleanup, mask, decode }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ + { name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint }, RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" @@ -83,6 +83,25 @@ RmgrCleanup(void) } } +/* + * Checkpoint all resource managers. + * + * See CreateCheckPoint for details about flags. + * phase shows a position in which RmgrCheckpoint is called in CheckPointGuts. + */ +void +RmgrCheckpoint(int flags, RmgrCheckpointPhase phase) +{ + for (int rmid = 0; rmid <= RM_MAX_ID; rmid++) + { + if (!RmgrIdExists(rmid)) + continue; + + if (RmgrTable[rmid].rm_checkpoint != NULL) + RmgrTable[rmid].rm_checkpoint(flags, phase); + } +} + /* * Emit ERROR when we encounter a record with an RmgrId we don't * recognize. 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 20a5f86209..d7ecab6769 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7357,8 +7357,13 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointSUBTRANS(); CheckPointMultiXact(); CheckPointPredicate(); + + RmgrCheckpoint(flags, RMGR_CHECKPOINT_BEFORE_BUFFERS); + CheckPointBuffers(flags); + RmgrCheckpoint(flags, RMGR_CHECKPOINT_AFTER_BUFFERS); + /* Perform all queued up fsyncs */ TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START(); CheckpointStats.ckpt_sync_t = GetCurrentTimestamp(); diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 22f7351fdc..11ae1e7af4 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -28,7 +28,7 @@ * RmgrNames is an array of the built-in resource manager names, to make error * messages a bit nicer. */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ name, static const char *const RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 6b8c17bb4c..2bb5ba8c9f 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -32,7 +32,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ { name, desc, identify}, static const RmgrDescData RmgrDescTable[RM_N_BUILTIN_IDS] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index 3b6a497e1b..34ddc0210c 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode,checkpoint) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 78e6b908c6..0b03cc69be 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -24,26 +24,26 @@ * Changes to this list possibly need an XLOG_PAGE_MAGIC bump. */ -/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, standby_decode) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, 
gin_xlog_cleanup, gin_mask, NULL) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) -PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode, checkpoint */ +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode, NULL) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, 
NULL, standby_decode, NULL) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, heap2_decode, NULL) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, heap_decode, NULL) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog_startup, btree_xlog_cleanup, btree_mask, NULL, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode, NULL) diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index b88b24f0c1..52a70e65e1 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -325,6 +325,15 @@ typedef enum RECOVERY_TARGET_ACTION_SHUTDOWN, } RecoveryTargetAction; +/* Checkpoint phases in which RmgrCheckpoint() is called. 
*/ +typedef enum RmgrCheckpointPhase +{ + RMGR_CHECKPOINT_BEFORE_BUFFERS = 0, /* RmgrCheckpoint() is called + * before CheckPointBuffers() */ + RMGR_CHECKPOINT_AFTER_BUFFERS, /* RmgrCheckpoint() is called + * after CheckPointBuffers() */ +} RmgrCheckpointPhase; + struct LogicalDecodingContext; struct XLogRecordBuffer; @@ -356,11 +365,13 @@ typedef struct RmgrData void (*rm_mask) (char *pagedata, BlockNumber blkno); void (*rm_decode) (struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf); + void (*rm_checkpoint) (int flags, RmgrCheckpointPhase phase); } RmgrData; extern PGDLLIMPORT RmgrData RmgrTable[]; extern void RmgrStartup(void); extern void RmgrCleanup(void); +extern void RmgrCheckpoint(int flags, RmgrCheckpointPhase phase); extern void RmgrNotFound(RmgrId rmid); extern void RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr); ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-03-29 17:09 Jeff Davis <[email protected]> parent: Danil Anisimow <[email protected]> 0 siblings, 1 reply; 12+ messages in thread From: Jeff Davis @ 2024-03-29 17:09 UTC (permalink / raw) To: Danil Anisimow <[email protected]>; +Cc: Andres Freund <[email protected]>; pgsql-hackers; Robert Haas <[email protected]> On Fri, 2024-03-29 at 18:20 +0700, Danil Anisimow wrote: > > In [rmgr_003.v3.patch] I added a phase argument to RmgrCheckpoint(). > Currently it is only called in two places: before and after > CheckPointBuffers(). I am fine with this. You've moved the discussion forward in two ways: 1. Changes to pg_stat_statements to actually use the API; and 2. The hook is called at multiple points. Those at least partially address the concerns raised by Andres and Robert. But given that there was pushback from multiple people on the feature, I'd like to hear from at least one of them. It's very late in the cycle so I'm not sure we'll get more feedback in time, though. Regards, Jeff Davis ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-05-17 18:56 Robert Haas <[email protected]> parent: Jeff Davis <[email protected]> 0 siblings, 1 reply; 12+ messages in thread From: Robert Haas @ 2024-05-17 18:56 UTC (permalink / raw) To: Jeff Davis <[email protected]>; +Cc: Danil Anisimow <[email protected]>; Andres Freund <[email protected]>; pgsql-hackers On Fri, Mar 29, 2024 at 1:09 PM Jeff Davis <[email protected]> wrote: > I am fine with this. > > You've moved the discussion forward in two ways: > > 1. Changes to pg_stat_statements to actually use the API; and > 2. The hook is called at multiple points. > > Those at least partially address the concerns raised by Andres and > Robert. But given that there was pushback from multiple people on the > feature, I'd like to hear from at least one of them. It's very late in > the cycle so I'm not sure we'll get more feedback in time, though. In my seemingly-neverending pass through the July CommitFest, I reached this patch. My comment is: it's possible that rmgr_003.v3.patch is enough to be useful, but does anyone in the world think they know that for a fact? I mean, pgss_001.v1.patch purports to demonstrate that it can be used, but that's based on rmgr_003.v2.patch, not the v3 patch, and the emails seem to indicate that it may not actually work. I also think, looking at it, that it looks much more like a POC than something we'd consider ready for commit. It also seems very unclear that we'd want pg_stat_statements to behave this way, and indeed "this way" isn't really spelled out anywhere. I think it would be nice if we had an example that uses the proposed hook that we could actually commit. Maybe that's asking too much, though. I think the minimum thing we need is a compelling rationale for why this particular hook design is going to be good enough. 
That could be demonstrated by means of (1) a well-commented example that accomplishes some understandable goal and/or (2) a detailed description of how a non-core table AM or index AM is expected to be able to make use of this. Bonus points if the person providing that rationale can say credibly that they've actually implemented this and it works great with 100TB of production data. The problem here is not only that we don't want to commit a hook that does nothing useful. We also don't want to commit a hook that works wonderfully for someone but we have no idea why. If we do that, then we don't know whether it's OK to modify the hook in the future as the code evolves, or more to the point, which kinds of modifications will be acceptable. And also, the next person who wants to use it is likely to have to figure out all on their own how to do so, instead of being able to refer to comments or documentation or the commit message or at least a mailing list post. My basic position is not that this patch is a bad idea, but that it isn't really finished. The idea is probably a pretty good one, but whether this is a reasonable implementation of the idea doesn't seem clear, at least not to me. -- Robert Haas EDB: http://www.enterprisedb.com ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-05-17 20:20 Jeff Davis <[email protected]> parent: Robert Haas <[email protected]> 0 siblings, 2 replies; 12+ messages in thread From: Jeff Davis @ 2024-05-17 20:20 UTC (permalink / raw) To: Robert Haas <[email protected]>; +Cc: Danil Anisimow <[email protected]>; Andres Freund <[email protected]>; pgsql-hackers On Fri, 2024-05-17 at 14:56 -0400, Robert Haas wrote: > (2) a detailed > description of how a non-core table AM or index AM is expected to be > able to make use of this. Bonus points if the person providing that > rationale can say credibly that they've actually implemented this and > it works great with 100TB of production data. That's a chicken-and-egg problem and we should be careful about setting the bar too high for table AM improvements. Ultimately, AM authors will benefit more from a steady stream of improvements that sometimes miss the mark than complete stagnation, as long as we use reasonable judgement. There aren't a lot of table AMs, and to create a good one you need a lot of internals knowledge. If it's an important AM, the developers are surely going to try it out on mainline occasionally, and expect API breaks. If the API breaks for them in some fundamental way, they can complain and we still have time to fix it. > The problem here is not only that we don't want to commit a hook that > does nothing useful. We also don't want to commit a hook that works > wonderfully for someone but we have no idea why. If we do that, then > we don't know whether it's OK to modify the hook in the future as the > code evolves, or more to the point, which kinds of modifications will > be acceptable. We have to have some kind of understanding between us and AM authors that they need to participate in discussions when using these APIs, try changes during development, be adaptable when they change from release to release, and come back and tell us when something is wrong. 
> And also, the next person who wants to use it is likely > to have to figure out all on their own how to do so, instead of being > able to refer to comments or documentation or the commit message or at > least a mailing list post. Obviously it would be better to have a nice example table AM in /contrib, different enough from heap, but nobody has done that yet. You could argue that we never should have exposed the API without something like this in the first place, but that's also a big ask and we'd probably still not have it. Regarding this particular change: the checkpointing hook seems more like a table AM feature, so I agree with you that we should have a good idea how a real table AM might use this, rather than only pg_stat_statements. Regards, Jeff Davis ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-05-17 20:25 Robert Haas <[email protected]> parent: Jeff Davis <[email protected]> 1 sibling, 1 reply; 12+ messages in thread From: Robert Haas @ 2024-05-17 20:25 UTC (permalink / raw) To: Jeff Davis <[email protected]>; +Cc: Danil Anisimow <[email protected]>; Andres Freund <[email protected]>; pgsql-hackers On Fri, May 17, 2024 at 4:20 PM Jeff Davis <[email protected]> wrote: > Regarding this particular change: the checkpointing hook seems more > like a table AM feature, so I agree with you that we should have a good > idea how a real table AM might use this, rather than only > pg_stat_statements. I would even be OK with a pg_stat_statements example that is fully working and fully explained. I just don't want to have no example at all. The original proposal has been changed twice because of complaints that the hook wasn't quite useful enough, but I think that only proves that v3 is closer to being useful than v1. If v1 is 40% of the way to useful and v3 is 120% of the way to useful, wonderful! But if v1 is 20% of the way to being useful and v3 is 60% of the way to being useful, it's not time to commit anything yet. I don't know which is the case, and I think if someone wants this to be committed, they need to explain clearly why it's the first and not the second. -- Robert Haas EDB: http://www.enterprisedb.com ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-05-27 18:20 Michael Paquier <[email protected]> parent: Robert Haas <[email protected]> 0 siblings, 1 reply; 12+ messages in thread From: Michael Paquier @ 2024-05-27 18:20 UTC (permalink / raw) To: Robert Haas <[email protected]>; +Cc: Jeff Davis <[email protected]>; Danil Anisimow <[email protected]>; Andres Freund <[email protected]>; pgsql-hackers On Fri, May 17, 2024 at 04:25:15PM -0400, Robert Haas wrote: > On Fri, May 17, 2024 at 4:20 PM Jeff Davis <[email protected]> wrote: >> Regarding this particular change: the checkpointing hook seems more >> like a table AM feature, so I agree with you that we should have a good >> idea how a real table AM might use this, rather than only >> pg_stat_statements. > > I would even be OK with a pg_stat_statements example that is fully > working and fully explained. I just don't want to have no example at > all. The original proposal has been changed twice because of > complaints that the hook wasn't quite useful enough, but I think that > only proves that v3 is closer to being useful than v1. If v1 is 40% of > the way to useful and v3 is 120% of the way to useful, wonderful! But > if v1 is 20% of the way to being useful and v3 is 60% of the way to > being useful, it's not time to commit anything yet. I don't know which > is the case, and I think if someone wants this to be committed, they > need to explain clearly why it's the first and not the second. Please note that I've been studying ways to have pg_stat_statements being plugged in directly with the shared pgstat APIs to get it backed by a dshash to give more flexibility and scaling, giving a way for extensions to register their own stats kind. In this case, the flush of the stats would be controlled with a callback in the stats registered by the extensions, conflicting with what's proposed here. pg_stat_statements is all about stats, at the end. 
I don't want this argument to act as a barrier if a checkpoint hook is an accepted consensus here, but a checkpoint hook used for this code path is not the most intuitive solution I can think of in the long-term. -- Michael Attachments: [application/pgp-signature] signature.asc (833B, 2-signature.asc) download ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2024-05-27 21:32 Tristan Partin <[email protected]> parent: Jeff Davis <[email protected]> 1 sibling, 0 replies; 12+ messages in thread From: Tristan Partin @ 2024-05-27 21:32 UTC (permalink / raw) To: Jeff Davis <[email protected]>; Robert Haas <[email protected]>; +Cc: Danil Anisimow <[email protected]>; Andres Freund <[email protected]>; pgsql-hackers On Fri May 17, 2024 at 3:20 PM CDT, Jeff Davis wrote: > ... > > Obviously it would be better to have a nice example table AM in > /contrib, different enough from heap, but nobody has done that yet. You > could argue that we never should have exposed the API without something > like this in the first place, but that's also a big ask and we'd > probably still not have it. Not sure how useful it would be as an example, but MariaDB has a blackhole storage engine[0], which has helped serve as a guide for me previously. [0]: https://mariadb.com/kb/en/blackhole/ -- Tristan Partin https://tristan.partin.io ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2025-10-14 09:11 Andrei Lepikhov <[email protected]> parent: Michael Paquier <[email protected]> 0 siblings, 1 reply; 12+ messages in thread From: Andrei Lepikhov @ 2025-10-14 09:11 UTC (permalink / raw) To: Michael Paquier <[email protected]>; +Cc: Jeff Davis <[email protected]>; Robert Haas <[email protected]>; Danil Anisimow <[email protected]>; Andres Freund <[email protected]>; pgsql-hackers; Dean Rasheed <[email protected]>; Yurii Rashkovskii <[email protected]> On 27/5/2024 20:20, Michael Paquier wrote: > Please note that I've been studying ways to have pg_stat_statements > being plugged in directly with the shared pgstat APIs to get it backed > by a dshash to give more flexibility and scaling, giving a way for > extensions to register their own stats kind. In this case, the flush > of the stats would be controlled with a callback in the stats > registered by the extensions, conflicting with what's proposed here. > pg_stat_statements is all about stats, at the end. I don't want this > argument to act as a barrier if a checkpoint hook is an accepted > consensus here, but a checkpoint hook used for this code path is not > the most intuitive solution I can think of in the long-term. Let me continue this thread. I have been waiting for any kind of checkpoint cut-in machinery for extensions. Typically, when collecting knowledge about the instance state, we store it in an extension-owned database table, incurring the costs associated with transactional mechanics, tuple format overhead, and so on. Usually, we don't need MVCC or rollback; we have fixed-length data, and it would be better to store data in hash tables. These hash tables should survive instance restarts and crashes - that's the only feature needed. pg_stat_statements dumps its data to a file, but it is not reliable enough when we need consistent information, such as replication status or when logging update conflicts (see the Spock extension [1]).
When we learn about query executions, we can't dump the hash table on each ExecutorEnd due to overhead, but we are okay with adding one more WAL record containing the hash table entry data - it may be done by the backend or by a separate background worker. So, the primary reason for us is to have a moment to store the extension's state on disk, keeping in mind that we have registered RMGR, which allows us to restore the full state using this disk file and WAL records. For me, the ideal place for such a hook is CheckPointGuts, right between the CheckPointBuffers call and fsyncs. I think that to demonstrate how this hook can work, the pg_stat_statements storage may need to be redesigned slightly. [1] https://github.com/pgEdge/spock -- regards, Andrei Lepikhov, pgEdge ^ permalink raw reply [nested|flat] 12+ messages in thread
* Re: Comments on Custom RMGRs @ 2025-11-15 10:44 Andrei Lepikhov <[email protected]> parent: Andrei Lepikhov <[email protected]> 0 siblings, 0 replies; 12+ messages in thread From: Andrei Lepikhov @ 2025-11-15 10:44 UTC (permalink / raw) To: Michael Paquier <[email protected]>; Danil Anisimow <[email protected]>; +Cc: Jeff Davis <[email protected]>; Robert Haas <[email protected]>; Andres Freund <[email protected]>; pgsql-hackers; Dean Rasheed <[email protected]>; Yurii Rashkovskii <[email protected]> On 14/10/2025 11:11, Andrei Lepikhov wrote: > For me, the ideal place for such a hook is CheckPointGuts, right between > the CheckPointBuffers call and fsyncs. I think that to demonstrate how > this hook can work, the pg_stat_statements storage may need to be > redesigned slightly. There are two patches: 0001, which is the checkpoint hook itself, and 0002, which includes an example and a trivial test. During development, I attempted to apply it in my different modules and realised that the hook is preferred over an RMGR callback - I don't actually want to be forced to register RMGR in each project and have it loadable on an instance startup. In lightweight modules, I want to keep my knowledge base relatively close to the current state of the instance. Nevertheless, the plan freezing extension (for example) needs to ensure that the user's query plan is 'frozen' after the function call. Therefore, we need to store the decision made in the WAL, which requires dumping the state into a file before performing the WAL cut. Additionally, I'd like to experiment with synchronising an extension state between master and replica through WAL records, as most optimisation recommendations are relevant to both instances. Patch 0001 contains a hook that is called once after all checkpoint preparations have finished. I recall that people mentioned it might be helpful for AMs as well - feel free to propose changes to this patch. 
Patch 2 adds an example to the test_dsm_registry module, as it is precisely the way I write the code: named DSM segment -> shared HTAB -> file dump. So, it looks natural and leaves room to extend this example by employing RMGR and xact callbacks to keep the extension state as close to the committed changes as possible. The test looks pretty trivial so far - feel free to propose ideas on how to extend it. -- regards, Andrei Lepikhov, pgEdge From a0e8d75223fa95dbec1e422eacaef336e45c2008 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" <[email protected]> Date: Thu, 13 Nov 2025 15:00:43 +0100 Subject: [PATCH 1/2] Add a hook for Checkpoint processing. There are many situations in which a Postgres plugin may need to maintain its internal state across restarts or crashes. Sometimes it wants to synchronise its state on logical replicas or have it saved in a backup by employing custom RMGR and WAL records. For statistical extensions, such as pg_stat_statements, it is okay to save their state on postmaster shutdown. However, business extensions may want to maintain a more up-to-date state, periodically dumping it to a disk file or using WAL and xact callbacks to be as close as possible to the current database state. Checkpoint is a key moment where the DBMS performs disk synchronisation and cuts the WAL. It is a good time to do the same thing for a plugin, too. Moreover, the plugin can be sure that nothing important will be lost with the WAL cut. Discussion: https://www.postgresql.org/message-id/CANbhV-E4pTWeF-DsdaGsOrjJNFWPaR%2BDstjrnkqvf9JFFgOKKQ%40mail.g...
--- src/backend/access/transam/xlog.c | 15 +++++++++++++++ src/include/access/xlog.h | 4 ++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 20 insertions(+) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 22d0a2e8c3a..c7c0b226724 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -157,6 +157,13 @@ int wal_segment_size = DEFAULT_XLOG_SEG_SIZE; */ int CheckPointSegments; +/* + * Hook for plugins to take control during checkpoint processing. All + * preparation procedures have already been done, and only the sync needs + * to be done. + */ +Checkpoint_hook_type Checkpoint_hook = NULL; + /* Estimated distance between checkpoints, in bytes */ static double CheckPointDistanceEstimate = 0; static double PrevCheckPointDistance = 0; @@ -7594,6 +7601,14 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointPredicate(); CheckPointBuffers(flags); + /* + * Allow a plugin that depends on a custom RMGR to retain its state through + * reboots or crashes by following specific steps, ensuring that essential + * WAL records are not truncated. 
+ */ + if (Checkpoint_hook) + Checkpoint_hook(checkPointRedo, flags); + /* Perform all queued up fsyncs */ TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START(); CheckpointStats.ckpt_sync_t = GetCurrentTimestamp(); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 605280ed8fb..5c071974557 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -198,6 +198,10 @@ typedef enum WALAvailability struct XLogRecData; struct XLogReaderState; +/* Hook for plugins to get control at the end of a CheckPoint */ +typedef void (*Checkpoint_hook_type)(XLogRecPtr checkPointRedo, int flags); +extern PGDLLIMPORT Checkpoint_hook_type Checkpoint_hook; + extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, XLogRecPtr fpw_lsn, uint8 flags, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 23bce72ae64..6ca05499081 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -413,6 +413,7 @@ CatalogIndexState ChangeVarNodes_callback ChangeVarNodes_context CheckPoint +Checkpoint_hook_type CheckPointStmt CheckpointStatsData CheckpointerRequest -- 2.51.2 From f92abbcc3667103628608d248870867200087e16 Mon Sep 17 00:00:00 2001 From: "Andrei V. 
Lepikhov" <[email protected]> Date: Fri, 14 Nov 2025 16:35:21 +0100 Subject: [PATCH 2/2] Testing module --- src/test/modules/test_dsm_registry/Makefile | 1 + .../test_dsm_registry/t/001_file_storage.pl | 31 ++++ .../test_dsm_registry/test_dsm_registry.c | 163 ++++++++++++++++++ 3 files changed, 195 insertions(+) create mode 100644 src/test/modules/test_dsm_registry/t/001_file_storage.pl diff --git a/src/test/modules/test_dsm_registry/Makefile b/src/test/modules/test_dsm_registry/Makefile index b13e99a354f..9aae8b98aba 100644 --- a/src/test/modules/test_dsm_registry/Makefile +++ b/src/test/modules/test_dsm_registry/Makefile @@ -10,6 +10,7 @@ EXTENSION = test_dsm_registry DATA = test_dsm_registry--1.0.sql REGRESS = test_dsm_registry +TAP_TESTS = 1 ifdef USE_PGXS PG_CONFIG = pg_config diff --git a/src/test/modules/test_dsm_registry/t/001_file_storage.pl b/src/test/modules/test_dsm_registry/t/001_file_storage.pl new file mode 100644 index 00000000000..0e82d0adcf7 --- /dev/null +++ b/src/test/modules/test_dsm_registry/t/001_file_storage.pl @@ -0,0 +1,31 @@ +# Copyright (c) 2023-2025, PostgreSQL Global Development Group +use strict; +use warnings FATAL => 'all'; +use Config; +use PostgreSQL::Test::Utils; +use PostgreSQL::Test::Cluster; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('node'); + +$node->init(); +$node->append_conf('postgresql.conf', + "shared_preload_libraries = 'test_dsm_registry'"); +$node->start(); + +$node->safe_psql('postgres', "CREATE EXTENSION test_dsm_registry"); + +my $result; + +$node->safe_psql('postgres', "SELECT set_val_in_hash('test-1', '1414')"); +$node->safe_psql('postgres', 'CHECKPOINT'); +$node->safe_psql('postgres', "SELECT set_val_in_hash('test-2', '1415')"); +$node->stop('immediate'); +$node->start(); + +$result = $node->safe_psql('postgres', "SELECT get_val_in_hash('test-1')"); +is($result, '1414', "Value inserted before the checkpoint was restored"); +$result = $node->safe_psql('postgres', "SELECT 
get_val_in_hash('test-2')"); +is($result, '', "Value inserted after the checkpoint was lost"); + +done_testing(); diff --git a/src/test/modules/test_dsm_registry/test_dsm_registry.c b/src/test/modules/test_dsm_registry/test_dsm_registry.c index 4cc2ccdac3f..2d7fd35a74d 100644 --- a/src/test/modules/test_dsm_registry/test_dsm_registry.c +++ b/src/test/modules/test_dsm_registry/test_dsm_registry.c @@ -12,13 +12,22 @@ */ #include "postgres.h" +#include "access/xlog.h" #include "fmgr.h" +#include "pgstat.h" #include "storage/dsm_registry.h" +#include "storage/fd.h" #include "storage/lwlock.h" #include "utils/builtins.h" +#include "utils/hsearch.h" PG_MODULE_MAGIC; +/* Location of permanent storage file (valid on checkpoint) */ +#define TDR_DUMP_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pg_stat_statements.stat" +/* Magic number identifying the stats file format */ +static const uint32 TDR_FILE_HEADER = 0x20251114; + typedef struct TestDSMRegistryStruct { int val; @@ -43,6 +52,11 @@ static const dshash_parameters dsh_params = { dshash_strcpy }; +static Checkpoint_hook_type prev_Checkpoint_hook = NULL; + +static void load_htab(void); +static void pgss_Checkpoint(XLogRecPtr checkPointRedo, int flags); + static void init_tdr_dsm(void *ptr) { @@ -66,7 +80,14 @@ tdr_attach_shmem(void) tdr_dsa = GetNamedDSA("test_dsm_registry_dsa", &found); if (tdr_hash == NULL) + { + LWLockAcquire(&tdr_dsm->lck, LW_EXCLUSIVE); tdr_hash = GetNamedDSHash("test_dsm_registry_hash", &dsh_params, &found); + if (!found) + load_htab(); + + LWLockRelease(&tdr_dsm->lck); + } } PG_FUNCTION_INFO_V1(set_val_in_shmem); @@ -144,3 +165,145 @@ get_val_in_hash(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(val); } + +/* + * Load any pre-existing entries from file. + */ +static void +load_htab(void) +{ + bool found; + FILE *file = NULL; + uint32 header; + char *val = palloc(1); + + Assert(tdr_dsa != NULL && tdr_hash != NULL); + + /* + * Attempt to load old entries from the dump file. 
+ */ + file = AllocateFile(TDR_DUMP_FILE, PG_BINARY_R); + if (file == NULL) + { + if (errno != ENOENT) + goto read_error; + /* No existing persisted file, so we're done */ + return; + } + + if (fread(&header, sizeof(uint32), 1, file) != 1 || + header != TDR_FILE_HEADER) + goto read_error; + + while (!feof(file)) + { + TestDSMRegistryHashEntry *entry; + char key[64]; + int keylen = offsetof(TestDSMRegistryHashEntry, val); + int32 vlen; + + if (fread(key, keylen, 1, file) != 1 || + fread(&vlen, sizeof(int32), 1, file) != 1) + goto read_error; + + val = repalloc(val, vlen); + if (fread(val, vlen, 1, file) != 1) + goto read_error; + + Assert(val[vlen - 1] == '\0'); + + entry = (TestDSMRegistryHashEntry *) + dshash_find_or_insert(tdr_hash, key, &found); + Assert(!found); + + entry->val = dsa_allocate(tdr_dsa, strlen(val) + 1); + strcpy(dsa_get_address(tdr_dsa, entry->val), val); + + dshash_release_lock(tdr_hash, entry); + } + + FreeFile(file); + return; + +read_error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read from file \"%s\": %m", TDR_DUMP_FILE))); + if (file) + FreeFile(file); + /* If possible, throw away the bogus file; ignore any error */ + unlink(TDR_DUMP_FILE); +} + +/* + * Dump hash table into file. 
+ * + */ +static void +pgss_Checkpoint(XLogRecPtr checkPointRedo, int flags) +{ + FILE *file; + dshash_seq_status hstat; + TestDSMRegistryHashEntry *entry; + + if (flags & CHECKPOINT_END_OF_RECOVERY) + return; + + tdr_attach_shmem(); + + file = AllocateFile(TDR_DUMP_FILE ".tmp", PG_BINARY_W); + if (file == NULL) + goto error; + if (fwrite(&TDR_FILE_HEADER, sizeof(uint32), 1, file) != 1) + goto error; + + dshash_seq_init(&hstat, tdr_hash, false); + while ((entry = dshash_seq_next(&hstat)) != NULL) + { + int keylen = offsetof(TestDSMRegistryHashEntry, val); + char *val; + int32 vlen; + + val = (char *) dsa_get_address(tdr_dsa, entry->val); + vlen = strlen(val) + 1; + if (fwrite(entry->key, keylen, 1, file) != 1 || + fwrite(&vlen, sizeof(int32), 1, file) != 1 || + fwrite(val, vlen, 1, file) != 1) + { + dshash_seq_term(&hstat); + goto error; + } + } + dshash_seq_term(&hstat); + + if (FreeFile(file)) + { + file = NULL; + goto error; + } + + /* + * Rename file into place, so we atomically replace any old one. + */ + (void) durable_rename(TDR_DUMP_FILE ".tmp", TDR_DUMP_FILE, LOG); + return; + +error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + TDR_DUMP_FILE ".tmp"))); + if (file) + FreeFile(file); + unlink(TDR_DUMP_FILE ".tmp"); +} + +/* + * Entry point for this module. + */ +void +_PG_init(void) +{ + prev_Checkpoint_hook = Checkpoint_hook; + Checkpoint_hook = pgss_Checkpoint; +} -- 2.51.2 Attachments: [text/plain] 0001-Add-a-hook-for-Checkpoint-processing.patch (3.4K, 2-0001-Add-a-hook-for-Checkpoint-processing.patch) download | inline diff: From a0e8d75223fa95dbec1e422eacaef336e45c2008 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" <[email protected]> Date: Thu, 13 Nov 2025 15:00:43 +0100 Subject: [PATCH 1/2] Add a hook for Checkpoint processing. There are many situations in which a Postgres plugin may need to maintain its internal state across restarts or crashes. 
Sometimes it wants its state to be synchronised to logical replicas or saved in a backup, employing a custom RMGR and WAL records. For statistical extensions, such as pg_stat_statements, it is okay to save their state on postmaster shutdown. However, business extensions may want to maintain a more up-to-date state, periodically dumping it to a disk file or using WAL and xact callbacks to be as close as possible to the current database state. A checkpoint is a key moment when the DBMS performs disk synchronisation and truncates the WAL. It is a good time to do the same thing for a plugin, too. Moreover, the plugin can be sure that nothing important will be lost when the WAL is truncated. Discussion: https://www.postgresql.org/message-id/CANbhV-E4pTWeF-DsdaGsOrjJNFWPaR%2BDstjrnkqvf9JFFgOKKQ%40mail.gmail.com --- src/backend/access/transam/xlog.c | 15 +++++++++++++++ src/include/access/xlog.h | 4 ++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 20 insertions(+) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 22d0a2e8c3a..c7c0b226724 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -157,6 +157,13 @@ int wal_segment_size = DEFAULT_XLOG_SEG_SIZE; */ int CheckPointSegments; +/* + * Hook for plugins to take control during checkpoint processing. All + * preparation procedures have already been done, and only the sync needs + * to be done. + */ +Checkpoint_hook_type Checkpoint_hook = NULL; + /* Estimated distance between checkpoints, in bytes */ static double CheckPointDistanceEstimate = 0; static double PrevCheckPointDistance = 0; @@ -7594,6 +7601,14 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointPredicate(); CheckPointBuffers(flags); + /* + * Allow a plugin that depends on a custom RMGR to retain its state through + * reboots or crashes by following specific steps, ensuring that essential + * WAL records are not truncated. 
+ */ + if (Checkpoint_hook) + Checkpoint_hook(checkPointRedo, flags); + /* Perform all queued up fsyncs */ TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START(); CheckpointStats.ckpt_sync_t = GetCurrentTimestamp(); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 605280ed8fb..5c071974557 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -198,6 +198,10 @@ typedef enum WALAvailability struct XLogRecData; struct XLogReaderState; +/* Hook for plugins to get control at the end of a CheckPoint */ +typedef void (*Checkpoint_hook_type)(XLogRecPtr checkPointRedo, int flags); +extern PGDLLIMPORT Checkpoint_hook_type Checkpoint_hook; + extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, XLogRecPtr fpw_lsn, uint8 flags, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 23bce72ae64..6ca05499081 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -413,6 +413,7 @@ CatalogIndexState ChangeVarNodes_callback ChangeVarNodes_context CheckPoint +Checkpoint_hook_type CheckPointStmt CheckpointStatsData CheckpointerRequest -- 2.51.2 [text/plain] 0002-Testing-module.patch (6.7K, 3-0002-Testing-module.patch) download | inline diff: From f92abbcc3667103628608d248870867200087e16 Mon Sep 17 00:00:00 2001 From: "Andrei V. 
Lepikhov" <[email protected]> Date: Fri, 14 Nov 2025 16:35:21 +0100 Subject: [PATCH 2/2] Testing module --- src/test/modules/test_dsm_registry/Makefile | 1 + .../test_dsm_registry/t/001_file_storage.pl | 31 ++++ .../test_dsm_registry/test_dsm_registry.c | 163 ++++++++++++++++++ 3 files changed, 195 insertions(+) create mode 100644 src/test/modules/test_dsm_registry/t/001_file_storage.pl diff --git a/src/test/modules/test_dsm_registry/Makefile b/src/test/modules/test_dsm_registry/Makefile index b13e99a354f..9aae8b98aba 100644 --- a/src/test/modules/test_dsm_registry/Makefile +++ b/src/test/modules/test_dsm_registry/Makefile @@ -10,6 +10,7 @@ EXTENSION = test_dsm_registry DATA = test_dsm_registry--1.0.sql REGRESS = test_dsm_registry +TAP_TESTS = 1 ifdef USE_PGXS PG_CONFIG = pg_config diff --git a/src/test/modules/test_dsm_registry/t/001_file_storage.pl b/src/test/modules/test_dsm_registry/t/001_file_storage.pl new file mode 100644 index 00000000000..0e82d0adcf7 --- /dev/null +++ b/src/test/modules/test_dsm_registry/t/001_file_storage.pl @@ -0,0 +1,31 @@ +# Copyright (c) 2023-2025, PostgreSQL Global Development Group +use strict; +use warnings FATAL => 'all'; +use Config; +use PostgreSQL::Test::Utils; +use PostgreSQL::Test::Cluster; +use Test::More; + +my $node = PostgreSQL::Test::Cluster->new('node'); + +$node->init(); +$node->append_conf('postgresql.conf', + "shared_preload_libraries = 'test_dsm_registry'"); +$node->start(); + +$node->safe_psql('postgres', "CREATE EXTENSION test_dsm_registry"); + +my $result; + +$node->safe_psql('postgres', "SELECT set_val_in_hash('test-1', '1414')"); +$node->safe_psql('postgres', 'CHECKPOINT'); +$node->safe_psql('postgres', "SELECT set_val_in_hash('test-2', '1415')"); +$node->stop('immediate'); +$node->start(); + +$result = $node->safe_psql('postgres', "SELECT get_val_in_hash('test-1')"); +is($result, '1414', "Value inserted before the checkpoint was restored"); +$result = $node->safe_psql('postgres', "SELECT 
get_val_in_hash('test-2')"); +is($result, '', "Value inserted after the checkpoint was lost"); + +done_testing(); diff --git a/src/test/modules/test_dsm_registry/test_dsm_registry.c b/src/test/modules/test_dsm_registry/test_dsm_registry.c index 4cc2ccdac3f..2d7fd35a74d 100644 --- a/src/test/modules/test_dsm_registry/test_dsm_registry.c +++ b/src/test/modules/test_dsm_registry/test_dsm_registry.c @@ -12,13 +12,22 @@ */ #include "postgres.h" +#include "access/xlog.h" #include "fmgr.h" +#include "pgstat.h" #include "storage/dsm_registry.h" +#include "storage/fd.h" #include "storage/lwlock.h" #include "utils/builtins.h" +#include "utils/hsearch.h" PG_MODULE_MAGIC; +/* Location of permanent storage file (valid on checkpoint) */ +#define TDR_DUMP_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pg_stat_statements.stat" +/* Magic number identifying the stats file format */ +static const uint32 TDR_FILE_HEADER = 0x20251114; + typedef struct TestDSMRegistryStruct { int val; @@ -43,6 +52,11 @@ static const dshash_parameters dsh_params = { dshash_strcpy }; +static Checkpoint_hook_type prev_Checkpoint_hook = NULL; + +static void load_htab(void); +static void pgss_Checkpoint(XLogRecPtr checkPointRedo, int flags); + static void init_tdr_dsm(void *ptr) { @@ -66,7 +80,14 @@ tdr_attach_shmem(void) tdr_dsa = GetNamedDSA("test_dsm_registry_dsa", &found); if (tdr_hash == NULL) + { + LWLockAcquire(&tdr_dsm->lck, LW_EXCLUSIVE); tdr_hash = GetNamedDSHash("test_dsm_registry_hash", &dsh_params, &found); + if (!found) + load_htab(); + + LWLockRelease(&tdr_dsm->lck); + } } PG_FUNCTION_INFO_V1(set_val_in_shmem); @@ -144,3 +165,145 @@ get_val_in_hash(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(val); } + +/* + * Load any pre-existing entries from file. + */ +static void +load_htab(void) +{ + bool found; + FILE *file = NULL; + uint32 header; + char *val = palloc(1); + + Assert(tdr_dsa != NULL && tdr_hash != NULL); + + /* + * Attempt to load old entries from the dump file. 
+ */ + file = AllocateFile(TDR_DUMP_FILE, PG_BINARY_R); + if (file == NULL) + { + if (errno != ENOENT) + goto read_error; + /* No existing persisted file, so we're done */ + return; + } + + if (fread(&header, sizeof(uint32), 1, file) != 1 || + header != TDR_FILE_HEADER) + goto read_error; + + while (!feof(file)) + { + TestDSMRegistryHashEntry *entry; + char key[64]; + int keylen = offsetof(TestDSMRegistryHashEntry, val); + int32 vlen; + + if (fread(key, keylen, 1, file) != 1 || + fread(&vlen, sizeof(int32), 1, file) != 1) + goto read_error; + + val = repalloc(val, vlen); + if (fread(val, vlen, 1, file) != 1) + goto read_error; + + Assert(val[vlen - 1] == '\0'); + + entry = (TestDSMRegistryHashEntry *) + dshash_find_or_insert(tdr_hash, key, &found); + Assert(!found); + + entry->val = dsa_allocate(tdr_dsa, strlen(val) + 1); + strcpy(dsa_get_address(tdr_dsa, entry->val), val); + + dshash_release_lock(tdr_hash, entry); + } + + FreeFile(file); + return; + +read_error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not read from file \"%s\": %m", TDR_DUMP_FILE))); + if (file) + FreeFile(file); + /* If possible, throw away the bogus file; ignore any error */ + unlink(TDR_DUMP_FILE); +} + +/* + * Dump hash table into file. 
+ * + */ +static void +pgss_Checkpoint(XLogRecPtr checkPointRedo, int flags) +{ + FILE *file; + dshash_seq_status hstat; + TestDSMRegistryHashEntry *entry; + + if (flags & CHECKPOINT_END_OF_RECOVERY) + return; + + tdr_attach_shmem(); + + file = AllocateFile(TDR_DUMP_FILE ".tmp", PG_BINARY_W); + if (file == NULL) + goto error; + if (fwrite(&TDR_FILE_HEADER, sizeof(uint32), 1, file) != 1) + goto error; + + dshash_seq_init(&hstat, tdr_hash, false); + while ((entry = dshash_seq_next(&hstat)) != NULL) + { + int keylen = offsetof(TestDSMRegistryHashEntry, val); + char *val; + int32 vlen; + + val = (char *) dsa_get_address(tdr_dsa, entry->val); + vlen = strlen(val) + 1; + if (fwrite(entry->key, keylen, 1, file) != 1 || + fwrite(&vlen, sizeof(int32), 1, file) != 1 || + fwrite(val, vlen, 1, file) != 1) + { + dshash_seq_term(&hstat); + goto error; + } + } + dshash_seq_term(&hstat); + + if (FreeFile(file)) + { + file = NULL; + goto error; + } + + /* + * Rename file into place, so we atomically replace any old one. + */ + (void) durable_rename(TDR_DUMP_FILE ".tmp", TDR_DUMP_FILE, LOG); + return; + +error: + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + TDR_DUMP_FILE ".tmp"))); + if (file) + FreeFile(file); + unlink(TDR_DUMP_FILE ".tmp"); +} + +/* + * Entry point for this module. + */ +void +_PG_init(void) +{ + prev_Checkpoint_hook = Checkpoint_hook; + Checkpoint_hook = pgss_Checkpoint; +} -- 2.51.2 ^ permalink raw reply [nested|flat] 12+ messages in thread
end of thread, other threads:[~2025-11-15 10:44 UTC | newest] Thread overview: 12+ messages (download: mbox mbox.gz follow: Atom feed) -- links below jump to the message on this page -- 2024-03-21 12:47 Re: Comments on Custom RMGRs Danil Anisimow <[email protected]> 2024-03-21 19:02 ` Jeff Davis <[email protected]> 2024-03-29 11:20 ` Danil Anisimow <[email protected]> 2024-03-29 17:09 ` Jeff Davis <[email protected]> 2024-05-17 18:56 ` Robert Haas <[email protected]> 2024-05-17 20:20 ` Jeff Davis <[email protected]> 2024-05-17 20:25 ` Robert Haas <[email protected]> 2024-05-27 18:20 ` Michael Paquier <[email protected]> 2025-10-14 09:11 ` Andrei Lepikhov <[email protected]> 2025-11-15 10:44 ` Andrei Lepikhov <[email protected]> 2024-05-27 21:32 ` Tristan Partin <[email protected]> 2024-03-21 19:07 ` Jeff Davis <[email protected]>
This inbox is served by agora; see mirroring instructions for how to clone and mirror all data and code used for this inbox