From: Filip Janus <[email protected]>
To: [email protected]
Subject: Re: Proposal: Adding compression of temporary files
Date: Mon, 18 Nov 2024 22:58:27 +0100
Message-ID: <CAFjYY+JJ3x-QUBpSYr5eTdapERhS9Nw3SEAH+QnBB=kypoXUJw@mail.gmail.com>
In-Reply-To: <CAFjYY+LAxqRpaJpukBx9KjeyhZvMu6O4xcCUU9gw1SFGsUTcDA@mail.gmail.com>
References: <CAFjYY+LAxqRpaJpukBx9KjeyhZvMu6O4xcCUU9gw1SFGsUTcDA@mail.gmail.com>

Let's fix the compiler warning caused by an uninitialized local variable.

    -Filip-
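
For readers following along: in the attached v2, `bytestowrite` in BufFileDumpBuffer is now initialized to 0. The sketch below is a reduced, hypothetical version of that control flow (the function and variable names are illustrative and not taken from the patch). It shows why a compiler may flag a variable that is assigned only inside a loop and read after it. The exact warning the author saw is not stated; GCC's -Wmaybe-uninitialized is one that fires on this shape.

```c
#include <assert.h>

/*
 * Hypothetical reduction of a "write in chunks, then use the size of the
 * last chunk" loop.  Returns the size of the last chunk written, or 0 if
 * nothing was written.
 */
static int
last_chunk_written(int nbytes, int chunk)
{
	int			wpos = 0;

	/*
	 * Without "= 0" the compiler cannot prove the loop body runs at least
	 * once, so the read after the loop may see an indeterminate value.
	 */
	int			written = 0;

	assert(chunk > 0);

	while (wpos < nbytes)
	{
		written = nbytes - wpos;
		if (written > chunk)
			written = chunk;
		wpos += written;
	}

	/* Read after the loop: this is the use that triggers the warning. */
	return written;
}
```

With the initializer in place, the empty-buffer case returns a well-defined 0 instead of relying on the loop having executed.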


On Thu, 14 Nov 2024 at 23:13, Filip Janus <[email protected]> wrote:

> Hi all,
> PostgreSQL supports data compression nowadays, but compression of
> temporary files has not been implemented yet. Large queries can
> produce a significant amount of temporary data that needs to be stored
> on disk and causes many expensive I/O operations.
> I am attaching a proposed patch that enables temporary file compression,
> for hash joins only for now. Initially, I've chosen the LZ4 compression
> algorithm. It would probably have made more sense to start with pglz,
> but I realized that too late.
>
> # Future possible improvements
> Reducing the number of memory allocations when dumping and loading the
> buffer. I have two ideas for solving this problem: either add a buffer
> to struct BufFile, or have the caller provide the buffer as an argument.
> For sequential execution, I would prefer the second option.
>
> # Future plan/open questions
> In the future, I would like to add support for pglz and zstd. Further, I
> plan to extend temporary file compression to sorting, GiST index
> creation, etc.
>
> Experimenting with the stream mode of compression algorithms. The
> compression ratio of LZ4 in block mode seems satisfactory. Stream mode
> could produce a better ratio, but it would consume more memory because
> the context for LZ4 stream compression has to be stored.
>
> # Benchmark
> I prepared three different databases to check expectations. Each
> dataset is described below. My testing demonstrates that my patch
> improves the execution time of huge hash joins. Also, my implementation
> should not negatively affect the performance of smaller queries.
> The space needed for temporary files was reduced in every execution
> without a significant impact on execution time.
>
> *## Dataset A:*
> Tables:
> table_a(bigint id, text data_text, integer data_number) - 10000000 rows
> table_b(bigint id, integer ref_id, numeric data_value, bytea data_blob) - 10000000 rows
> Query:  SELECT *  FROM table_a a JOIN table_b b ON a.id = b.id;
>
> The tables contain highly compressible data.
> The query showed a reduction in temporary file usage from ~20GB to ~3GB.
> As a result of this reduction, the execution time of the query also
> dropped by about 10s.
>
>
> *## Dataset B:*
> Tables:
> table_a(integer id, text data_blob) - 1110000 rows
> table_b(integer id, text data_blob) - 10000000 rows
> Query:  SELECT *  FROM table_a a JOIN table_b b ON a.id = b.id;
>
> The tables contain less compressible data. data_blob was generated by a
> pseudo-random generator.
> In this case, the data reduction was only ~50%. Also, the execution time
> was reduced only slightly with compression enabled.
>
> The second scenario demonstrates no overhead when compression is enabled
> and work_mem is increased to avoid temp file usage.
>
> *## Dataset C:*
> Tables
> customers (integer,text,text,text,text)
> order_items(integer,integer,integer,integer,numeric(10,2))
> orders(integer,integer,timestamp,numeric(10,2))
> products(integer,text,text,numeric(10,2),integer)
>
> Query:
> SELECT p.product_id, p.name, p.price,
>        SUM(oi.quantity) AS total_quantity, AVG(oi.price) AS avg_item_price
> FROM eshop.products p
> JOIN eshop.order_items oi ON p.product_id = oi.product_id
> JOIN eshop.orders o ON oi.order_id = o.order_id
> WHERE o.order_date > '2020-01-01' AND p.price > 50
> GROUP BY p.product_id, p.name, p.price
> HAVING SUM(oi.quantity) > 1000
> ORDER BY total_quantity DESC LIMIT 100;
>
> This scenario should demonstrate a more realistic usage of the database.
> Enabling compression slightly reduced the temporary file usage, but the
> execution time wasn't affected by compression.
>
>
>
> +---------+-------------+------------+----------------+----------+
> | Dataset | Compression | temp_bytes | Execution Time | work_mem |
> +---------+-------------+------------+----------------+----------+
> | A       | Yes         | 3.09 GiB   | 22s586ms       | 4MB      |
> |         | No          | 21.89 GiB  | 35s            | 4MB      |
> +---------+-------------+------------+----------------+----------+
> | B       | Yes         | 333 MB     | 1815.545 ms    | 4MB      |
> |         | No          | 146 MB     | 1500.460 ms    | 4MB      |
> |         | Yes         | 0 MB       | 3262.305 ms    | 80MB     |
> |         | No          | 0 MB       | 3174.725 ms    | 80MB     |
> +---------+-------------+------------+----------------+----------+
> | C       | Yes         | 40 MB      | 1011.020 ms    | 1MB      |
> |         | No          | 53 MB      | 1034.142 ms    | 1MB      |
> +---------+-------------+------------+----------------+----------+
>
>
> Regards,
>
>     -Filip-
>
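
The "Future possible improvements" item above mentions keeping a buffer around instead of allocating one on every dump or load. A minimal sketch of that idea follows, assuming a grow-only scratch buffer owned by whoever calls into the compression path. `ScratchBuf`, `scratch_reserve` and `scratch_free` are hypothetical names, and plain `realloc`/`free` stand in for PostgreSQL's memory-context allocators. This is not code from the patch.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical long-lived scratch buffer; start with all fields zeroed. */
typedef struct ScratchBuf
{
	char	   *data;			/* current allocation, or NULL */
	size_t		cap;			/* bytes available in data */
	int			nallocs;		/* how many times we had to (re)allocate */
} ScratchBuf;

/*
 * Make sure at least "need" bytes are available and return the buffer.
 * The buffer only grows, so repeated calls with similar sizes allocate
 * once.  Returns NULL if the allocation fails (or if nothing was ever
 * requested).
 */
static char *
scratch_reserve(ScratchBuf *sb, size_t need)
{
	assert(sb != NULL);

	if (need > sb->cap)
	{
		char	   *p = realloc(sb->data, need);

		if (p == NULL)
			return NULL;
		sb->data = p;
		sb->cap = need;
		sb->nallocs++;
	}
	return sb->data;
}

/* Release the buffer and reset the struct to its empty state. */
static void
scratch_free(ScratchBuf *sb)
{
	free(sb->data);
	sb->data = NULL;
	sb->cap = 0;
}
```

Whether such a buffer lives in struct BufFile or is passed in by the caller is exactly the open question from the proposal; the sketch works the same way in either case.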


Attachments:

  [application/octet-stream] 0001-This-commit-adds-support-for-temporary-files-compres-v2.patch (13.4K, 3-0001-This-commit-adds-support-for-temporary-files-compres-v2.patch)
From 790f913391cb9ffd5440202962674addbfb79001 Mon Sep 17 00:00:00 2001
From: Filip <[email protected]>
Date: Thu, 24 Oct 2024 12:15:10 +0200
Subject: [PATCH v2] Add support for temporary files compression

This commit adds support for compressing temporary files. For now, it
can be used only for hash joins.

It also adds the GUC parameter temp_file_compression, which enables this
functionality. For now, only the lz4 algorithm is supported. Support for
pglz and zstd could be added in the future.
---
 src/backend/access/gist/gistbuildbuffers.c    |   2 +-
 src/backend/backup/backup_manifest.c          |   2 +-
 src/backend/executor/nodeHashjoin.c           |   2 +-
 src/backend/storage/file/buffile.c            | 168 +++++++++++++++++-
 src/backend/utils/misc/guc_tables.c           |  23 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/backend/utils/sort/logtape.c              |   2 +-
 src/backend/utils/sort/tuplestore.c           |   2 +-
 src/include/storage/buffile.h                 |  13 +-
 9 files changed, 200 insertions(+), 15 deletions(-)

diff --git a/src/backend/access/gist/gistbuildbuffers.c b/src/backend/access/gist/gistbuildbuffers.c
index 4c2301da00..9b3b00142a 100644
--- a/src/backend/access/gist/gistbuildbuffers.c
+++ b/src/backend/access/gist/gistbuildbuffers.c
@@ -54,7 +54,7 @@ gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel)
 	 * Create a temporary file to hold buffer pages that are swapped out of
 	 * memory.
 	 */
-	gfbb->pfile = BufFileCreateTemp(false);
+	gfbb->pfile = BufFileCreateTemp(false, false);
 	gfbb->nFileBlocks = 0;
 
 	/* Initialize free page management. */
diff --git a/src/backend/backup/backup_manifest.c b/src/backend/backup/backup_manifest.c
index a2e2f86332..f8a3e1f0f4 100644
--- a/src/backend/backup/backup_manifest.c
+++ b/src/backend/backup/backup_manifest.c
@@ -65,7 +65,7 @@ InitializeBackupManifest(backup_manifest_info *manifest,
 		manifest->buffile = NULL;
 	else
 	{
-		manifest->buffile = BufFileCreateTemp(false);
+		manifest->buffile = BufFileCreateTemp(false, false);
 		manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
 		if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
 			elog(ERROR, "failed to initialize checksum of backup manifest: %s",
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 2f7170604d..1b5c6448ef 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 	{
 		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		file = BufFileCreateTemp(false);
+		file = BufFileCreateTemp(false, true);
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index a27f51f622..6cb6dcc783 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -54,6 +54,16 @@
 #include "storage/fd.h"
 #include "utils/resowner.h"
 
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#define NO_LZ4_SUPPORT() \
+	ereport(ERROR, \
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
+			 errmsg("compression method lz4 not supported"), \
+			 errdetail("This functionality requires the server to be built with lz4 support.")))
+
 /*
  * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
  * The reason is that we'd like large BufFiles to be spread across multiple
@@ -62,6 +72,8 @@
 #define MAX_PHYSICAL_FILESIZE	0x40000000
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
+int temp_file_compression = TEMP_NONE_COMPRESSION;
+
 /*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
@@ -95,7 +107,7 @@ struct BufFile
 	off_t		curOffset;		/* offset part of current pos */
 	int			pos;			/* next read/write position in buffer */
 	int			nbytes;			/* total # of valid bytes in buffer */
-
+	bool		compress;		/* whether file compression is in use */
 	/*
 	 * XXX Should ideally us PGIOAlignedBlock, but might need a way to avoid
 	 * wasting per-file alignment padding when some users create many files.
@@ -127,6 +139,7 @@ makeBufFileCommon(int nfiles)
 	file->curOffset = 0;
 	file->pos = 0;
 	file->nbytes = 0;
+	file->compress = false;
 
 	return file;
 }
@@ -190,7 +203,7 @@ extendBufFile(BufFile *file)
  * transaction boundaries.
  */
 BufFile *
-BufFileCreateTemp(bool interXact)
+BufFileCreateTemp(bool interXact, bool compress)
 {
 	BufFile    *file;
 	File		pfile;
@@ -212,6 +225,15 @@ BufFileCreateTemp(bool interXact)
 	file = makeBufFile(pfile);
 	file->isInterXact = interXact;
 
+	if (temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+#ifdef USE_LZ4
+		file->compress = compress;
+#else
+		NO_LZ4_SUPPORT();
+#endif
+	}
+
 	return file;
 }
 
@@ -275,6 +297,7 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
 	file->files[0] = MakeNewFileSetSegment(file, 0);
 	file->readOnly = false;
 
+
 	return file;
 }
 
@@ -455,13 +478,72 @@ BufFileLoadBuffer(BufFile *file)
 		INSTR_TIME_SET_ZERO(io_start);
 
 	/*
-	 * Read whatever we can get, up to a full bufferload.
+	 * Load data as it is stored in the temporary file
 	 */
-	file->nbytes = FileRead(thisfile,
+	if (!file->compress)
+	{
+
+		/*
+	 	* Read whatever we can get, up to a full bufferload.
+	 	*/
+		file->nbytes = FileRead(thisfile,
 							file->buffer.data,
 							sizeof(file->buffer),
 							file->curOffset,
 							WAIT_EVENT_BUFFILE_READ);
+	/*
+	 * Read and decompress data from the temporary file
+	 * The first reading loads size of the compressed block
+	 * Second reading loads compressed data
+	 */
+	} else {
+		int nread;
+		int nbytes;
+
+		nread = FileRead(thisfile,
+							&nbytes,
+							sizeof(nbytes),
+							file->curOffset,
+							WAIT_EVENT_BUFFILE_READ);
+		/* if not EOF let's continue */
+		if (nread > 0)
+		{
+			/*
+			 * A long-lived buffer would make sense here to limit the number
+			 * of memory allocations
+			 */
+			char * buff;
+
+			/*
+			 * Read compressed data, curOffset differs with pos
+			 * It reads less data than it returns to caller
+			 * So the curOffset must be advanced here based on compressed size
+			 */
+			file->curOffset+=sizeof(nbytes);
+
+			buff = palloc(nbytes);
+
+			nread = FileRead(thisfile,
+							buff,
+							nbytes,
+							file->curOffset,
+							WAIT_EVENT_BUFFILE_READ);
+
+#ifdef USE_LZ4
+			file->nbytes = LZ4_decompress_safe(buff,
+				file->buffer.data,nbytes,sizeof(file->buffer));
+			file->curOffset += nread;
+#endif
+
+			if (file->nbytes < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("compressed lz4 data is corrupt")));
+			pfree(buff);
+		}
+
+	}
+
 	if (file->nbytes < 0)
 	{
 		file->nbytes = 0;
@@ -494,9 +576,56 @@ static void
 BufFileDumpBuffer(BufFile *file)
 {
 	int			wpos = 0;
-	int			bytestowrite;
+	int			bytestowrite = 0;
 	File		thisfile;
 
+
+	/* Save nbytes value because the size changes due to compression */
+	int nbytesOriginal = file->nbytes;
+
+	bool compression = false;
+
+	char * DataToWrite;
+	DataToWrite = file->buffer.data;
+
+	/*
+	 * Prepare the compressed data to write.
+	 * The size of the compressed block needs to be added at the beginning
+	 * of the compressed data.
+	 */
+
+
+	if (file->compress) {
+		int cBufferSize = 0;
+		char * cData;
+		int cSize = 0;
+#ifdef USE_LZ4
+		cBufferSize = LZ4_compressBound(file->nbytes);
+#endif
+		/*
+		 * A long-lived buffer would make sense here to limit the number
+		 * of memory allocations
+		 */
+		compression = true;
+		cData = palloc(cBufferSize + sizeof(int));
+#ifdef USE_LZ4
+		/*
+		 * Using stream compression would lead to the slight improvement in
+		 * compression ratio
+		 */
+		cSize = LZ4_compress_default(file->buffer.data,
+				cData + sizeof(int),file->nbytes, cBufferSize);
+#endif
+
+		/* Write size of compressed block in front of compressed data
+		 * It's used to determine amount of data to read within
+		 * decompression process
+		 */
+		memcpy(cData,&cSize,sizeof(int));
+		file->nbytes=cSize + sizeof(int);
+		DataToWrite = cData;
+	}
+
 	/*
 	 * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
 	 * crosses a component-file boundary; so we need a loop.
@@ -535,7 +664,7 @@ BufFileDumpBuffer(BufFile *file)
 			INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 DataToWrite + wpos,
 								 bytestowrite,
 								 file->curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
@@ -564,7 +693,19 @@ BufFileDumpBuffer(BufFile *file)
 	 * logical file position, ie, original value + pos, in case that is less
 	 * (as could happen due to a small backwards seek in a dirty buffer!)
 	 */
-	file->curOffset -= (file->nbytes - file->pos);
+
+
+	if (!file->compress)
+		file->curOffset -= (file->nbytes - file->pos);
+	else
+		if (nbytesOriginal - file->pos != 0)
+			/* curOffset must be corrected also if compression is
+			 * enabled, nbytes was changed by compression but we
+			 * have to use the original value of nbytes
+			 */
+			file->curOffset-=bytestowrite;
+
+
 	if (file->curOffset < 0)	/* handle possible segment crossing */
 	{
 		file->curFile--;
@@ -577,6 +718,9 @@ BufFileDumpBuffer(BufFile *file)
 	 */
 	file->pos = 0;
 	file->nbytes = 0;
+
+	if (compression)
+		pfree(DataToWrite);
 }
 
 /*
@@ -602,8 +746,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 	{
 		if (file->pos >= file->nbytes)
 		{
-			/* Try to load more data into buffer. */
-			file->curOffset += file->pos;
+			/* Try to load more data into buffer.
+			 *
+			 * curOffset is moved within BufFileLoadBuffer
+			 * because stored data size differs from loaded/
+			 * decompressed size
+			 * */
+			if (!file->compress)
+				file->curOffset += file->pos;
 			file->pos = 0;
 			file->nbytes = 0;
 			BufFileLoadBuffer(file);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 686309db58..3821caf763 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -73,6 +73,7 @@
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/buffile.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
 #include "storage/predicate.h"
@@ -454,6 +455,17 @@ static const struct config_enum_entry default_toast_compression_options[] = {
 #endif
 	{NULL, 0, false}
 };
+/*
+ * pglz and zstd support should be added as future enhancement
+ *
+ */
+static const struct config_enum_entry temp_file_compression_options[] = {
+	{"no", TEMP_NONE_COMPRESSION, false},
+#ifdef  USE_LZ4
+	{"lz4", TEMP_LZ4_COMPRESSION, false},
+#endif
+	{NULL, 0, false}
+};
 
 static const struct config_enum_entry wal_compression_options[] = {
 	{"pglz", WAL_COMPRESSION_PGLZ, false},
@@ -4856,6 +4868,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"temp_file_compression", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Sets the compression method for temporary files."),
+			NULL
+		},
+		&temp_file_compression,
+		TEMP_NONE_COMPRESSION,
+		temp_file_compression_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"default_transaction_isolation", PGC_USERSET, CLIENT_CONN_STATEMENT,
 			gettext_noop("Sets the transaction isolation level of each new transaction."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 667e0dc40a..e9c0b36352 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -177,6 +177,7 @@
 
 #max_notify_queue_pages = 1048576	# limits the number of SLRU pages allocated
 					# for NOTIFY / LISTEN queue
+#temp_file_compression = 'no'	# enables temporary files compression
 
 # - Kernel Resources -
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 44b30e86ad..af43b3ebb1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -592,7 +592,7 @@ LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
 		lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
 	}
 	else
-		lts->pfile = BufFileCreateTemp(false);
+		lts->pfile = BufFileCreateTemp(false, false);
 
 	return lts;
 }
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index a720d70200..a952f0f4f5 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -860,7 +860,7 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
 			 */
 			oldcxt = MemoryContextSwitchTo(state->context->parent);
 
-			state->myfile = BufFileCreateTemp(state->interXact);
+			state->myfile = BufFileCreateTemp(state->interXact, false);
 
 			MemoryContextSwitchTo(oldcxt);
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 5f6d7c8e3f..486b552e31 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -32,11 +32,22 @@
 
 typedef struct BufFile BufFile;
 
+typedef enum
+{
+	TEMP_NONE_COMPRESSION,
+#ifdef USE_LZ4
+	TEMP_LZ4_COMPRESSION
+#endif
+} TempCompression;
+
+extern PGDLLIMPORT int temp_file_compression;
+
+
 /*
  * prototypes for functions in buffile.c
  */
 
-extern BufFile *BufFileCreateTemp(bool interXact);
+extern BufFile *BufFileCreateTemp(bool interXact, bool compress);
 extern void BufFileClose(BufFile *file);
 extern pg_nodiscard size_t BufFileRead(BufFile *file, void *ptr, size_t size);
 extern void BufFileReadExact(BufFile *file, void *ptr, size_t size);
-- 
2.46.2
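
The patch above stores each dumped buffer as a length-prefixed block: BufFileDumpBuffer writes an `int` size followed by the compressed bytes, and BufFileLoadBuffer reads the size first and then that many bytes. The following is a minimal, self-contained sketch of that framing only. It assumes an in-memory byte array in place of the temp file and uses a toy run-length coder as a stand-in for `LZ4_compress_default` / `LZ4_decompress_safe`. All function names are hypothetical. Unlike the patch, it also bounds-checks the length prefix before using it.

```c
#include <assert.h>
#include <string.h>

/* Toy stand-in for a block compressor: (count, value) byte pairs.
 * Returns the encoded size, or 0 if dst is too small or the input is empty. */
static int
toy_compress(const char *src, char *dst, int srclen, int dstcap)
{
	int			out = 0;
	int			i = 0;

	while (i < srclen)
	{
		int			run = 1;

		while (i + run < srclen && run < 127 && src[i + run] == src[i])
			run++;
		if (out + 2 > dstcap)
			return 0;
		dst[out++] = (char) run;
		dst[out++] = src[i];
		i += run;
	}
	return out;
}

/* Toy stand-in for a safe decompressor: returns the decoded size,
 * or -1 on malformed input or if dst would overflow. */
static int
toy_decompress(const char *src, char *dst, int srclen, int dstcap)
{
	int			out = 0;
	int			i;

	if (srclen % 2 != 0)
		return -1;
	for (i = 0; i < srclen; i += 2)
	{
		int			run = (unsigned char) src[i];

		if (run == 0 || out + run > dstcap)
			return -1;
		memset(dst + out, src[i + 1], (size_t) run);
		out += run;
	}
	return out;
}

/* Append one block as [int size][payload] at *offset.
 * Returns the number of bytes stored, or -1 on failure. */
static int
write_block(char *file, int filecap, int *offset, const char *data, int len)
{
	int			room = filecap - *offset - (int) sizeof(int);
	int			csize;

	assert(file != NULL && data != NULL && len >= 0);
	if (room <= 0)
		return -1;
	csize = toy_compress(data, file + *offset + sizeof(int), len, room);
	if (csize <= 0)
		return -1;
	memcpy(file + *offset, &csize, sizeof(int));	/* size prefix */
	*offset += (int) sizeof(int) + csize;
	return (int) sizeof(int) + csize;
}

/* Read the block at *offset: first the size prefix, then the payload.
 * Returns the decoded length, 0 at end of data, -1 on corruption. */
static int
read_block(const char *file, int filelen, int *offset, char *out, int outcap)
{
	int			csize;
	int			n;

	if (*offset >= filelen)
		return 0;
	if (filelen - *offset < (int) sizeof(int))
		return -1;
	memcpy(&csize, file + *offset, sizeof(int));
	if (csize <= 0 || csize > filelen - *offset - (int) sizeof(int))
		return -1;
	n = toy_decompress(file + *offset + sizeof(int), out, csize, outcap);
	if (n < 0)
		return -1;
	/* Advance by the stored size, not by the decoded size. */
	*offset += (int) sizeof(int) + csize;
	return n;
}
```

The last step in `read_block` mirrors the reason the patch moves the `curOffset` update into BufFileLoadBuffer: the position in the file advances by the compressed size, while the caller sees the decompressed size.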


