From: Filip Janus <[email protected]>
To: Tomas Vondra <[email protected]>
Cc: [email protected]
Subject: Re: Proposal: Adding compression of temporary files
Date: Thu, 28 Nov 2024 12:32:15 +0100
Message-ID: <CAFjYY+J7ZcJAoE+K4AoyuiS_N6w8vVCQ6HQ72mPB=i4C+QbAeQ@mail.gmail.com>
In-Reply-To: <CAFjYY++22tmGDSac_YPA_NLJ4-cEJo7kS3SxXFHf7ZFTnwO-QA@mail.gmail.com>
References: <CAFjYY+LAxqRpaJpukBx9KjeyhZvMu6O4xcCUU9gw1SFGsUTcDA@mail.gmail.com>
	<CAFjYY+JJ3x-QUBpSYr5eTdapERhS9Nw3SEAH+QnBB=kypoXUJw@mail.gmail.com>
	<[email protected]>
	<CAFjYY++22tmGDSac_YPA_NLJ4-cEJo7kS3SxXFHf7ZFTnwO-QA@mail.gmail.com>

I've added a regression test for lz4 compression. It runs only if the
server is compiled with the "--with-lz4" option.

    -Filip-


On Sun, 24 Nov 2024 at 15:53, Filip Janus <[email protected]> wrote:

>
>
>     -Filip-
>
>
> On Wed, 20 Nov 2024 at 1:35, Tomas Vondra <[email protected]> wrote:
>
>> Hi,
>>
>> On 11/18/24 22:58, Filip Janus wrote:
>> > ...
>> >     Hi all,
>> >     PostgreSQL supports data compression nowadays, but compression of
>> >     temporary files has not been implemented yet. Huge queries can
>> >     produce a significant amount of temporary data that needs to be
>> >     stored on disk and causes many expensive I/O operations.
>> >     I am attaching a proposed patch that enables temporary file
>> >     compression for hash joins for now. Initially, I chose the LZ4
>> >     compression algorithm. It would probably make more sense to start
>> >     with pglz, but I realized that too late.
>> >
>>
>> Thanks for the idea & patch. I agree this might be quite useful for
>> workloads generating a lot of temporary files for stuff like sorts etc.
>> I think it will be interesting to think about the trade-offs, i.e. how
>> to pick the compression level - at some point the compression ratio
>> stops improving while paying more and more CPU time. Not sure what the
>> right choice is, so using default seems fine.
>>
>> I agree it'd be better to start with pglz, and only then add lz4 etc.
>> Firstly, pglz is simply the built-in compression, supported everywhere.
>> And it's also simpler to implement, I think.
>>
>> >     # Future possible improvements
>> >     Reducing the number of memory allocations when dumping and loading
>> >     the buffer. I have two ideas for solving this problem: I would
>> >     either add a buffer to struct BufFile or have the caller provide
>> >     the buffer as an argument. For sequential execution, I would
>> >     prefer the second option.
>> >
>>
>> Yes, this would be good. Doing a palloc+pfree for each compression is
>> going to be expensive, especially because these buffers are going to be
>> large - likely larger than 8kB. Which means it's not cached in the
>> memory context, etc.
>>
>> Adding it to the BufFile is not going to fly, because that doubles the
>> amount of memory per file. And we already have major issues with hash
>> joins consuming massive amounts of memory. But at the same time the
>> buffer is only needed during compression, and there's only one at a
>> time. So I agree with passing a single buffer as an argument.
>>
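
The single caller-provided buffer discussed above could look roughly like
the sketch below. It is only an illustration under stated assumptions, not
code from the patch: ScratchBuf, scratch_reserve() and dump_block() are
hypothetical names, plain realloc() stands in for the backend's memory
context allocation, and a memcpy() stands in for the actual compression
call. The point is that the buffer grows only when a larger block arrives
and is reused otherwise, so there is no allocate/free pair per block.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical scratch buffer owned by the caller, shared by all files. */
typedef struct ScratchBuf
{
	char	   *data;			/* NULL until first use */
	size_t		cap;			/* allocated size in bytes */
} ScratchBuf;

/*
 * Make sure the scratch buffer can hold at least "need" bytes.
 * Reallocates only when the current capacity is too small.
 * Returns 0 on success, -1 on allocation failure.
 */
int
scratch_reserve(ScratchBuf *sb, size_t need)
{
	char	   *p;

	if (need <= sb->cap)
		return 0;				/* reuse the existing allocation */

	p = realloc(sb->data, need);
	if (p == NULL)
		return -1;

	sb->data = p;
	sb->cap = need;
	return 0;
}

/*
 * Stand-in for "compress one block into the scratch buffer".
 * The copy is a placeholder for a real compressor; only the buffer
 * handling matters here.  Returns the number of bytes produced, or 0
 * on allocation failure.
 */
size_t
dump_block(ScratchBuf *sb, const char *src, size_t len)
{
	assert(src != NULL);

	if (scratch_reserve(sb, len) != 0)
		return 0;

	memcpy(sb->data, src, len);
	return len;
}
```

A caller would keep one ScratchBuf for the whole spill phase, pass it to
every dump, and free it once at the end.
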
>> >     # Future plan/open questions
>> >     In the future, I would like to add support for pglz and zstd.
>> >     Further, I plan to extend temporary file compression to sorting,
>> >     GiST index creation, etc.
>> >
>> >     Experimenting with the stream mode of the compression algorithms.
>> >     The compression ratio of LZ4 in block mode seems satisfactory. The
>> >     stream mode could produce a better ratio, but it would consume
>> >     more memory because the LZ4 stream compression context has to be
>> >     stored.
>> >
>>
>> One thing I realized is that this only enables temp file compression for
>> a single place - hash join spill files. AFAIK this is because compressed
>> files don't support random access, and the other places might need that.
>>
>> Is that correct? The patch does not explain this anywhere. If that's
>> correct, the patch probably should mention this in a comment for the
>> 'compress' argument added to BufFileCreateTemp(), so that it's clear
>> when it's legal to set compress=true.
>>
>
> I will add the description there.
>
>
>> Which other places might compress temp files? Surely hash joins are not
>> the only place that could benefit from this, right?
>>
>
> Yes, you are definitely right. I chose hash joins as a POC because they
> perform no seeks other than seeks to the beginning of the buffer.
> I have focused on hash joins, but there are definitely other places
> where the compression could be used. I want to add support for them
> in the future.
>
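
To illustrate why only sequential reads and rewinds are safe: the patch
stores each buffer as an int length followed by the compressed bytes, so
the position of a block on disk depends on the sizes of all blocks before
it. The sketch below is a simplified, self-contained model of that layout
and is not taken from the patch; put_block() and find_block() are
hypothetical helpers, and the payloads are plain bytes rather than real
compressed data.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Append one block to "out" at byte position "pos": an int length header
 * followed by the payload.  Returns the position just past the block.
 * The caller must make sure "out" is large enough.
 */
size_t
put_block(char *out, size_t pos, const char *payload, int len)
{
	assert(len >= 0);

	memcpy(out + pos, &len, sizeof(int));
	memcpy(out + pos + sizeof(int), payload, (size_t) len);
	return pos + sizeof(int) + (size_t) len;
}

/*
 * Find the physical offset of block number "target" (0-based) in a stream
 * of "end" bytes.  Because block sizes vary, the only way to get there is
 * to read every preceding length header in turn.
 * Returns (size_t) -1 if the stream has fewer blocks.
 */
size_t
find_block(const char *in, size_t end, int target)
{
	size_t		pos = 0;
	int			i;

	for (i = 0; pos + sizeof(int) <= end; i++)
	{
		int			len;

		if (i == target)
			return pos;

		memcpy(&len, in + pos, sizeof(int));
		assert(len >= 0);
		pos += sizeof(int) + (size_t) len;
	}

	return (size_t) -1;
}
```

Block 0 always starts at offset 0, which is why rewinding to the beginning
works without any index, while reaching any later block requires walking
the headers in front of it.
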
>
>> Another thing is testing. If I run regression tests, it won't use
>> compression at all, because the GUC has "none" by default, right? But we
>> need some testing, so how would we do that? One option would be to add a
>> regression test that explicitly sets the GUC and does a hash join, but
>> that won't work with lz4 (because that may not be enabled).
>
>
> Right, it's "none" by default. In my opinion we should test every
> supported compression method, so I will try to add an environment
> variable as you recommended.
>
>
>>
>> Another option might be to add a PG_TEST_xxx environment variable that
>> determines compression to use. Something like PG_TEST_USE_UNIX_SOCKETS.
>> But perhaps there's a simpler way.
>>
>> >     # Benchmark
>> >     I prepared three different databases to check expectations. Each
>> >     dataset is described below. My testing demonstrates that my patch
>> >     improves the execution time of huge hash joins. Also, my
>> >     implementation should not negatively affect the performance of
>> >     smaller queries. The amount of temporary file data was reduced in
>> >     every execution without a significant impact on execution time.
>> >
>> >     ## Dataset A:
>> >     Tables:
>> >     table_a(bigint id, text data_text, integer data_number) - 10000000 rows
>> >     table_b(bigint id, integer ref_id, numeric data_value, bytea
>> >     data_blob) - 10000000 rows
>> >     Query: SELECT * FROM table_a a JOIN table_b b ON a.id = b.id;
>> >
>> >     The tables contain highly compressible data.
>> >     The query reduced temporary file usage from ~20GB to ~3GB, and
>> >     this reduction also cut the query's execution time by about 10s.
>> >
>> >
>> >     ## Dataset B:
>> >     Tables:
>> >     table_a(integer id, text data_blob) - 1110000 rows
>> >     table_b(integer id, text data_blob) - 10000000 rows
>> >     Query: SELECT * FROM table_a a JOIN table_b b ON a.id = b.id;
>> >
>> >     The tables contain less compressible data; data_blob was generated
>> >     by a pseudo-random generator.
>> >     In this case, the data reduction was only ~50%, and the execution
>> >     time was reduced only slightly with compression enabled.
>> >
>> >     The second scenario, with compression enabled and work_mem raised
>> >     to avoid temp file usage, demonstrates no overhead.
>> >
>> >     ## Dataset C:
>> >     Tables:
>> >     customers(integer, text, text, text, text)
>> >     order_items(integer, integer, integer, integer, numeric(10,2))
>> >     orders(integer, integer, timestamp, numeric(10,2))
>> >     products(integer, text, text, numeric(10,2), integer)
>> >
>> >     Query: SELECT p.product_id, p.name, p.price,
>> >     SUM(oi.quantity) AS total_quantity, AVG(oi.price) AS avg_item_price
>> >     FROM eshop.products p JOIN eshop.order_items oi ON p.product_id =
>> >     oi.product_id JOIN
>> >     eshop.orders o ON oi.order_id = o.order_id WHERE o.order_date >
>> >     '2020-01-01' AND p.price > 50
>> >     GROUP BY p.product_id, p.name, p.price HAVING
>> >     SUM(oi.quantity) > 1000
>> >     ORDER BY total_quantity DESC LIMIT 100;
>> >
>> >     This scenario should demonstrate a more realistic usage of the
>> >     database. Enabled compression slightly reduced temporary file
>> >     usage, but the execution time wasn't affected by compression.
>> >
>> >
>> >     +---------+-------------+------------+----------------+----------+
>> >     | Dataset | Compression | temp_bytes | Execution time | work_mem |
>> >     +---------+-------------+------------+----------------+----------+
>> >     | A       | Yes         | 3.09 GiB   | 22s586ms       | 4MB      |
>> >     |         | No          | 21.89 GiB  | 35s            | 4MB      |
>> >     +---------+-------------+------------+----------------+----------+
>> >     | B       | Yes         | 333 MB     | 1815.545 ms    | 4MB      |
>> >     |         | No          | 146 MB     | 1500.460 ms    | 4MB      |
>> >     |         | Yes         | 0 MB       | 3262.305 ms    | 80MB     |
>> >     |         | No          | 0 MB       | 3174.725 ms    | 80MB     |
>> >     +---------+-------------+------------+----------------+----------+
>> >     | C       | Yes         | 40 MB      | 1011.020 ms    | 1MB      |
>> >     |         | No          | 53 MB      | 1034.142 ms    | 1MB      |
>> >     +---------+-------------+------------+----------------+----------+
>> >
>> >
>>
>> Thanks. I'll try to do some benchmarks on my own.
>>
>> Are these results from a single run, or an average of multiple runs?
>
>
> It is an average of multiple runs.
>
>> Do you maybe have a script to reproduce this, including the data
>> generation?
>
>
> I am attaching my SQL file for database preparation. I also did further
> testing on two other machines (see the attachment huge_tables.rtf).
>
>>
>> Also, can you share some information about the machine used for this? I
>> expect the impact to strongly depend on memory pressure - if the temp
>> file fits into page cache (and stays there), it may not benefit from the
>> compression, right?
>>
>
> If it fits into the page cache thanks to compression, I would consider
> that a benefit of compression.
> I performed further testing on machines with different memory sizes.
> Both experiments showed that compression was beneficial for execution
> time. The reduction in execution time was more significant on the
> machine that had less memory available.
>
> Tests were performed on:
> MacBook Pro M3 36GB - macOS
> Virtual machine ARM64 10GB/ 6CPU - Fedora 39
>
>
>>
>> regards
>>
>> --
>> Tomas Vondra
>>
>>


Attachments:

  [application/octet-stream] 0001-This-commit-adds-support-for-temporary-files-compres-v3.patch (77.0K, 3-0001-This-commit-adds-support-for-temporary-files-compres-v3.patch)
From fd4e42c830bdf8231ac7b6ae21326c38baacdc34 Mon Sep 17 00:00:00 2001
From: Filip <[email protected]>
Date: Thu, 24 Oct 2024 12:15:10 +0200
Subject: [PATCH] This commit adds support for temporary files compression, it
 can be used only for hashjoins now.

It also adds GUC parameter temp_file_compression that enables this functionality.
For now, it supports only the lz4 algorithm. In the future, pglz and zstd
support could also be added.
---
 src/Makefile.global.in                        |    1 +
 src/backend/access/gist/gistbuildbuffers.c    |    2 +-
 src/backend/backup/backup_manifest.c          |    2 +-
 src/backend/executor/nodeHashjoin.c           |    2 +-
 src/backend/storage/file/buffile.c            |  176 ++-
 src/backend/utils/misc/guc_tables.c           |   23 +
 src/backend/utils/misc/postgresql.conf.sample |    1 +
 src/backend/utils/sort/logtape.c              |    2 +-
 src/backend/utils/sort/tuplestore.c           |    2 +-
 src/include/storage/buffile.h                 |   13 +-
 src/test/regress/GNUmakefile                  |    4 +
 src/test/regress/expected/join_hash_lz4.out   | 1166 +++++++++++++++++
 src/test/regress/sql/join_hash_lz4.sql        |  626 +++++++++
 14 files changed, 2006 insertions(+), 16 deletions(-)
 create mode 100644 src/test/regress/expected/join_hash_lz4.out
 create mode 100644 src/test/regress/sql/join_hash_lz4.sql

diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 42f50b4976..06e701fe9c 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -193,6 +193,7 @@ with_ldap	= @with_ldap@
 with_libxml	= @with_libxml@
 with_libxslt	= @with_libxslt@
 with_llvm	= @with_llvm@
+with_lz4	= @with_lz4@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
diff --git a/src/backend/access/gist/gistbuildbuffers.c b/src/backend/access/gist/gistbuildbuffers.c
index 4c2301da00..9b3b00142a 100644
--- a/src/backend/access/gist/gistbuildbuffers.c
+++ b/src/backend/access/gist/gistbuildbuffers.c
@@ -54,7 +54,7 @@ gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel)
 	 * Create a temporary file to hold buffer pages that are swapped out of
 	 * memory.
 	 */
-	gfbb->pfile = BufFileCreateTemp(false);
+	gfbb->pfile = BufFileCreateTemp(false, false);
 	gfbb->nFileBlocks = 0;
 
 	/* Initialize free page management. */
diff --git a/src/backend/backup/backup_manifest.c b/src/backend/backup/backup_manifest.c
index a2e2f86332..f8a3e1f0f4 100644
--- a/src/backend/backup/backup_manifest.c
+++ b/src/backend/backup/backup_manifest.c
@@ -65,7 +65,7 @@ InitializeBackupManifest(backup_manifest_info *manifest,
 		manifest->buffile = NULL;
 	else
 	{
-		manifest->buffile = BufFileCreateTemp(false);
+		manifest->buffile = BufFileCreateTemp(false, false);
 		manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
 		if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
 			elog(ERROR, "failed to initialize checksum of backup manifest: %s",
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 2f7170604d..1b5c6448ef 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 	{
 		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		file = BufFileCreateTemp(false);
+		file = BufFileCreateTemp(false, true);
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index a27f51f622..818ef39d5c 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -54,6 +54,16 @@
 #include "storage/fd.h"
 #include "utils/resowner.h"
 
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#define NO_LZ4_SUPPORT() \
+	ereport(ERROR, \
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
+			 errmsg("compression method lz4 not supported"), \
+			 errdetail("This functionality requires the server to be built with lz4 support.")))
+
 /*
  * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
  * The reason is that we'd like large BufFiles to be spread across multiple
@@ -62,6 +72,8 @@
 #define MAX_PHYSICAL_FILESIZE	0x40000000
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
+int temp_file_compression = TEMP_NONE_COMPRESSION;
+
 /*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
@@ -95,7 +107,7 @@ struct BufFile
 	off_t		curOffset;		/* offset part of current pos */
 	int			pos;			/* next read/write position in buffer */
 	int			nbytes;			/* total # of valid bytes in buffer */
-
+	bool			compress; /* is the file compressed on disk? */
 	/*
 	 * XXX Should ideally us PGIOAlignedBlock, but might need a way to avoid
 	 * wasting per-file alignment padding when some users create many files.
@@ -127,6 +139,7 @@ makeBufFileCommon(int nfiles)
 	file->curOffset = 0;
 	file->pos = 0;
 	file->nbytes = 0;
+	file->compress = false;
 
 	return file;
 }
@@ -188,9 +201,17 @@ extendBufFile(BufFile *file)
  * Note: if interXact is true, the caller had better be calling us in a
  * memory context, and with a resource owner, that will survive across
  * transaction boundaries.
+ *
+ * If compress is true, the temporary file's data is compressed before
+ * being written to disk.
+ *
+ * Note: compressed files do not support random access, and only hash joins
+ * use compression for now. Any seek other than a seek to the beginning of
+ * the BufFile will corrupt the temporary data offsets.
+ *
  */
 BufFile *
-BufFileCreateTemp(bool interXact)
+BufFileCreateTemp(bool interXact, bool compress)
 {
 	BufFile    *file;
 	File		pfile;
@@ -212,6 +233,15 @@ BufFileCreateTemp(bool interXact)
 	file = makeBufFile(pfile);
 	file->isInterXact = interXact;
 
+	if (temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+#ifdef USE_LZ4
+		file->compress = compress;
+#else
+		NO_LZ4_SUPPORT();
+#endif
+	}
+
 	return file;
 }
 
@@ -275,6 +305,7 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
 	file->files[0] = MakeNewFileSetSegment(file, 0);
 	file->readOnly = false;
 
+
 	return file;
 }
 
@@ -455,13 +486,72 @@ BufFileLoadBuffer(BufFile *file)
 		INSTR_TIME_SET_ZERO(io_start);
 
 	/*
-	 * Read whatever we can get, up to a full bufferload.
+	 * Load data as it is stored in the temporary file
 	 */
-	file->nbytes = FileRead(thisfile,
+	if (!file->compress)
+	{
+
+		/*
+	 	* Read whatever we can get, up to a full bufferload.
+	 	*/
+		file->nbytes = FileRead(thisfile,
 							file->buffer.data,
 							sizeof(file->buffer),
 							file->curOffset,
 							WAIT_EVENT_BUFFILE_READ);
+	/*
+	 * Read and decompress data from the temporary file
+	 * The first reading loads size of the compressed block
+	 * Second reading loads compressed data
+	 */
+	} else {
+		int nread;
+		int nbytes;
+
+		nread = FileRead(thisfile,
+							&nbytes,
+							sizeof(nbytes),
+							file->curOffset,
+							WAIT_EVENT_BUFFILE_READ);
+		/* if not EOF let's continue */
+		if (nread > 0)
+		{
+			/*
+			 * A long-lived buffer would make sense here, to limit the number
+			 * of memory allocations
+			 */
+			char * buff;
+
+			/*
+			 * Read compressed data, curOffset differs with pos
+			 * It reads less data than it returns to caller
+			 * So the curOffset must be advanced here based on compressed size
+			 */
+			file->curOffset+=sizeof(nbytes);
+
+			buff = palloc(nbytes);
+
+			nread = FileRead(thisfile,
+							buff,
+							nbytes,
+							file->curOffset,
+							WAIT_EVENT_BUFFILE_READ);
+
+#ifdef USE_LZ4
+			file->nbytes = LZ4_decompress_safe(buff,
+				file->buffer.data,nbytes,sizeof(file->buffer));
+			file->curOffset += nread;
+#endif
+
+			if (file->nbytes < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("compressed lz4 data is corrupt")));
+			pfree(buff);
+		}
+
+	}
+
 	if (file->nbytes < 0)
 	{
 		file->nbytes = 0;
@@ -494,9 +584,56 @@ static void
 BufFileDumpBuffer(BufFile *file)
 {
 	int			wpos = 0;
-	int			bytestowrite;
+	int			bytestowrite = 0;
 	File		thisfile;
 
+
+	/* Save nbytes value because the size changes due to compression */
+	int nbytesOriginal = file->nbytes;
+
+	bool compression = false;
+
+	char * DataToWrite;
+	DataToWrite = file->buffer.data;
+
+	/*
+	 * Prepare the compressed data to write. The size of the compressed
+	 * block is stored at the beginning of the block, in front of the
+	 * compressed data.
+	 */
+
+
+	if (file->compress) {
+		int cBufferSize = 0;
+		char * cData;
+		int cSize = 0;
+#ifdef USE_LZ4
+		cBufferSize = LZ4_compressBound(file->nbytes);
+#endif
+		/*
+		 * A long-lived buffer would make sense here, to limit the number
+		 * of memory allocations
+		 */
+		compression = true;
+		cData = palloc(cBufferSize + sizeof(int));
+#ifdef USE_LZ4
+		/*
+		 * Using stream compression would lead to the slight improvement in
+		 * compression ratio
+		 */
+		cSize = LZ4_compress_default(file->buffer.data,
+				cData + sizeof(int),file->nbytes, cBufferSize);
+#endif
+
+		/* Write size of compressed block in front of compressed data
+		 * It's used to determine amount of data to read within
+		 * decompression process
+		 */
+		memcpy(cData,&cSize,sizeof(int));
+		file->nbytes=cSize + sizeof(int);
+		DataToWrite = cData;
+	}
+
 	/*
 	 * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
 	 * crosses a component-file boundary; so we need a loop.
@@ -535,7 +672,7 @@ BufFileDumpBuffer(BufFile *file)
 			INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 DataToWrite + wpos,
 								 bytestowrite,
 								 file->curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
@@ -564,7 +701,19 @@ BufFileDumpBuffer(BufFile *file)
 	 * logical file position, ie, original value + pos, in case that is less
 	 * (as could happen due to a small backwards seek in a dirty buffer!)
 	 */
-	file->curOffset -= (file->nbytes - file->pos);
+
+
+	if (!file->compress)
+		file->curOffset -= (file->nbytes - file->pos);
+	else
+		if (nbytesOriginal - file->pos != 0)
+			/* curOffset must be corrected also if compression is
+			 * enabled, nbytes was changed by compression but we
+			 * have to use the original value of nbytes
+			 */
+			file->curOffset-=bytestowrite;
+
+
 	if (file->curOffset < 0)	/* handle possible segment crossing */
 	{
 		file->curFile--;
@@ -577,6 +726,9 @@ BufFileDumpBuffer(BufFile *file)
 	 */
 	file->pos = 0;
 	file->nbytes = 0;
+
+	if (compression)
+		pfree(DataToWrite);
 }
 
 /*
@@ -602,8 +754,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 	{
 		if (file->pos >= file->nbytes)
 		{
-			/* Try to load more data into buffer. */
-			file->curOffset += file->pos;
+			/* Try to load more data into buffer.
+			 *
+			 * curOffset is moved within BufFileLoadBuffer
+			 * because stored data size differs from loaded/
+			 * decompressed size
+			 * */
+			if (!file->compress)
+				file->curOffset += file->pos;
 			file->pos = 0;
 			file->nbytes = 0;
 			BufFileLoadBuffer(file);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 686309db58..3821caf763 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -73,6 +73,7 @@
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/buffile.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
 #include "storage/predicate.h"
@@ -454,6 +455,17 @@ static const struct config_enum_entry default_toast_compression_options[] = {
 #endif
 	{NULL, 0, false}
 };
+/*
+ * pglz and zstd support should be added as future enhancement
+ *
+ */
+static const struct config_enum_entry temp_file_compression_options[] = {
+	{"no", TEMP_NONE_COMPRESSION, false},
+#ifdef  USE_LZ4
+	{"lz4", TEMP_LZ4_COMPRESSION, false},
+#endif
+	{NULL, 0, false}
+};
 
 static const struct config_enum_entry wal_compression_options[] = {
 	{"pglz", WAL_COMPRESSION_PGLZ, false},
@@ -4856,6 +4868,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"temp_file_compression", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Sets the compression method for temporary files."),
+			NULL
+		},
+		&temp_file_compression,
+		TEMP_NONE_COMPRESSION,
+		temp_file_compression_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"default_transaction_isolation", PGC_USERSET, CLIENT_CONN_STATEMENT,
 			gettext_noop("Sets the transaction isolation level of each new transaction."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 667e0dc40a..e9c0b36352 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -177,6 +177,7 @@
 
 #max_notify_queue_pages = 1048576	# limits the number of SLRU pages allocated
 					# for NOTIFY / LISTEN queue
+#temp_file_compression = 'no'	# enables temporary files compression
 
 # - Kernel Resources -
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 44b30e86ad..af43b3ebb1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -592,7 +592,7 @@ LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
 		lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
 	}
 	else
-		lts->pfile = BufFileCreateTemp(false);
+		lts->pfile = BufFileCreateTemp(false, false);
 
 	return lts;
 }
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index a720d70200..a952f0f4f5 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -860,7 +860,7 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
 			 */
 			oldcxt = MemoryContextSwitchTo(state->context->parent);
 
-			state->myfile = BufFileCreateTemp(state->interXact);
+			state->myfile = BufFileCreateTemp(state->interXact, false);
 
 			MemoryContextSwitchTo(oldcxt);
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 5f6d7c8e3f..486b552e31 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -32,11 +32,22 @@
 
 typedef struct BufFile BufFile;
 
+typedef enum
+{
+	TEMP_NONE_COMPRESSION,
+#ifdef USE_LZ4
+	TEMP_LZ4_COMPRESSION
+#endif
+} TempCompression;
+
+extern PGDLLIMPORT int temp_file_compression;
+
+
 /*
  * prototypes for functions in buffile.c
  */
 
-extern BufFile *BufFileCreateTemp(bool interXact);
+extern BufFile *BufFileCreateTemp(bool interXact, bool compress);
 extern void BufFileClose(BufFile *file);
 extern pg_nodiscard size_t BufFileRead(BufFile *file, void *ptr, size_t size);
 extern void BufFileReadExact(BufFile *file, void *ptr, size_t size);
diff --git a/src/test/regress/GNUmakefile b/src/test/regress/GNUmakefile
index 9003435aab..859eb79bd7 100644
--- a/src/test/regress/GNUmakefile
+++ b/src/test/regress/GNUmakefile
@@ -114,6 +114,10 @@ submake-contrib-spi: | submake-libpgport submake-generated-headers
 REGRESS_OPTS = --dlpath=. --max-concurrent-tests=20 \
 	$(EXTRA_REGRESS_OPTS)
 
+ifeq ($(with_lz4),yes)
+override EXTRA_TESTS := join_hash_lz4 $(EXTRA_TESTS)
+endif
+
 check: all
 	$(pg_regress_check) $(REGRESS_OPTS) --schedule=$(srcdir)/parallel_schedule $(MAXCONNOPT) $(EXTRA_TESTS)
 
diff --git a/src/test/regress/expected/join_hash_lz4.out b/src/test/regress/expected/join_hash_lz4.out
new file mode 100644
index 0000000000..966a5cd8f5
--- /dev/null
+++ b/src/test/regress/expected/join_hash_lz4.out
@@ -0,0 +1,1166 @@
+--
+-- exercises for the hash join code
+--
+begin;
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'lz4';
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | f
+(1 row)
+
+rollback to settings;
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage stays within our work_mem budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                      QUERY PLAN                       
+-------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select count(*) from simple r join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ t                    | f
+(1 row)
+
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+                      QUERY PLAN                      
+------------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on bigger_than_it_looks s
+(6 rows)
+
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                            QUERY PLAN                            
+------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Hash
+                           ->  Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Partial Aggregate
+               ->  Parallel Hash Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on bigger_than_it_looks s
+(9 rows)
+
+select count(*) from simple r join bigger_than_it_looks s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+ initially_multibatch | increased_batches 
+----------------------+-------------------
+ f                    | t
+(1 row)
+
+rollback to settings;
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increase just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                    QUERY PLAN                    
+--------------------------------------------------
+ Aggregate
+   ->  Hash Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on extremely_skewed s
+(6 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                       QUERY PLAN                       
+--------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Hash
+                     ->  Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     2
+(1 row)
+
+rollback to settings;
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+                           QUERY PLAN                            
+-----------------------------------------------------------------
+ Aggregate
+   ->  Gather
+         Workers Planned: 1
+         ->  Parallel Hash Join
+               Hash Cond: (r.id = s.id)
+               ->  Parallel Seq Scan on simple r
+               ->  Parallel Hash
+                     ->  Parallel Seq Scan on extremely_skewed s
+(8 rows)
+
+select count(*) from simple r join extremely_skewed s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     4
+(1 row)
+
+rollback to settings;
+-- A couple of other hash join tests unrelated to work_mem management.
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+ original | final 
+----------+-------
+        1 |     1
+(1 row)
+
+rollback to settings;
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Hash
+                           ->  Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Aggregate
+   ->  Nested Loop Left Join
+         Join Filter: ((join_foo.id < (b1.id + 1)) AND (join_foo.id > (b1.id - 1)))
+         ->  Seq Scan on join_foo
+         ->  Gather
+               Workers Planned: 2
+               ->  Parallel Hash Join
+                     Hash Cond: (b1.id = b2.id)
+                     ->  Parallel Seq Scan on join_bar b1
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on join_bar b2
+(11 rows)
+
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+ count 
+-------
+     3
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+ multibatch 
+------------
+ f
+(1 row)
+
+rollback to settings;
+-- A full outer join where every record is matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: (r.id = s.id)
+         ->  Seq Scan on simple r
+         ->  Hash
+               ->  Seq Scan on simple s
+(6 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: (r.id = s.id)
+                     ->  Parallel Seq Scan on simple r
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple s
+(9 rows)
+
+select  count(*) from simple r full outer join simple s using (id);
+ count 
+-------
+ 20000
+(1 row)
+
+rollback to settings;
+-- A full outer join where no record is matched.
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+               QUERY PLAN               
+----------------------------------------
+ Aggregate
+   ->  Hash Full Join
+         Hash Cond: ((0 - s.id) = r.id)
+         ->  Seq Scan on simple s
+         ->  Hash
+               ->  Seq Scan on simple r
+(6 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Full Join
+                     Hash Cond: ((0 - s.id) = r.id)
+                     ->  Parallel Seq Scan on simple s
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on simple r
+(9 rows)
+
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+ count 
+-------
+ 40000
+(1 row)
+
+rollback to settings;
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 2
+         ->  Partial Aggregate
+               ->  Parallel Hash Left Join
+                     Hash Cond: (wide.id = wide_1.id)
+                     ->  Parallel Seq Scan on wide
+                     ->  Parallel Hash
+                           ->  Parallel Seq Scan on wide wide_1
+(9 rows)
+
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+ length 
+--------
+ 320000
+(1 row)
+
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+ multibatch 
+------------
+ t
+(1 row)
+
+rollback to settings;
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ id | id 
+----+----
+  1 |   
+    |  2
+(2 rows)
+
+ROLLBACK TO settings;
+rollback;
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: ((hjtest_1.id = (SubPlan 1)) AND ((SubPlan 2) = (SubPlan 3)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_1
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         Filter: ((SubPlan 4) < 50)
+         SubPlan 4
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   ->  Hash
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         ->  Seq Scan on public.hjtest_2
+               Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+               Filter: ((SubPlan 5) < 55)
+               SubPlan 5
+                 ->  Result
+                       Output: (hjtest_2.c * 5)
+         SubPlan 1
+           ->  Result
+                 Output: 1
+                 One-Time Filter: (hjtest_2.id = 1)
+         SubPlan 3
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   SubPlan 2
+     ->  Result
+           Output: (hjtest_1.b * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+                                           QUERY PLAN                                           
+------------------------------------------------------------------------------------------------
+ Hash Join
+   Output: hjtest_1.a, hjtest_2.a, (hjtest_1.tableoid)::regclass, (hjtest_2.tableoid)::regclass
+   Hash Cond: (((SubPlan 1) = hjtest_1.id) AND ((SubPlan 3) = (SubPlan 2)))
+   Join Filter: (hjtest_1.a <> hjtest_2.b)
+   ->  Seq Scan on public.hjtest_2
+         Output: hjtest_2.a, hjtest_2.tableoid, hjtest_2.id, hjtest_2.c, hjtest_2.b
+         Filter: ((SubPlan 5) < 55)
+         SubPlan 5
+           ->  Result
+                 Output: (hjtest_2.c * 5)
+   ->  Hash
+         Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+         ->  Seq Scan on public.hjtest_1
+               Output: hjtest_1.a, hjtest_1.tableoid, hjtest_1.id, hjtest_1.b
+               Filter: ((SubPlan 4) < 50)
+               SubPlan 4
+                 ->  Result
+                       Output: (hjtest_1.b * 5)
+         SubPlan 2
+           ->  Result
+                 Output: (hjtest_1.b * 5)
+   SubPlan 1
+     ->  Result
+           Output: 1
+           One-Time Filter: (hjtest_2.id = 1)
+   SubPlan 3
+     ->  Result
+           Output: (hjtest_2.c * 5)
+(28 rows)
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+  a1  | a2 |    t1    |    t2    
+------+----+----------+----------
+ text | t  | hjtest_1 | hjtest_2
+(1 row)
+
+ROLLBACK;
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Nested Loop
+   ->  Seq Scan on int8_tbl i8
+   ->  Sort
+         Sort Key: t1.fivethous, i4.f1
+         ->  Hash Join
+               Hash Cond: (t1.fivethous = (i4.f1 + i8.q2))
+               ->  Seq Scan on tenk1 t1
+               ->  Hash
+                     ->  Seq Scan on int4_tbl i4
+(9 rows)
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+ q2  | fivethous | f1 
+-----+-----------+----
+ 456 |       456 |  0
+ 456 |       456 |  0
+ 123 |       123 |  0
+ 123 |       123 |  0
+(4 rows)
+
+rollback;
diff --git a/src/test/regress/sql/join_hash_lz4.sql b/src/test/regress/sql/join_hash_lz4.sql
new file mode 100644
index 0000000000..1d19c1980e
--- /dev/null
+++ b/src/test/regress/sql/join_hash_lz4.sql
@@ -0,0 +1,626 @@
+--
+-- exercises for the hash join code
+--
+
+begin;
+
+set local min_parallel_table_scan_size = 0;
+set local parallel_setup_cost = 0;
+set local enable_hashjoin = on;
+set local temp_file_compression = 'lz4';
+
+-- Extract bucket and batch counts from an explain analyze plan.  In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function find_hash(node json)
+returns json language plpgsql
+as
+$$
+declare
+  x json;
+  child json;
+begin
+  if node->>'Node Type' = 'Hash' then
+    return node;
+  else
+    for child in select json_array_elements(node->'Plans')
+    loop
+      x := find_hash(child);
+      if x is not null then
+        return x;
+      end if;
+    end loop;
+    return null;
+  end if;
+end;
+$$;
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  whole_plan json;
+  hash_node json;
+begin
+  for whole_plan in
+    execute 'explain (analyze, format ''json'') ' || query
+  loop
+    hash_node := find_hash(json_extract_path(whole_plan, '0', 'Plan'));
+    original := hash_node->>'Original Hash Batches';
+    final := hash_node->>'Hash Batches';
+    return next;
+  end loop;
+end;
+$$;
+
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+analyze bigger_than_it_looks;
+update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks';
+
+-- Make a relation whose size we underestimate and that also has a
+-- kind of skew that breaks our batching scheme.  We want stats to say
+-- 2 rows, but actually there are 20,000 rows with the same key.
+create table extremely_skewed (id int, t text);
+alter table extremely_skewed set (autovacuum_enabled = 'false');
+alter table extremely_skewed set (parallel_workers = 2);
+analyze extremely_skewed;
+insert into extremely_skewed
+  select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+  from generate_series(1, 20000);
+update pg_class
+  set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
+  where relname = 'extremely_skewed';
+
+-- Make a relation with a couple of enormous tuples.
+create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
+alter table wide set (parallel_workers = 2);
+
+-- The "optimal" case: the hash table fits in memory; we plan for 1
+-- batch, we stick to that number, and peak memory usage stays within
+-- our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- The "good" case: batches required, but we plan the right number; we
+-- plan for some number of batches, and we stick to that number, and
+-- peak memory usage says within our work_mem budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join simple s using (id);
+select count(*) from simple r join simple s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+-- parallel full multi-batch hash join
+select count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- The "bad" case: during execution we need to increase number of
+-- batches; in this case we plan for 1 batch, and increase at least a
+-- couple of times, and peak memory usage stays within our work_mem
+-- budget
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '192kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+select count(*) from simple r join bigger_than_it_looks s using (id);
+select original > 1 as initially_multibatch, final > original as increased_batches
+  from hash_join_batches(
+$$
+  select count(*) from simple r join bigger_than_it_looks s using (id);
+$$);
+rollback to settings;
+
+-- The "ugly" case: increasing the number of batches during execution
+-- doesn't help, so stop trying to fit in work_mem and hope for the
+-- best; in this case we plan for 1 batch, increases just once and
+-- then stop increasing because that didn't help at all, so we blow
+-- right through the work_mem budget and hope for the best...
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-oblivious hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = off;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- parallel with parallel-aware hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 1;
+set local work_mem = '128kB';
+set local hash_mem_multiplier = 1.0;
+set local enable_parallel_hash = on;
+explain (costs off)
+  select count(*) from simple r join extremely_skewed s using (id);
+select count(*) from simple r join extremely_skewed s using (id);
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join extremely_skewed s using (id);
+$$);
+rollback to settings;
+
+-- A couple of other hash join tests unrelated to work_mem management.
+
+-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+set local work_mem = '4MB';
+set local hash_mem_multiplier = 1.0;
+set local parallel_leader_participation = off;
+select * from hash_join_batches(
+$$
+  select count(*) from simple r join simple s using (id);
+$$);
+rollback to settings;
+
+-- Exercise rescans.  We'll turn off parallel_leader_participation so
+-- that we can check that instrumentation comes back correctly.
+
+create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
+alter table join_foo set (parallel_workers = 0);
+create table join_bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
+alter table join_bar set (parallel_workers = 2);
+
+-- multi-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-oblivious
+savepoint settings;
+set enable_parallel_hash = off;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- multi-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '64kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- single-batch with rescan, parallel-aware
+savepoint settings;
+set enable_parallel_hash = on;
+set parallel_leader_participation = off;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+set parallel_tuple_cost = 0;
+set max_parallel_workers_per_gather = 2;
+set enable_material = off;
+set enable_mergejoin = off;
+set work_mem = '4MB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select count(*) from join_foo
+  left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+  on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select count(*) from join_foo
+    left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss
+    on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1;
+$$);
+rollback to settings;
+
+-- A full outer join where every record is matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s using (id);
+select  count(*) from simple r full outer join simple s using (id);
+rollback to settings;
+
+-- A full outer join where every record is not matched.
+
+-- non-parallel
+savepoint settings;
+set local max_parallel_workers_per_gather = 0;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism not possible with parallel-oblivious full hash join
+savepoint settings;
+set enable_parallel_hash = off;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+-- parallelism is possible with parallel-aware full hash join
+savepoint settings;
+set local max_parallel_workers_per_gather = 2;
+explain (costs off)
+     select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+select  count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
+rollback to settings;
+
+
+-- exercise special code paths for huge tuples (note use of non-strict
+-- expression and left join required to get the detoasted tuple into
+-- the hash table)
+
+-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
+-- sts_puttuple oversized tuple cases because it's multi-batch)
+savepoint settings;
+set max_parallel_workers_per_gather = 2;
+set enable_parallel_hash = on;
+set work_mem = '128kB';
+set hash_mem_multiplier = 1.0;
+explain (costs off)
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select length(max(s.t))
+from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+select final > 1 as multibatch
+  from hash_join_batches(
+$$
+  select length(max(s.t))
+  from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
+$$);
+rollback to settings;
+
+
+-- Hash join reuses the HOT status bit to indicate match status. This can only
+-- be guaranteed to produce correct results if all the hash join tuple match
+-- bits are reset before reuse. This is done upon loading them into the
+-- hashtable.
+SAVEPOINT settings;
+SET enable_parallel_hash = on;
+SET min_parallel_table_scan_size = 0;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+CREATE TABLE hjtest_matchbits_t1(id int);
+CREATE TABLE hjtest_matchbits_t2(id int);
+INSERT INTO hjtest_matchbits_t1 VALUES (1);
+INSERT INTO hjtest_matchbits_t2 VALUES (2);
+-- Update should create a HOT tuple. If this status bit isn't cleared, we won't
+-- correctly emit the NULL-extended unmatching tuple in full hash join.
+UPDATE hjtest_matchbits_t2 set id = 2;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id
+  ORDER BY t1.id;
+-- Test serial full hash join.
+-- Resetting parallel_setup_cost should force a serial plan.
+-- Just to be safe, however, set enable_parallel_hash to off, as parallel full
+-- hash joins are only supported with shared hashtables.
+RESET parallel_setup_cost;
+SET enable_parallel_hash = off;
+SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id;
+ROLLBACK TO settings;
+
+rollback;
+
+-- Verify that hash key expressions reference the correct
+-- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's
+-- need to reference Hash's outer plan (which is below HashJoin's
+-- inner plan). It's not trivial to verify that the references are
+-- correct (we don't display the hashkeys themselves), but if the
+-- hashkeys contain subplan references, those will be displayed. Force
+-- subplans to appear just about everywhere.
+--
+-- Bug report:
+-- https://www.postgresql.org/message-id/CAPpHfdvGVegF_TKKRiBrSmatJL2dR9uwFCuR%2BteQ_8tEXU8mxg%40mail.gmail.com
+--
+BEGIN;
+SET LOCAL enable_sort = OFF; -- avoid mergejoins
+SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order
+
+CREATE TABLE hjtest_1 (a text, b int, id int, c bool);
+CREATE TABLE hjtest_2 (a bool, id int, b text, c int);
+
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50
+INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 1, false); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 2); -- matches
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 3, 'another', 7); -- fails id join condition
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 90);  -- fails < 55
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'another', 3); -- fails (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+INSERT INTO hjtest_2(a, id, b, c) VALUES (true, 1, 'text', 1); --  fails hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_1, hjtest_2
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+EXPLAIN (COSTS OFF, VERBOSE)
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2
+FROM hjtest_2, hjtest_1
+WHERE
+    hjtest_1.id = (SELECT 1 WHERE hjtest_2.id = 1)
+    AND (SELECT hjtest_1.b * 5) = (SELECT hjtest_2.c*5)
+    AND (SELECT hjtest_1.b * 5) < 50
+    AND (SELECT hjtest_2.c * 5) < 55
+    AND hjtest_1.a <> hjtest_2.b;
+
+ROLLBACK;
+
+-- Verify that we behave sanely when the inner hash keys contain parameters
+-- (that is, outer or lateral references).  This situation has to defeat
+-- re-use of the inner hash table across rescans.
+begin;
+set local enable_hashjoin = on;
+
+explain (costs off)
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+select i8.q2, ss.* from
+int8_tbl i8,
+lateral (select t1.fivethous, i4.f1 from tenk1 t1 join int4_tbl i4
+         on t1.fivethous = i4.f1+i8.q2 order by 1,2) ss;
+
+rollback;
-- 
2.39.5 (Apple Git-154)
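
Note for readers of the test file above: the PL/pgSQL helpers find_hash() and hash_join_batches() walk the JSON output of EXPLAIN (ANALYZE, FORMAT JSON) to pull the planned and final batch counts from the first Hash node. The following is only a rough Python sketch of that traversal for illustration and is not part of the patch. The SAMPLE plan is made up, and the Python function signatures are assumptions that simply mirror the SQL helpers.

```python
import json


def find_hash(node):
    """Depth-first search for the first plan node whose "Node Type" is "Hash"."""
    if node.get("Node Type") == "Hash":
        return node
    # Child plans live under the "Plans" key; leaf nodes have none.
    for child in node.get("Plans", []):
        found = find_hash(child)
        if found is not None:
            return found
    return None


def hash_join_batches(explain_json_text):
    """Return (original, final) batch counts from EXPLAIN (FORMAT JSON) text."""
    # The JSON output is a one-element array holding {"Plan": {...}}.
    plan = json.loads(explain_json_text)[0]["Plan"]
    hash_node = find_hash(plan)
    if hash_node is None:
        return (None, None)
    return (hash_node.get("Original Hash Batches"),
            hash_node.get("Hash Batches"))


# Hypothetical, hand-written plan; real output carries many more fields.
SAMPLE = json.dumps([{
    "Plan": {
        "Node Type": "Aggregate",
        "Plans": [{
            "Node Type": "Hash Join",
            "Plans": [
                {"Node Type": "Seq Scan", "Relation Name": "simple"},
                {"Node Type": "Hash",
                 "Original Hash Batches": 1,
                 "Hash Batches": 4,
                 "Plans": [{"Node Type": "Seq Scan",
                            "Relation Name": "bigger_than_it_looks"}]},
            ],
        }],
    }
}])

original, final = hash_join_batches(SAMPLE)
# Same derived flags the tests select: initially_multibatch, increased_batches
print(original > 1, final > original)
```

The tests compare only these derived booleans rather than raw batch counts, presumably because exact counts can vary between platforms while "started multi-batch" and "batch count grew" should stay stable.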


