From 2e5e6bdc3d28e342f525fb0f1cf76719f7a3213b Mon Sep 17 00:00:00 2001 From: Filip Janus Date: Thu, 31 Jul 2025 14:02:16 +0200 Subject: [PATCH 1/2] Add transparent compression for temporary files This commit implements transparent compression for temporary files in PostgreSQL, specifically designed for hash join operations that spill to disk. Features: - Support for LZ4 and PGLZ compression algorithms - GUC parameter 'temp_file_compression' to control compression - Transparent compression/decompression in BufFile layer - Shared compression buffer to minimize memory allocation - Integration: Hash join uses BufFileCreateCompressTemp(), tuplestore uses compression when EXEC_FLAG_BACKWARD is not required The compression is applied automatically when temp_file_compression is enabled, with no changes required to calling code. Only hash joins use compression currently, with seeking limited to rewinding to start. Configuration options: - temp_file_compression = 'no' (default) - temp_file_compression = 'pglz' - temp_file_compression = 'lz4' (requires --with-lz4) This patch also adds documentation for guc-temp-file-compression (contributed by Lakshmi ). --- doc/src/sgml/config.sgml | 20 + src/backend/executor/nodeHashjoin.c | 2 +- src/backend/storage/file/buffile.c | 374 +++++++++++++++++- src/backend/utils/misc/guc_parameters.dat | 8 + src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 5 +- src/backend/utils/sort/tuplestore.c | 8 +- src/include/storage/buffile.h | 10 + 8 files changed, 414 insertions(+), 23 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 5560b95ee60..179e09a8604 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1955,6 +1955,26 @@ include_dir 'conf.d' + + temp_file_compression (enum) + + + Enables transparent compression of temporary files used by query execution. + Supported values are no, lz4, and + pglz. + + + + The effectiveness of temporary file compression depends on the workload. + For example, temporary files created by hash join spills use fixed-size + chunks, so on-disk file sizes may not visibly shrink even when compression + is enabled. Statistics such as temp_bytes report logical + bytes written before compression. + + + + + hash_mem_multiplier (floating point) diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 114620a8137..c2f1b2658e1 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, { MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt); - file = BufFileCreateTemp(false); + file = BufFileCreateCompressTemp(false); *fileptr = file; MemoryContextSwitchTo(oldctx); diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index ddf3a410d6f..e6d2af0f815 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -53,6 +53,12 @@ #include "storage/bufmgr.h" #include "storage/fd.h" #include "utils/resowner.h" +#include "utils/memutils.h" +#include "common/pg_lzcompress.h" + +#ifdef USE_LZ4 +#include +#endif /* * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE. @@ -62,6 +68,11 @@ #define MAX_PHYSICAL_FILESIZE 0x40000000 #define BUFFILE_SEG_SIZE (MAX_PHYSICAL_FILESIZE / BLCKSZ) +/* + * Optional transparent compression of temporary files. Disabled by default. + */ +int temp_file_compression = TEMP_NONE_COMPRESSION; + /* * This data structure represents a buffered file that consists of one or * more physical files (each accessed through a virtual file descriptor @@ -101,8 +112,28 @@ struct BufFile * wasting per-file alignment padding when some users create many files. */ PGAlignedBlock buffer; + + int compress; /* enabled compression for the file */ + char *cBuffer; /* compression buffer */ }; +/* + * Header written right before each chunk of data with compression enabled. + * The 'len' is the length of the data buffer written right after the header, + * and 'raw_len' is the length of uncompressed data. If the data ends up not + * being compressed (e.g. when pglz does not reach the compression ratio), + * the raw_len is set to -1 and the len is the raw (uncompressed) length. + * + * To make things simpler, we write these headers even for methods that do + * not fail (or rather when they fail, it's a proper error). The space for + * an extra integer seems negligible. + */ +typedef struct CompressHeader +{ + int len; /* data length (compressed, excluding header) */ + int raw_len; /* raw length (-1: not compressed) */ +} CompressHeader; + static BufFile *makeBufFileCommon(int nfiles); static BufFile *makeBufFile(File firstfile); static void extendBufFile(BufFile *file); @@ -127,6 +158,8 @@ makeBufFileCommon(int nfiles) file->curOffset = 0; file->pos = 0; file->nbytes = 0; + file->compress = TEMP_NONE_COMPRESSION; + file->cBuffer = NULL; return file; } @@ -215,6 +248,63 @@ BufFileCreateTemp(bool interXact) return file; } +/* + * BufFileCreateCompressTemp + * Create a temporary file with transparent compression. + * + * The temporary files will use compression, depending on the current value of + * temp_file_compression GUC. + * + * Note: Compressed files do not support random access. A seek operation other + * than seek to the beginning of the buffile will corrupt data. + * + * Note: The compression algorithm is determined by temp_file_compression GUC. + * If set to "none" (TEMP_NONE_COMPRESSION), the file is not compressed. + * + */ +BufFile * +BufFileCreateCompressTemp(bool interXact) +{ + BufFile *file = BufFileCreateTemp(interXact); + + if (temp_file_compression != TEMP_NONE_COMPRESSION) + { + int size = 0; + + switch (temp_file_compression) + { + case TEMP_LZ4_COMPRESSION: +#ifdef USE_LZ4 + size = LZ4_compressBound(BLCKSZ) + sizeof(CompressHeader); +#else + elog(ERROR, "LZ4 is not supported by this build"); +#endif + break; + case TEMP_PGLZ_COMPRESSION: + size = pglz_maximum_compressed_size(BLCKSZ, BLCKSZ) + sizeof(CompressHeader); + break; + case TEMP_NONE_COMPRESSION: + /* no compression, nothing to do */ + break; + default: + elog(ERROR, "unknown compression method: %d", temp_file_compression); + break; + } + + if (size > 0) + { + file->compress = temp_file_compression; + file->cBuffer = palloc(size); + } + } + + /* compression with buffer, or no compression and no buffer */ + Assert((!file->compress && file->cBuffer == NULL) || + (file->compress && file->cBuffer != NULL)); + + return file; +} + /* * Build the name for a given segment of a given BufFile. */ @@ -415,9 +505,15 @@ BufFileClose(BufFile *file) /* flush any unwritten data */ BufFileFlush(file); + /* close and delete the underlying file(s) */ for (i = 0; i < file->numFiles; i++) FileClose(file->files[i]); + + /* release compression buffer if allocated */ + if (file->cBuffer) + pfree(file->cBuffer); + /* release the buffer space */ pfree(file->files); pfree(file); @@ -454,21 +550,145 @@ BufFileLoadBuffer(BufFile *file) else INSTR_TIME_SET_ZERO(io_start); - /* - * Read whatever we can get, up to a full bufferload. - */ - file->nbytes = FileRead(thisfile, - file->buffer.data, - sizeof(file->buffer.data), - file->curOffset, - WAIT_EVENT_BUFFILE_READ); - if (file->nbytes < 0) + if (file->compress == TEMP_NONE_COMPRESSION) { - file->nbytes = 0; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", - FilePathName(thisfile)))); + /* + * Read whatever we can get, up to a full bufferload. + */ + file->nbytes = FileRead(thisfile, + file->buffer.data, + sizeof(file->buffer), + file->curOffset, + WAIT_EVENT_BUFFILE_READ); + if (file->nbytes < 0) + { + file->nbytes = 0; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + FilePathName(thisfile)))); + } + } + else + { + /* + * Read and decompress data from a temporary file. We first read the + * header with compressed/raw lengths, and then the compressed data. + */ + int nread; + CompressHeader header; + + nread = FileRead(thisfile, + &header, + sizeof(header), + file->curOffset, + WAIT_EVENT_BUFFILE_READ); + + /* did we read the length of the next buffer? */ + if (nread == 0) + { + /* eof, nothing to do */ + } + else if (nread != sizeof(header)) + { + /* unexpected number of bytes, also covers (nread < 0) */ + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + FilePathName(thisfile)))); + } + else + { + /* read length of compressed data, read (and decompress) data */ + char *buff = file->cBuffer; + + Assert(file->cBuffer != NULL); + + /* advance past the length field */ + file->curOffset += sizeof(header); + + /* + * raw_len==-1 means the data was not compressed after all, which + * can happen e.g. for non-compressible data with pglz. In that + * case just copy the data in place. Otherwise do the + * decompression. + * + * XXX Maybe we should just do the FileRead first, and then either + * decompress or memcpy() for raw_len=-1. That'd be an extra + * memcpy, but it'd make the code simpler (this ways we do the + * error checks twice, for each branch). + */ + if (header.raw_len == -1) + { + nread = FileRead(thisfile, + file->buffer.data, + header.len, + file->curOffset, + WAIT_EVENT_BUFFILE_READ); + if (nread != header.len) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + FilePathName(thisfile)))); + } + + file->nbytes = nread; + file->curOffset += nread; + } + else + { + /* + * Read compressed data into the separate buffer, and then + * decompress into the target file buffer. + */ + nread = FileRead(thisfile, + buff, + header.len, + file->curOffset, + WAIT_EVENT_BUFFILE_READ); + if (nread != header.len) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + FilePathName(thisfile)))); + } + + switch (file->compress) + { + case TEMP_LZ4_COMPRESSION: +#ifdef USE_LZ4 + file->nbytes = LZ4_decompress_safe(buff, + file->buffer.data, header.len, + sizeof(file->buffer)); +#else + elog(ERROR, "LZ4 is not supported by this build"); +#endif + break; + + case TEMP_PGLZ_COMPRESSION: + file->nbytes = pglz_decompress(buff, header.len, + file->buffer.data, header.raw_len, false); + break; + case TEMP_NONE_COMPRESSION: + /* no compression, nothing to do */ + break; + default: + elog(ERROR, "unknown compression method: %d", file->compress); + break; + } + file->curOffset += nread; + + if (file->nbytes < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed data is corrupt"))); + + /* should have got the expected length */ + Assert(file->nbytes == header.raw_len); + } + } } if (track_io_timing) @@ -494,8 +714,101 @@ static void BufFileDumpBuffer(BufFile *file) { int64 wpos = 0; - int64 bytestowrite; + int64 bytestowrite = 0; File thisfile; + char *DataToWrite = file->buffer.data; + int nbytesOriginal = file->nbytes; + + /* + * Compress the data if requested for this temporary file (and if enabled + * by the temp_file_compression GUC). + * + * The compressed data is written to the one shared compression buffer. + * There's only a single compression operation at any given time, so one + * buffer is enough. + * + * Then we simply point the "DataToWrite" buffer at the compressed buffer. + */ + if (file->compress != TEMP_NONE_COMPRESSION) + { + char *cData; + int cSize = 0; + CompressHeader header; + + Assert(file->cBuffer != NULL); + cData = file->cBuffer; + + /* initialize the header for compression */ + header.len = -1; + header.raw_len = nbytesOriginal; + + switch (file->compress) + { + case TEMP_LZ4_COMPRESSION: + { +#ifdef USE_LZ4 + int cBufferSize = LZ4_compressBound(file->nbytes); + + /* + * XXX We might use lz4 stream compression here. Depending + * on the data, that might improve the compression ratio. + * The length is stored at the beginning, we'll fill it in + * at the end. + */ + cSize = LZ4_compress_default(file->buffer.data, + cData + sizeof(CompressHeader), + file->nbytes, cBufferSize); + if (cSize == 0) + { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compression failed, compressed size %d, original size %d", + cSize, nbytesOriginal))); + } +#else + elog(ERROR, "LZ4 is not supported by this build"); +#endif + break; + } + case TEMP_PGLZ_COMPRESSION: + cSize = pglz_compress(file->buffer.data, file->nbytes, + cData + sizeof(CompressHeader), + PGLZ_strategy_always); + + /* + * pglz returns -1 for non-compressible data. In that case + * just copy the raw data into the output buffer. + */ + if (cSize == -1) + { + memcpy(cData + sizeof(CompressHeader), file->buffer.data, + header.raw_len); + + cSize = header.raw_len; + header.raw_len = -1; + } + break; + case TEMP_NONE_COMPRESSION: + /* no compression, nothing to do */ + break; + default: + elog(ERROR, "unknown compression method: %d", file->compress); + break; + } + + Assert(cSize != -1); + header.len = cSize; + + /* + * Write the header with compressed length at the beginning of the + * buffer. We store both the compressed and raw lengths, and use + * raw_len=-1 when the data was not compressed after all. + */ + memcpy(cData, &header, sizeof(CompressHeader)); + file->nbytes = header.len + sizeof(CompressHeader); + + DataToWrite = cData; + } /* * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it @@ -535,7 +848,7 @@ BufFileDumpBuffer(BufFile *file) INSTR_TIME_SET_ZERO(io_start); bytestowrite = FileWrite(thisfile, - file->buffer.data + wpos, + DataToWrite + wpos, bytestowrite, file->curOffset, WAIT_EVENT_BUFFILE_WRITE); @@ -564,7 +877,17 @@ BufFileDumpBuffer(BufFile *file) * logical file position, ie, original value + pos, in case that is less * (as could happen due to a small backwards seek in a dirty buffer!) */ - file->curOffset -= (file->nbytes - file->pos); + if (!file->compress) + file->curOffset -= (file->nbytes - file->pos); + else if (nbytesOriginal - file->pos != 0) + { + /* + * curOffset must be corrected also if compression is enabled, nbytes + * was changed by compression but we have to use the original value of + * nbytes + */ + file->curOffset -= bytestowrite; + } if (file->curOffset < 0) /* handle possible segment crossing */ { file->curFile--; @@ -602,8 +925,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK) { if (file->pos >= file->nbytes) { - /* Try to load more data into buffer. */ - file->curOffset += file->pos; + /* + * Try to load more data into buffer. + * + * curOffset is moved within BufFileLoadBuffer because stored data + * size differs from loaded/ decompressed size + */ + if (!file->compress) + file->curOffset += file->pos; file->pos = 0; file->nbytes = 0; BufFileLoadBuffer(file); @@ -742,6 +1071,10 @@ BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence) int newFile; pgoff_t newOffset; + /* Compressed files only support seek to the beginning */ + Assert(file->compress == TEMP_NONE_COMPRESSION || + (whence == SEEK_SET && fileno == 0 && offset == 0)); + switch (whence) { case SEEK_SET: @@ -831,6 +1164,9 @@ BufFileSeek(BufFile *file, int fileno, pgoff_t offset, int whence) void BufFileTell(BufFile *file, int *fileno, pgoff_t *offset) { + /* Tell doesn't work correctly for compressed files */ + Assert(file->compress == TEMP_NONE_COMPRESSION); + *fileno = file->curFile; *offset = file->curOffset + file->pos; } diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index f0260e6e412..55bf207637f 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2931,6 +2931,14 @@ check_hook => 'check_temp_buffers', }, +{ name => 'temp_file_compression', type => 'enum', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT', + short_desc => 'Sets the default compression method for temporary files.', + long_desc => 'Compression is applied transparently to temporary files used by hash joins. Note that the logged file sizes may appear the same regardless of compression method due to fixed-size chunking; actual disk space savings depend on data compressibility.', + variable => 'temp_file_compression', + boot_val => 'TEMP_NONE_COMPRESSION', + options => 'temp_file_compression_options', +}, + { name => 'temp_file_limit', type => 'int', context => 'PGC_SUSET', group => 'RESOURCES_DISK', short_desc => 'Limits the total size of all temporary files used by each process.', long_desc => '-1 means no limit.', diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 13c569d8790..945e00920cd 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -78,6 +78,7 @@ #include "replication/syncrep.h" #include "storage/aio.h" #include "storage/bufmgr.h" +#include "storage/buffile.h" #include "storage/bufpage.h" #include "storage/copydir.h" #include "storage/io_worker.h" @@ -464,6 +465,15 @@ static const struct config_enum_entry default_toast_compression_options[] = { {NULL, 0, false} }; +static const struct config_enum_entry temp_file_compression_options[] = { + {"no", TEMP_NONE_COMPRESSION, false}, + {"pglz", TEMP_PGLZ_COMPRESSION, false}, +#ifdef USE_LZ4 + {"lz4", TEMP_LZ4_COMPRESSION, false}, +#endif + {NULL, 0, false} +}; + static const struct config_enum_entry wal_compression_options[] = { {"pglz", WAL_COMPRESSION_PGLZ, false}, #ifdef USE_LZ4 diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c4f92fcdac8..db23696d243 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -180,8 +180,9 @@ #file_copy_method = copy # copy, clone (if supported by OS) -#max_notify_queue_pages = 1048576 # limits the number of SLRU pages allocated - # for NOTIFY / LISTEN queue +#max_notify_queue_pages = 1048576 # limits the number of SLRU pages allocated + # for NOTIFY / LISTEN queue +#temp_file_compression = 'no' # enables temporary files compression # - Kernel Resources - diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c index e57fa5bca64..76695e4c9e4 100644 --- a/src/backend/utils/sort/tuplestore.c +++ b/src/backend/utils/sort/tuplestore.c @@ -860,7 +860,13 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple) */ oldcxt = MemoryContextSwitchTo(state->context->parent); - state->myfile = BufFileCreateTemp(state->interXact); + /* + * If requested random access, can't compress the temp file. + */ + if ((state->eflags & EXEC_FLAG_BACKWARD) != 0) + state->myfile = BufFileCreateTemp(state->interXact); + else + state->myfile = BufFileCreateCompressTemp(state->interXact); MemoryContextSwitchTo(oldcxt); diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index 6754f37836c..2e52ca74bde 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -32,11 +32,21 @@ typedef struct BufFile BufFile; +typedef enum +{ + TEMP_NONE_COMPRESSION, + TEMP_PGLZ_COMPRESSION, + TEMP_LZ4_COMPRESSION +} TempCompression; + +extern PGDLLIMPORT int temp_file_compression; + /* * prototypes for functions in buffile.c */ extern BufFile *BufFileCreateTemp(bool interXact); +extern BufFile *BufFileCreateCompressTemp(bool interXact); extern void BufFileClose(BufFile *file); pg_nodiscard extern size_t BufFileRead(BufFile *file, void *ptr, size_t size); extern void BufFileReadExact(BufFile *file, void *ptr, size_t size); -- 2.39.5 (Apple Git-154)