From: Filip Janus <[email protected]>
To: [email protected]
Subject: Proposal: Adding compression of temporary files
Date: Thu, 14 Nov 2024 23:13:16 +0100
Message-ID: <CAFjYY+LAxqRpaJpukBx9KjeyhZvMu6O4xcCUU9gw1SFGsUTcDA@mail.gmail.com>
Hi all,
PostgreSQL supports data compression nowadays, but compression of
temporary files has not been implemented yet. Large queries can produce
a significant amount of temporary data that must be stored on disk,
which causes many expensive I/O operations.
I am attaching a proposed patch that enables temporary file compression,
for hash joins only for now. Initially, I've chosen the LZ4 compression
algorithm. It would probably have made more sense to start with pglz,
but I realized that too late.
# Future possible improvements
Reducing the number of memory allocations when dumping and loading the
buffer. I have two ideas for solving this problem: either add a buffer
to struct BufFile, or have the caller provide the buffer as an argument.
For sequential execution, I would prefer the second option.
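The two buffer-ownership options can be sketched in isolation. This is a minimal, hypothetical illustration and not code from the patch: ScratchBuf, DemoFile, scratch_reserve, scratch_release and demo_dump are invented names, and realloc stands in for PostgreSQL's memory-context allocation. It only shows that a long-lived buffer, whether embedded in the file struct or handed in by the caller, is allocated when it has to grow rather than on every dump.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical reusable scratch buffer; NOT part of PostgreSQL's BufFile. */
typedef struct ScratchBuf
{
    char   *data;       /* allocated memory, or NULL before first use */
    size_t  capacity;   /* number of bytes available in data */
    int     nallocs;    /* how many times we had to (re)allocate */
} ScratchBuf;

/* Option 1 (hypothetical): the file struct owns its own scratch buffer. */
typedef struct DemoFile
{
    ScratchBuf scratch;
} DemoFile;

/* Return a buffer of at least `needed` bytes, growing only when required. */
char *
scratch_reserve(ScratchBuf *buf, size_t needed)
{
    assert(buf != NULL);
    if (buf->capacity < needed)
    {
        char *p = realloc(buf->data, needed);

        if (p == NULL)
            return NULL;        /* old buffer is still valid and owned by buf */
        buf->data = p;
        buf->capacity = needed;
        buf->nallocs++;
    }
    return buf->data;
}

/* Free the scratch memory once the owner is done with it. */
void
scratch_release(ScratchBuf *buf)
{
    assert(buf != NULL);
    free(buf->data);
    buf->data = NULL;
    buf->capacity = 0;
}

/*
 * Option 2 (hypothetical): the caller passes the scratch buffer in, so one
 * buffer can serve many files. Returns 1 on success, 0 on allocation failure.
 */
int
demo_dump(ScratchBuf *scratch, size_t payload)
{
    return scratch_reserve(scratch, payload) != NULL;
}
```

With option 1 the call would be demo_dump(&file->scratch, n), with option 2 the caller keeps one ScratchBuf and passes it to every dump. In both cases repeated dumps of the same size allocate once, instead of one palloc/pfree pair per buffer as in the current patch.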
# Future plan/open questions
In the future, I would like to add support for pglz and zstd. Further, I
plan to extend temporary file compression to sorting, GiST index
creation, etc.
Experimenting with the stream mode of the compression algorithms. The
compression ratio of LZ4 in block mode seems satisfactory. Stream mode
could produce a better ratio, but it would consume more memory because
the context for LZ4 stream compression has to be stored.
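For readers unfamiliar with the block-mode layout used in the attached patch: each buffer is compressed on its own and stored as an int holding the compressed size, followed by the compressed bytes. The sketch below illustrates only that framing. It is an assumption-laden toy: toy_compress and toy_decompress are a simple run-length codec standing in for LZ4_compress_default and LZ4_decompress_safe, and frame_write and frame_read are hypothetical names that do not appear in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Stand-in codec (NOT LZ4): run-length encoding as (count, value) pairs. */
int
toy_compress(const unsigned char *src, int srclen, unsigned char *dst, int dstcap)
{
    int out = 0;
    int i = 0;

    while (i < srclen)
    {
        int run = 1;

        while (i + run < srclen && run < 255 && src[i + run] == src[i])
            run++;
        if (out + 2 > dstcap)
            return -1;          /* destination too small */
        dst[out++] = (unsigned char) run;
        dst[out++] = src[i];
        i += run;
    }
    return out;
}

/* Inverse of toy_compress; returns decompressed size or -1 on bad input. */
int
toy_decompress(const unsigned char *src, int srclen, unsigned char *dst, int dstcap)
{
    int out = 0;

    if (srclen % 2 != 0)
        return -1;
    for (int i = 0; i < srclen; i += 2)
    {
        int run = src[i];

        if (run == 0 || out + run > dstcap)
            return -1;
        memset(dst + out, src[i + 1], (size_t) run);
        out += run;
    }
    return out;
}

/* Write one block as [int compressed_size][payload]; returns stored bytes. */
int
frame_write(const unsigned char *raw, int rawlen, unsigned char *out, int outcap)
{
    int csize;

    if (outcap < (int) sizeof(int))
        return -1;
    csize = toy_compress(raw, rawlen, out + sizeof(int), outcap - (int) sizeof(int));
    if (csize < 0)
        return -1;
    memcpy(out, &csize, sizeof(int));   /* size header in front of the data */
    return csize + (int) sizeof(int);
}

/*
 * Read one block. Returns the decompressed size and reports in *consumed how
 * many stored bytes were used, which is what the file offset must advance by.
 */
int
frame_read(const unsigned char *in, int inlen, unsigned char *raw, int rawcap,
           int *consumed)
{
    int csize;

    assert(consumed != NULL);
    if (inlen < (int) sizeof(int))
        return -1;
    memcpy(&csize, in, sizeof(int));
    if (csize < 0 || csize > inlen - (int) sizeof(int))
        return -1;              /* header claims more data than is present */
    *consumed = csize + (int) sizeof(int);
    return toy_decompress(in + sizeof(int), csize, raw, rawcap);
}
```

Because every block is self-describing, no compression context has to be kept between blocks, which is the memory advantage of block mode. The cost is that the stored size (*consumed) and the logical size (the return value) differ, so the file offset has to be advanced by the stored size rather than by the number of bytes handed back to the caller.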
# Benchmark
I prepared three different databases to check expectations. Each
dataset is described below. My testing demonstrates that the patch
improves the execution time of huge hash joins.
Also, my implementation should not negatively affect the performance
of smaller queries.
The amount of data written to temporary files was reduced in every
execution without a significant impact on execution time.
## Dataset A:
Tables:
table_a(bigint id, text data_text, integer data_number) - 10000000 rows
table_b(bigint id, integer ref_id, numeric data_value, bytea data_blob) - 10000000 rows
Query: SELECT * FROM table_a a JOIN table_b b ON a.id = b.id;
The tables contain highly compressible data.
The query showed a reduction in temporary file usage from ~20GB to ~3GB,
and this reduction in turn cut the execution time of the query by
about 10s.
## Dataset B:
Tables:
table_a(integer id, text data_blob) - 1110000 rows
table_b(integer id, text data_blob) - 10000000 rows
Query: SELECT * FROM table_a a JOIN table_b b ON a.id = b.id;
The tables contain less compressible data; data_blob was generated by a
pseudo-random generator.
In this case, the data reduction was only ~50%. Also, the execution time
was reduced only slightly with compression enabled.
The second scenario, with work_mem extended to avoid temp file usage,
demonstrates that enabling compression adds no overhead.
## Dataset C:
Tables:
customers(integer, text, text, text, text)
order_items(integer, integer, integer, integer, numeric(10,2))
orders(integer, integer, timestamp, numeric(10,2))
products(integer, text, text, numeric(10,2), integer)
Query:
SELECT p.product_id, p.name, p.price,
       SUM(oi.quantity) AS total_quantity,
       AVG(oi.price) AS avg_item_price
FROM eshop.products p
JOIN eshop.order_items oi ON p.product_id = oi.product_id
JOIN eshop.orders o ON oi.order_id = o.order_id
WHERE o.order_date > '2020-01-01' AND p.price > 50
GROUP BY p.product_id, p.name, p.price
HAVING SUM(oi.quantity) > 1000
ORDER BY total_quantity DESC
LIMIT 100;
This scenario should demonstrate more realistic usage of the database.
Enabling compression slightly reduced the temporary file usage, but the
execution time wasn't affected by compression.
+---------+-------------+------------+----------------+----------+
| Dataset | Compression | temp_bytes | Execution Time | work_mem |
+---------+-------------+------------+----------------+----------+
| A       | Yes         | 3.09 GiB   | 22 s 586 ms    | 4MB      |
|         | No          | 21.89 GiB  | 35 s           | 4MB      |
+---------+-------------+------------+----------------+----------+
| B       | Yes         | 333 MB     | 1815.545 ms    | 4MB      |
|         | No          | 146 MB     | 1500.460 ms    | 4MB      |
|         | Yes         | 0 MB       | 3262.305 ms    | 80MB     |
|         | No          | 0 MB       | 3174.725 ms    | 80MB     |
+---------+-------------+------------+----------------+----------+
| C       | Yes         | 40 MB      | 1011.020 ms    | 1MB      |
|         | No          | 53 MB      | 1034.142 ms    | 1MB      |
+---------+-------------+------------+----------------+----------+
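To make the table easier to compare with the approximate figures in the dataset descriptions, the relative savings can be computed directly from the measured values. The helpers below are a small illustrative sketch. The names reduction_percent and compression_ratio are made up for this example and are not part of the patch or of PostgreSQL.

```c
#include <assert.h>

/* Percentage saved when going from `before` to `after` (same unit for both). */
double
reduction_percent(double before, double after)
{
    assert(before > 0.0);
    return (before - after) / before * 100.0;
}

/* How many times smaller the compressed data is than the uncompressed data. */
double
compression_ratio(double uncompressed, double compressed)
{
    assert(compressed > 0.0);
    return uncompressed / compressed;
}
```

For Dataset A, reduction_percent(21.89, 3.09) gives roughly 86% less temporary data (a ratio of about 7x), and reduction_percent(35.0, 22.586) gives roughly 35% less execution time. For Dataset C, reduction_percent(53.0, 40.0) gives roughly 25% less temporary data, while the execution time changes by only about 2%.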
Regards,
-Filip-
Attachments:
[application/octet-stream] 0001-This-commit-adds-support-for-temporary-files-compres.patch (13.4K)
From 1f79947c3348a7f3596062cad2236855053641a6 Mon Sep 17 00:00:00 2001
From: Filip <[email protected]>
Date: Thu, 24 Oct 2024 12:15:10 +0200
Subject: [PATCH v1] Add support for temporary files compression
This commit adds support for temporary file compression; for now it can
be used only for hash joins.
It also adds the GUC parameter temp_file_compression, which enables this
functionality.
For now, only the lz4 algorithm is supported. In the future, pglz and
zstd support could also be implemented.
---
src/backend/access/gist/gistbuildbuffers.c | 2 +-
src/backend/backup/backup_manifest.c | 2 +-
src/backend/executor/nodeHashjoin.c | 2 +-
src/backend/storage/file/buffile.c | 168 +++++++++++++++++-
src/backend/utils/misc/guc_tables.c | 23 +++
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/backend/utils/sort/logtape.c | 2 +-
src/backend/utils/sort/tuplestore.c | 2 +-
src/include/storage/buffile.h | 13 +-
9 files changed, 200 insertions(+), 15 deletions(-)
diff --git a/src/backend/access/gist/gistbuildbuffers.c b/src/backend/access/gist/gistbuildbuffers.c
index 4c2301da00..9b3b00142a 100644
--- a/src/backend/access/gist/gistbuildbuffers.c
+++ b/src/backend/access/gist/gistbuildbuffers.c
@@ -54,7 +54,7 @@ gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel)
* Create a temporary file to hold buffer pages that are swapped out of
* memory.
*/
- gfbb->pfile = BufFileCreateTemp(false);
+ gfbb->pfile = BufFileCreateTemp(false, false);
gfbb->nFileBlocks = 0;
/* Initialize free page management. */
diff --git a/src/backend/backup/backup_manifest.c b/src/backend/backup/backup_manifest.c
index a2e2f86332..f8a3e1f0f4 100644
--- a/src/backend/backup/backup_manifest.c
+++ b/src/backend/backup/backup_manifest.c
@@ -65,7 +65,7 @@ InitializeBackupManifest(backup_manifest_info *manifest,
manifest->buffile = NULL;
else
{
- manifest->buffile = BufFileCreateTemp(false);
+ manifest->buffile = BufFileCreateTemp(false, false);
manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
elog(ERROR, "failed to initialize checksum of backup manifest: %s",
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 2f7170604d..1b5c6448ef 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
{
MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
- file = BufFileCreateTemp(false);
+ file = BufFileCreateTemp(false, true);
*fileptr = file;
MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index a27f51f622..f721447db4 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -54,6 +54,16 @@
#include "storage/fd.h"
#include "utils/resowner.h"
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#define NO_LZ4_SUPPORT() \
+ ereport(ERROR, \
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
+ errmsg("compression method lz4 not supported"), \
+ errdetail("This functionality requires the server to be built with lz4 support.")))
+
/*
* We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
* The reason is that we'd like large BufFiles to be spread across multiple
@@ -62,6 +72,8 @@
#define MAX_PHYSICAL_FILESIZE 0x40000000
#define BUFFILE_SEG_SIZE (MAX_PHYSICAL_FILESIZE / BLCKSZ)
+int temp_file_compression = TEMP_NONE_COMPRESSION;
+
/*
* This data structure represents a buffered file that consists of one or
* more physical files (each accessed through a virtual file descriptor
@@ -95,7 +107,7 @@ struct BufFile
off_t curOffset; /* offset part of current pos */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
-
+ bool compress; /* whether file compression is in use */
/*
* XXX Should ideally us PGIOAlignedBlock, but might need a way to avoid
* wasting per-file alignment padding when some users create many files.
@@ -127,6 +139,7 @@ makeBufFileCommon(int nfiles)
file->curOffset = 0;
file->pos = 0;
file->nbytes = 0;
+ file->compress = false;
return file;
}
@@ -190,7 +203,7 @@ extendBufFile(BufFile *file)
* transaction boundaries.
*/
BufFile *
-BufFileCreateTemp(bool interXact)
+BufFileCreateTemp(bool interXact, bool compress)
{
BufFile *file;
File pfile;
@@ -212,6 +225,15 @@ BufFileCreateTemp(bool interXact)
file = makeBufFile(pfile);
file->isInterXact = interXact;
+ if (temp_file_compression != TEMP_NONE_COMPRESSION)
+ {
+#ifdef USE_LZ4
+ file->compress = compress;
+#else
+ NO_LZ4_SUPPORT();
+#endif
+ }
+
return file;
}
@@ -275,6 +297,7 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
file->files[0] = MakeNewFileSetSegment(file, 0);
file->readOnly = false;
+
return file;
}
@@ -455,13 +478,72 @@ BufFileLoadBuffer(BufFile *file)
INSTR_TIME_SET_ZERO(io_start);
/*
- * Read whatever we can get, up to a full bufferload.
+ * Load data as it is stored in the temporary file
*/
- file->nbytes = FileRead(thisfile,
+ if (!file->compress)
+ {
+
+ /*
+ * Read whatever we can get, up to a full bufferload.
+ */
+ file->nbytes = FileRead(thisfile,
file->buffer.data,
sizeof(file->buffer),
file->curOffset,
WAIT_EVENT_BUFFILE_READ);
+ /*
+ * Read and decompress data from the temporary file
+ * The first reading loads size of the compressed block
+ * Second reading loads compressed data
+ */
+ } else {
+ int nread;
+ int nbytes;
+
+ nread = FileRead(thisfile,
+ &nbytes,
+ sizeof(nbytes),
+ file->curOffset,
+ WAIT_EVENT_BUFFILE_READ);
+ /* if not EOF let's continue */
+ if (nread > 0)
+ {
+ /*
+ * A long-lived buffer would make sense to limit the number of
+ * memory allocations
+ */
+ char * buff;
+
+ /*
+ * Read compressed data, curOffset differs with pos
+ * It reads less data than it returns to caller
+ * So the curOffset must be advanced here based on compressed size
+ */
+ file->curOffset+=sizeof(nbytes);
+
+ buff = palloc(nbytes);
+
+ nread = FileRead(thisfile,
+ buff,
+ nbytes,
+ file->curOffset,
+ WAIT_EVENT_BUFFILE_READ);
+
+#ifdef USE_LZ4
+ file->nbytes = LZ4_decompress_safe(buff,
+ file->buffer.data,nbytes,sizeof(file->buffer));
+ file->curOffset += nread;
+#endif
+
+ if (file->nbytes < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg_internal("compressed lz4 data is corrupt")));
+ pfree(buff);
+ }
+
+ }
+
if (file->nbytes < 0)
{
file->nbytes = 0;
@@ -494,9 +576,56 @@ static void
BufFileDumpBuffer(BufFile *file)
{
int wpos = 0;
- int bytestowrite;
+ int bytestowrite = 0;
File thisfile;
+
+ /* Save nbytes value because the size changes due to compression */
+ int nbytesOriginal = file->nbytes;
+
+ bool compression = false;
+
+ char * DataToWrite;
+ DataToWrite = file->buffer.data;
+
+ /*
+ * Prepare compressed data to write
+ * size of compressed block needs to be added at the beginning of the
+ * compressed data
+ */
+
+
+ if (file->compress) {
+ int cBufferSize = 0;
+ char * cData;
+ int cSize;
+#ifdef USE_LZ4
+ cBufferSize = LZ4_compressBound(file->nbytes);
+#endif
+ /*
+ * A long-lived buffer would make sense to limit the number of
+ * memory allocations
+ */
+ compression = true;
+ cData = palloc(cBufferSize + sizeof(int));
+#ifdef USE_LZ4
+ /*
+ * Using stream compression would lead to a slight improvement in
+ * compression ratio
+ */
+ cSize = LZ4_compress_default(file->buffer.data,
+ cData + sizeof(int),file->nbytes, cBufferSize);
+#endif
+
+ /* Write size of compressed block in front of compressed data
+ * It's used to determine amount of data to read within
+ * decompression process
+ */
+ memcpy(cData,&cSize,sizeof(int));
+ file->nbytes=cSize + sizeof(int);
+ DataToWrite = cData;
+ }
+
/*
* Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
* crosses a component-file boundary; so we need a loop.
@@ -535,7 +664,7 @@ BufFileDumpBuffer(BufFile *file)
INSTR_TIME_SET_ZERO(io_start);
bytestowrite = FileWrite(thisfile,
- file->buffer.data + wpos,
+ DataToWrite + wpos,
bytestowrite,
file->curOffset,
WAIT_EVENT_BUFFILE_WRITE);
@@ -564,7 +693,19 @@ BufFileDumpBuffer(BufFile *file)
* logical file position, ie, original value + pos, in case that is less
* (as could happen due to a small backwards seek in a dirty buffer!)
*/
- file->curOffset -= (file->nbytes - file->pos);
+
+
+ if (!file->compress)
+ file->curOffset -= (file->nbytes - file->pos);
+ else
+ if (nbytesOriginal - file->pos != 0)
+ /* curOffset must be corrected also if compression is
+ * enabled, nbytes was changed by compression but we
+ * have to use the original value of nbytes
+ */
+ file->curOffset-=bytestowrite;
+
+
if (file->curOffset < 0) /* handle possible segment crossing */
{
file->curFile--;
@@ -577,6 +718,9 @@ BufFileDumpBuffer(BufFile *file)
*/
file->pos = 0;
file->nbytes = 0;
+
+ if (compression)
+ pfree(DataToWrite);
}
/*
@@ -602,8 +746,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
{
if (file->pos >= file->nbytes)
{
- /* Try to load more data into buffer. */
- file->curOffset += file->pos;
+ /* Try to load more data into buffer.
+ *
+ * curOffset is moved within BufFileLoadBuffer
+ * because stored data size differs from loaded/
+ * decompressed size
+ * */
+ if (!file->compress)
+ file->curOffset += file->pos;
file->pos = 0;
file->nbytes = 0;
BufFileLoadBuffer(file);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 686309db58..3821caf763 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -73,6 +73,7 @@
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
+#include "storage/buffile.h"
#include "storage/large_object.h"
#include "storage/pg_shmem.h"
#include "storage/predicate.h"
@@ -454,6 +455,17 @@ static const struct config_enum_entry default_toast_compression_options[] = {
#endif
{NULL, 0, false}
};
+/*
+ * pglz and zstd support should be added as future enhancement
+ *
+ */
+static const struct config_enum_entry temp_file_compression_options[] = {
+ {"no", TEMP_NONE_COMPRESSION, false},
+#ifdef USE_LZ4
+ {"lz4", TEMP_LZ4_COMPRESSION, false},
+#endif
+ {NULL, 0, false}
+};
static const struct config_enum_entry wal_compression_options[] = {
{"pglz", WAL_COMPRESSION_PGLZ, false},
@@ -4856,6 +4868,17 @@ struct config_enum ConfigureNamesEnum[] =
NULL, NULL, NULL
},
+ {
+ {"temp_file_compression", PGC_USERSET, CLIENT_CONN_STATEMENT,
+ gettext_noop("Sets the compression method for temporary files."),
+ NULL
+ },
+ &temp_file_compression,
+ TEMP_NONE_COMPRESSION,
+ temp_file_compression_options,
+ NULL, NULL, NULL
+ },
+
{
{"default_transaction_isolation", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Sets the transaction isolation level of each new transaction."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 667e0dc40a..e9c0b36352 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -177,6 +177,7 @@
#max_notify_queue_pages = 1048576 # limits the number of SLRU pages allocated
# for NOTIFY / LISTEN queue
+#temp_file_compression = 'no' # enables temporary files compression
# - Kernel Resources -
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 44b30e86ad..af43b3ebb1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -592,7 +592,7 @@ LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
}
else
- lts->pfile = BufFileCreateTemp(false);
+ lts->pfile = BufFileCreateTemp(false, false);
return lts;
}
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index a720d70200..a952f0f4f5 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -860,7 +860,7 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
*/
oldcxt = MemoryContextSwitchTo(state->context->parent);
- state->myfile = BufFileCreateTemp(state->interXact);
+ state->myfile = BufFileCreateTemp(state->interXact, false);
MemoryContextSwitchTo(oldcxt);
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 5f6d7c8e3f..486b552e31 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -32,11 +32,22 @@
typedef struct BufFile BufFile;
+typedef enum
+{
+ TEMP_NONE_COMPRESSION,
+#ifdef USE_LZ4
+ TEMP_LZ4_COMPRESSION
+#endif
+} TempCompression;
+
+extern PGDLLIMPORT int temp_file_compression;
+
+
/*
* prototypes for functions in buffile.c
*/
-extern BufFile *BufFileCreateTemp(bool interXact);
+extern BufFile *BufFileCreateTemp(bool interXact, bool compress);
extern void BufFileClose(BufFile *file);
extern pg_nodiscard size_t BufFileRead(BufFile *file, void *ptr, size_t size);
extern void BufFileReadExact(BufFile *file, void *ptr, size_t size);
--
2.46.2